All posts by Radhika Ravirala

Crafting serverless streaming ETL jobs with AWS Glue

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/crafting-serverless-streaming-etl-jobs-with-aws-glue/

Organizations across verticals have been building streaming-based extract, transform, and load (ETL) applications to more efficiently extract meaningful insights from their datasets. Although streaming ingest and stream processing frameworks have evolved over the past few years, there is now a surge in demand for building streaming pipelines that are completely serverless. Since 2017, AWS Glue has made it easy to prepare and load your data for analytics using ETL.

In this post, we dive into streaming ETL in AWS Glue, a feature that allows you to build continuous ETL applications on streaming data. Streaming ETL in AWS Glue is based on Apache Spark’s Structured Streaming engine, which provides a fault-tolerant, scalable, and easy way to achieve end-to-end stream processing with exactly-once semantics. This post walks you through an example of building a stream processing pipeline using AWS Glue that includes reading streaming data from Amazon Kinesis Data Streams, schema discovery, running streaming ETL, and writing out the results to a sink.

Serverless streaming ETL architecture

For this post, our use case is relevant to our current situation with the COVID-19 pandemic. Ventilators are in high demand and are increasingly used in different settings: hospitals, nursing homes, and even private residences. Ventilators generate data that must be monitored, and an increase in ventilator usage means there is a tremendous amount of streaming data that must be processed promptly so that patients can be attended to as quickly as possible. In this post, we build a streaming ETL job on ventilator metrics and enrich the data with an alert level when the metrics fall outside of the normal range. After you enrich the data, you can visualize it on monitoring dashboards.

In our streaming ETL architecture, a Python script generates sample ventilator metrics and publishes them as a stream into Kinesis Data Streams. We create a streaming ETL job in AWS Glue that consumes continuously generated ventilator metrics in micro-batches, applies transformations, performs aggregations, and delivers the data to a sink, so the results can be visualized or used in downstream processes.

Because businesses often augment their data lakes built on Amazon Simple Storage Service (Amazon S3) with streaming data, our first use case applies transformations on the streaming JSON data ingested via Kinesis Data Streams and loads the results in Parquet format to an Amazon S3 data lake. After the data is ingested into Amazon S3, you can query it with Amazon Athena and build visual dashboards using Amazon QuickSight.

For the second use case, we ingest the data from Kinesis Data Streams, join it with reference data in Amazon DynamoDB to calculate alert levels, and write the results to an Amazon DynamoDB sink. This approach allows you to build near real-time dashboards with alert notifications.

The following diagram illustrates this architecture.

AWS Glue streaming ETL jobs

With AWS Glue, you can now create ETL pipelines on streaming data using continuously running jobs. You can ingest streaming data from Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK). AWS Glue streaming jobs can perform aggregations on data in micro-batches and deliver the processed data to Amazon S3. You can read from the data stream and write to Amazon S3 using the AWS Glue DynamicFrame API. You can also write to arbitrary sinks using native Apache Spark Structured Streaming APIs.

The following sections walk you through building a streaming ETL job in AWS Glue.

Creating a Kinesis data stream

First, we need a streaming ingestion source to consume continuously generated streaming data. For this post, we create a Kinesis data stream with five shards, which allows us to push 5,000 records per second into the stream.

  1. On the Amazon Kinesis dashboard, choose Data streams.
  2. Choose Create data stream.
  3. For Data stream name, enter glue_ventilator_stream.
  4. For Number of open shards, choose 5.

If you prefer to use the AWS Command Line Interface (AWS CLI), you can create the stream with the following code:

aws kinesis create-stream \
    --stream-name glue_ventilator_stream \
    --shard-count 5

Generating streaming data

We can synthetically generate ventilator data in JSON format using a simple Python application (see the GitHub repo) or the Kinesis Data Generator (KDG).

Using a Python-based data generator

To generate streaming data using a Python script, you can run the following command from your laptop or Amazon Elastic Compute Cloud (Amazon EC2) instance. Make sure you have installed the faker library on your system and set up the boto3 credentials correctly before you run the script.

python3 generate_data.py --streamname glue_ventilator_stream
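
If you want a sense of what such a generator does without cloning the repo, the following is a minimal sketch, not the actual contents of generate_data.py: the field names match the schema we define later, but the value ranges, sleep interval, and Region are illustrative assumptions.

import datetime
import json
import random
import time

import boto3
from faker import Faker

# Hypothetical stand-in for generate_data.py; adjust the stream name and Region to your setup.
STREAM_NAME = "glue_ventilator_stream"

kinesis = boto3.client("kinesis", region_name="us-east-1")
fake = Faker()

while True:
    # One synthetic ventilator reading per iteration
    record = {
        "ventilatorid": random.randint(1, 100),
        "eventtime": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
        "serialnumber": fake.uuid4(),
        "pressurecontrol": random.randint(10, 40),
        "o2stats": random.randint(85, 100),
        "minutevolume": random.randint(2, 10),
        "manufacturer": fake.company(),
    }
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(record).encode("utf-8"),     # JSON payload, one record per call
        PartitionKey=str(record["ventilatorid"]),    # spreads records across the five shards
    )
    time.sleep(0.1)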

Using the Kinesis Data Generator

Alternatively, you can use the Kinesis Data Generator with the ventilator template available in the GitHub repo. The following screenshot shows the template on the KDG console.

We start pushing the data after we create our AWS Glue streaming job.

Defining the schema

We need to specify a schema for our streaming data, which we can do in one of two ways:

  • Retrieve a small batch of data from the streaming source (before starting your streaming job), infer the schema in batch mode, and use the extracted schema for your streaming job (see the sketch after this list)
  • Use the AWS Glue Data Catalog to manually create a table
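
To illustrate the first option, the following is a minimal sketch that pulls a handful of records from one shard with boto3 and lets Spark infer a schema from them. The shard ID, sample size, and Region are illustrative assumptions, and the stream must already contain records (run the data generator first).

import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
kinesis = boto3.client("kinesis", region_name="us-east-1")

# Read a small sample from one shard of the stream
iterator = kinesis.get_shard_iterator(
    StreamName="glue_ventilator_stream",
    ShardId="shardId-000000000000",
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
records = kinesis.get_records(ShardIterator=iterator, Limit=100)["Records"]

# Let Spark infer a schema from the sampled JSON payloads
payloads = [r["Data"].decode("utf-8") for r in records]
sample_df = spark.read.json(spark.sparkContext.parallelize(payloads))
sample_df.printSchema()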

For this post, we use the AWS Glue Data Catalog to create a ventilator schema.

  1. On the AWS Glue console, choose Data Catalog.
  2. Choose Tables.
  3. From the Add Table drop-down menu, choose Add table manually.
  4. For the table name, enter ventilators_table.
  5. Create a database with the name ventilatordb.
  6. Choose Kinesis as the type of source.
  7. For the stream name, enter glue_ventilator_stream; for the Kinesis source URL, enter https://kinesis.<aws-region>.amazonaws.com.
  8. For the classification, choose JSON.
  9. Define the schema according to the following table.
Column name        Data type
ventilatorid       int
eventtime          string
serialnumber       string
pressurecontrol    int
o2stats            int
minutevolume       int
manufacturer       string


  10. Choose Finish.
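
Optionally, you can confirm that the table was registered the way you expect by inspecting it through the AWS Glue API. The following sketch assumes the database and table names used in the preceding steps.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Fetch the table definition created in the console
table = glue.get_table(DatabaseName="ventilatordb", Name="ventilators_table")["Table"]

# Print each column name and type; the output should match the schema defined above
for column in table["StorageDescriptor"]["Columns"]:
    print(column["Name"], column["Type"])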

Creating an AWS Glue streaming job to hydrate a data lake on Amazon S3

With the streaming source and schema prepared, we’re now ready to create our AWS Glue streaming jobs. We first create a job to ingest data from the streaming source using AWS Glue DataFrame APIs.

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a job name (a UTF-8 string of no more than 255 characters).
  4. For IAM role, specify a role that is used for authorization to resources used to run the job and access data stores. Because streaming jobs require connecting to sources and sinks, make sure that the AWS Identity and Access Management (IAM) role has permissions to read from Kinesis Data Streams, write to Amazon S3, and read from and write to Amazon DynamoDB. For details, see Managing Access Permissions for AWS Glue Resources.
  5. For Type, choose Spark Streaming.
  6. For Glue Version, choose Spark 2.4, Python 3.
  7. For This job runs, select A new script authored by you.

You can have AWS Glue generate the streaming ETL code for you, but for this post, we author one from scratch.

  8. For Script file name, enter GlueStreaming-S3.
  9. For S3 path where script is stored, enter your S3 path.
  10. For Job bookmark, choose Disable.

For this post, we use the checkpointing mechanism of AWS Glue to keep track of the data read instead of a job bookmark.

  11. For Monitoring options, select Job metrics and Continuous logging.
  12. For Log filtering, select Standard filter and Spark UI.
  13. For Amazon S3 prefix for Spark event logs, enter the S3 path for the event logs.
  14. For Job parameters, enter the following key-value pairs:
    1. --output_path – The S3 path where the final aggregations are persisted
    2. --aws_region – The Region where you run the job

  15. Skip the connections part and choose Save and edit the script.

Streaming ETL to an Amazon S3 sink

We use the AWS Glue DynamicFrameReader class’s from_catalog method to read the streaming data. We specify the table name that has been associated with the data stream as the source of data (see the section Defining the schema). We add additional_options to indicate the starting position to read from in Kinesis Data Streams. TRIM_HORIZON allows us to start reading from the oldest record in the shard.

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

In the preceding code, sourceData represents a streaming DataFrame. We use the foreachBatch API to invoke a function (processBatch) that processes the data represented by this streaming DataFrame. The processBatch function receives a static DataFrame, which holds streaming data for a window size of 100s (default). It creates a DynamicFrame from the static DataFrame and writes out partitioned data to Amazon S3. See the following code:

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})

To transform the DynamicFrame to fix the data type for eventtime (from string to timestamp) and write the ventilator metrics to Amazon S3 in Parquet format, enter the following code:

def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

Putting it all together

In the Glue ETL code editor, enter the following code, then save and run the job:

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})
job.commit()

Querying with Athena

When the processed streaming data is written in Parquet format to Amazon S3, we can run queries on Athena. Run the AWS Glue crawler on the Amazon S3 location where the streaming data is written out. The following screenshot shows our query results.
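 
As an example of the kind of query you can run after the crawler catalogs the Parquet output, the following sketch submits an hourly aggregation through the Athena API. The database, table, partition values, and output location are assumptions based on this walkthrough, so substitute the names the crawler actually creates.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Average metrics per ventilator for a single ingest hour; the partition columns prune the scan
query = """
    SELECT ventilatorid,
           avg(o2stats)         AS avg_o2stats,
           avg(pressurecontrol) AS avg_pressurecontrol,
           avg(minutevolume)    AS avg_minutevolume
    FROM ventilatordb.ventilator_metrics
    WHERE ingest_year = '2020' AND ingest_month = '05'
      AND ingest_day = '21' AND ingest_hour = '14'
    GROUP BY ventilatorid
"""

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "ventilatordb"},
    ResultConfiguration={"OutputLocation": "s3://<your-bucket>/athena-results/"},
)
print(response["QueryExecutionId"])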

For instructions on building visual dashboards with the streaming data in Amazon S3, see Quick Start: Create an Analysis with a Single Visual Using Sample Data. The following dashboards show the distribution of metrics, averages, and alerts based on anomalies on an hourly basis, but you can create more advanced dashboards with more granular (minute-level) intervals.

Streaming ETL to a DynamoDB sink

For the second use case, we transform the streaming data as it arrives, using Apache Spark’s Structured Streaming API directly rather than the AWS Glue foreachBatch wrapper, and persist the results to a DynamoDB table. Scripts to create the DynamoDB tables are available in the GitHub repo. We read the ventilator-generated data from the data stream, join it with reference data in a DynamoDB table that defines the normal range for each metric, compute a status based on the deviation from those normal values, and write the processed data to a DynamoDB table. See the following code:

import sys
import datetime
import base64
import decimal
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'checkpoint_location', \
                            'dynamodb_sink_table', \
                            'dynamodb_static_table'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read parameters
checkpoint_location = args['checkpoint_location']
aws_region = args['aws_region']

# DynamoDB config
dynamodb_sink_table = args['dynamodb_sink_table']
dynamodb_static_table = args['dynamodb_static_table']

def write_to_dynamodb(row):
    '''
    Add row to DynamoDB.
    '''
    dynamodb = boto3.resource('dynamodb', region_name=aws_region)
    start = str(row['window'].start)
    end = str(row['window'].end)
    dynamodb.Table(dynamodb_sink_table).put_item(
      Item = { 'ventilatorid': row['ventilatorid'], \
                'status': str(row['status']), \
                'start': start, \
                'end': end, \
                'avg_o2stats': decimal.Decimal(str(row['avg_o2stats'])), \
                'avg_pressurecontrol': decimal.Decimal(str(row['avg_pressurecontrol'])), \
                'avg_minutevolume': decimal.Decimal(str(row['avg_minutevolume']))})

dynamodb_dynamic_frame = glueContext.create_dynamic_frame.from_options( \
    "dynamodb", \
    connection_options={
    "dynamodb.input.tableName": dynamodb_static_table,
    "dynamodb.throughput.read.percent": "1.5"
  }
)

dynamodb_lookup_df = dynamodb_dynamic_frame.toDF().cache()

# Read from Kinesis Data Stream
streaming_data = spark.readStream \
                    .format("kinesis") \
                    .option("streamName","glue_ventilator_stream") \
                    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") \
                    .option("startingPosition", "TRIM_HORIZON") \
                    .load()

# Retrieve Sensor columns and do a simple projection
ventilator_fields = streaming_data \
    .select(from_json(col("data") \
    .cast("string"),glueContext.get_catalog_schema_as_spark_schema("ventilatordb","ventilators_table")) \
    .alias("ventilatordata")) \
    .select("ventilatordata.*") \
    .withColumn("event_time", to_timestamp(col('eventtime'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("ts", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

# Stream static join, ETL to augment with status
ventilator_joined_df = ventilator_fields.join(dynamodb_lookup_df, "ventilatorid") \
    .withColumn('status', when( \
    ((ventilator_fields.o2stats < dynamodb_lookup_df.o2_stats_min) | \
    (ventilator_fields.o2stats > dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol < dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol > dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume < dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume > dynamodb_lookup_df.minute_volume_max)), "RED") \
    .when( \
    ((ventilator_fields.o2stats >= dynamodb_lookup_df.o2_stats_min) & \
    (ventilator_fields.o2stats <= dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol >= dynamodb_lookup_df.pressure_control_min) & \
    (ventilator_fields.pressurecontrol <= dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume >= dynamodb_lookup_df.minute_volume_min) & \
    (ventilator_fields.minutevolume <= dynamodb_lookup_df.minute_volume_max)), "GREEN") \
    .otherwise("ORANGE"))

ventilator_joined_df.printSchema()

# Drop the normal metric values
ventilator_transformed_df = ventilator_joined_df \
                            .drop('eventtime', 'o2_stats_min', 'o2_stats_max', \
                            'pressure_control_min', 'pressure_control_max', \
                            'minute_volume_min', 'minute_volume_max')

ventilator_transformed_df.printSchema()

ventilators_df = ventilator_transformed_df \
    .groupBy(window(col('ts'), '10 minute', '5 minute'), \
    ventilator_transformed_df.status, ventilator_transformed_df.ventilatorid) \
    .agg( \
    avg(col('o2stats')).alias('avg_o2stats'), \
    avg(col('pressurecontrol')).alias('avg_pressurecontrol'), \
    avg(col('minutevolume')).alias('avg_minutevolume') \
    )

ventilators_df.printSchema()

# Write to DynamoDB sink
ventilator_query = ventilators_df \
    .writeStream \
    .foreach(write_to_dynamodb) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_location) \
    .start()

ventilator_query.awaitTermination()

job.commit()

After the preceding code runs, the ventilator metric aggregations are persisted to the Amazon DynamoDB table. You can build custom user interface applications with the data in Amazon DynamoDB to create dashboards.
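
As an example, a dashboard backend could read the latest aggregation for a given ventilator straight from the sink table. The following sketch assumes a hypothetical sink table name and a partition key of ventilatorid with a sort key on the window start; check the table-creation scripts in the GitHub repo for the actual key schema.

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
table = dynamodb.Table("ventilator_metrics_dashboard")  # hypothetical sink table name

# Fetch the most recent aggregation window for one ventilator
response = table.query(
    KeyConditionExpression=Key("ventilatorid").eq(25),
    ScanIndexForward=False,  # newest window first, assuming the sort key is the window start
    Limit=1,
)
for item in response["Items"]:
    print(item["status"], item["avg_o2stats"], item["avg_pressurecontrol"], item["avg_minutevolume"])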

Conclusion

Streaming applications have become a core component of data lake architectures. With AWS Glue streaming, you can create serverless ETL jobs that run continuously, consuming data from streaming services like Kinesis Data Streams and Amazon MSK. You can load the results of streaming processing into an Amazon S3-based data lake, JDBC data stores, or arbitrary sinks using the Structured Streaming API.

For more information about AWS Glue streaming ETL jobs, see the AWS Glue documentation.

We encourage you to build a serverless streaming application using AWS Glue streaming ETL and share your experience with us. If you have any questions or suggestions, share them in the comments.


About the Author


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

Powering Amazon Redshift Analytics with Apache Spark and Amazon Machine Learning

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/powering-amazon-redshift-analytics-with-apache-spark-and-amazon-machine-learning/

Air travel can be stressful due to the many factors that are simply out of airline passengers’ control. As passengers, we want to minimize this stress as much as we can. We can do this by using past data to make predictions about how likely a flight will be delayed based on the time of day or the airline carrier.

In this post, we generate a predictive model for flight delays that can be used to help us pick the flight least likely to add to our travel stress. To accomplish this, we will use Apache Spark running on Amazon EMR for extracting, transforming, and loading (ETL) the data, Amazon Redshift for analysis, and Amazon Machine Learning for creating predictive models. This solution gives a good example of combining multiple AWS services to build a sophisticated analytical application in the AWS Cloud.

Architecture

At a high level, our solution includes the following steps:

Step 1 is to ingest datasets:

Step 2 is to enrich data by using ETL:

  • We will transform the maximum and minimum temperature columns from Celsius to Fahrenheit in the weather table in Hive by using a user-defined function in Spark.
  • We enrich the flight data in Amazon Redshift to compute and include extra features and columns (departure hour, days to the nearest holiday) that will help the Amazon Machine Learning algorithm’s learning process.
  • We then combine both datasets in the Spark environment by using the spark-redshift package to load data from the Amazon Redshift cluster into Spark running on an Amazon EMR cluster. We write the enriched data back to an Amazon Redshift table using the same package.

Step 3 is to perform predictive analytics:

  • In this last step, we use Amazon Machine Learning to create and train a ML model using Amazon Redshift as our data source. The trained Amazon ML model is used to generate predictions for the test dataset, which are output to an S3 bucket.


Building the solution

Working with datasets

In this example, we use historical weather data for Chicago’s O’Hare International Airport station from 2013 to 2014.

Let’s begin by launching an EMR cluster with Apache Hive and Apache Spark. We use the following Hive script to create the raw weather table, for which we load the weather data from an S3 bucket in the next step.

CREATE EXTERNAL TABLE IF NOT EXISTS weather_raw (
  station       string,
  station_name  string,
  elevation     string,
  latitude      string,
  longitude     string,
  wdate         string,
  prcp          decimal(5,1),
  snow          int,
  tmax          string,
  tmin          string,
  awnd          string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://<your-bucket>/weather/'
TBLPROPERTIES("skip.header.line.count"="1");

CREATE TABLE weather AS SELECT station, station_name, elevation, latitude, longitude, 
                   cast(concat(
                                substr(wdate,1,4), '-', 
                                substr(wdate,5,2), '-', 
                                substr(wdate,7,2) 
                              ) AS date) AS dt, prcp, snow, tmax, tmin, awnd FROM weather_raw;

set hive.cli.print.header=true;

select * from weather limit 10;

Next, we create a single-node Amazon Redshift cluster and copy the flight data from the S3 bucket. Make sure to associate the Amazon Redshift cluster with an AWS Identity and Access Management (IAM) role, which will allow Amazon Redshift to assume a role when reading data from S3.

aws redshift create-cluster \
  --cluster-identifier demo \
  --db-name demo \
  --node-type dc1.large \
  --cluster-type single-node \
  --iam-roles "arn:aws:iam::YOUR-AWS-ACCOUNT:role/<redshift-iam-role>" \
  --master-username master \
  --master-user-password REDSHIFT-MASTER-PASSWORD \
  --publicly-accessible \
  --port 5439

In this example, we will load flight data for flights originating at Chicago’s O’Hare International Airport in December 2013 and December 2014. You can use most SQL client tools with Amazon Redshift JDBC or ODBC drivers to connect to an Amazon Redshift cluster. For instructions on installing SQL Workbench/J for this use, see the Amazon Redshift documentation.

First, we create a flights table to hold the data for all flights originating at O’Hare in December 2013 and December 2014.

create table flights
(
ID  			bigint identity(0,1),
YEAR 			smallint,
QUARTER		smallint,
MONTH 			smallint,
DAY_OF_MONTH 		smallint,
DAY_OF_WEEK		smallint,
FL_DATE		date, 
UNIQUE_CARRIER 	varchar(10),
AIRLINE_ID 		int,
CARRIER		varchar(4),
TAIL_NUM 		varchar(8),
FL_NUM 		varchar(4),
ORIGIN_AIRPORT_ID 	smallint,
ORIGIN 		varchar(5),
ORIGIN_CITY_NAME  	varchar(35),
ORIGIN_STATE_ABR 	varchar(2),
ORIGIN_STATE_NM 	varchar(50),
ORIGIN_WAC 		varchar(2),
DEST_AIRPORT_ID	smallint,
DEST 			varchar(5),
DEST_CITY_NAME 	varchar(35),
DEST_STATE_ABR 	varchar(2),
DEST_STATE_NM 	varchar(50),
DEST_WAC 		varchar(2),
CRS_DEP_TIME 		smallint,
DEP_TIME 		varchar(6),
DEP_DELAY 		numeric(22,6),
DEP_DELAY_NEW 	numeric(22,6),
DEP_DEL15 		numeric(22,6),
DEP_DELAY_GROUP	smallint,
DEP_TIME_BLK 		varchar(15),
TAXI_OUT 		numeric(22,6),
TAXI_IN 		numeric(22,6),
CRS_ARR_TIME 		numeric(22,6),
ARR_TIME 		varchar(6),
ARR_DELAY 		numeric(22,6),
ARR_DELAY_NEW 	numeric(22,6),
ARR_DEL15 		numeric(22,6),
ARR_DELAY_GROUP 	smallint,
ARR_TIME_BLK 		varchar(15),
CANCELLED 		numeric(22,6),
DIVERTED 		numeric(22,6),
CRS_ELAPSED_TIME 	numeric(22,6),
ACTUAL_ELAPSED_TIME numeric(22,6),
AIR_TIME 		numeric(22,6), 
FLIGHTS		numeric(22,6),
DISTANCE 		numeric(22,6),
DISTANCE_GROUP 	numeric(22,6),
CARRIER_DELAY 	numeric(22,6),
WEATHER_DELAY 	numeric(22,6),
NAS_DELAY 		numeric(22,6),
SECURITY_DELAY 	numeric(22,6),
LATE_AIRCRAFT_DELAY numeric(22,6),
primary key (id)
);

Next, we ensure that the flight data residing in the S3 bucket is in the same region as the Amazon Redshift cluster. We then run the following copy command.

-- Copy all flight data for Dec 2013 and 2014 from S3 bucket
copy flights 
FROM 's3://<path-to-your-faa-data-bucket>/T_ONTIME.csv' credentials 'aws_iam_role=arn:aws:iam::YOUR-AWS-ACCOUNT:role/<RedshiftIAMRole>' csv IGNOREHEADER 1;

Data enrichment

Businesses have myriad data sources to integrate, normalize, and make consumable by downstream analytic processes. To add to the complications, ingested data is rarely useful in its original form for analysis. Workflows often involve using either open source tools such as Spark or commercially available tools to enrich the ingested data. In this example, we will perform simple transformations on the weather data to convert the air temperature (tmax and tmin) from Celsius to Fahrenheit using a user-defined function (UDF) in the Spark environment.

Spark

// Read weather table from hive
val rawWeatherDF = sqlContext.table("weather")

// Retrieve the header
val header = rawWeatherDF.first()

// Remove the header from the dataframe
val noHeaderWeatherDF = rawWeatherDF.filter(row => row != header)

// UDF to convert the air temperature from celsius to fahrenheit
val toFahrenheit = udf {(c: Double) => c * 9 / 5 + 32}

// Apply the UDF to maximum and minimum air temperature
val weatherDF = noHeaderWeatherDF.withColumn("new_tmin", toFahrenheit(noHeaderWeatherDF("tmin")))
                                     .withColumn("new_tmax", toFahrenheit(noHeaderWeatherDF("tmax")))
                                     .drop("tmax")
                                     .drop("tmin")
                                     .withColumnRenamed("new_tmax","tmax")
                                     .withColumnRenamed("new_tmin","tmin")

Data manipulation also consists of excluding or including data fields and columns that are meaningful to the analysis being performed. Because our analysis tries to predict whether a flight will be delayed or not, we will include only columns that help make that determination.

To that end, we keep only a few columns from the flight table and add ‘departure hour’ and ‘days from the nearest holiday’ columns, both of which might contribute to flight delays. You can compute the ‘days from the nearest holiday’ column by using Amazon Redshift’s user-defined functions (UDFs), which allow you to write a scalar function in Python 2.7. We also use the Pandas library’s USFederalHolidayCalendar to create a list of federal holidays for the years used in this example (see the code snippet following).

Amazon Redshift UDF

First, create a Python UDF to compute the number of days before or after the nearest holiday.

create or replace function f_days_from_holiday (year int, month int, day int)
returns int
stable
as $$
  import datetime
  from datetime import date
  import dateutil
  from dateutil.relativedelta import relativedelta

  fdate = date(year, month, day)

  fmt = '%Y-%m-%d'
  s_date = fdate - dateutil.relativedelta.relativedelta(days=7)
  e_date = fdate + relativedelta(months=1)
  start_date = s_date.strftime(fmt)
  end_date = e_date.strftime(fmt)

  """
  Compute a list of holidays over a period (7 days before, 1 month after) for the flight date
  """
  from pandas.tseries.holiday import USFederalHolidayCalendar
  calendar = USFederalHolidayCalendar()
  holidays = calendar.holidays(start_date, end_date)
  days_from_closest_holiday = [(abs(fdate - hdate)).days for hdate in holidays.date.tolist()]
  return min(days_from_closest_holiday)
$$ language plpythonu;

We are now ready to combine the weather data in the Hive table with the flight data in our Amazon Redshift table using the spark-redshift package, to demonstrate the data enrichment process.

Working with the spark-redshift package

Databricks’ spark-redshift package is a library that loads data into Spark SQL DataFrames from Amazon Redshift and also saves DataFrames back into Amazon Redshift tables. The library uses the Spark SQL Data Sources API to integrate with Amazon Redshift. This approach makes it easy to integrate large datasets from an Amazon Redshift database with datasets from other data sources, and to interoperate seamlessly with disparate input sources such as Hive tables (as in this use case), columnar Parquet files on HDFS or S3, and many others.

The following is an illustration of spark-redshift’s load and save functionality:


Prerequisites

To work with the spark-redshift package, you need to download the following .jar files onto your EMR cluster running Spark. Alternatively, you can clone the Git repository and build the .jar files from source. For this example, we ran EMR release 5.0 with Spark 2.0. Ensure that you download the right versions of the .jar files based on the version of Spark you use.

spark-redshift jar

wget http://repo1.maven.org/maven2/com/databricks/spark-redshift_2.10/2.0.0/spark-redshift_2.10-2.0.0.jar

spark-avro jar

wget http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.0.0/spark-avro_2.11-3.0.0.jar

minimal-json.jar

wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

In addition to the .jar files described preceding, you also need the Amazon Redshift JDBC driver to connect to the Amazon Redshift cluster. Fortunately, the driver is already included in EMR release 4.7.0 and later.

Connecting to Amazon Redshift cluster from your Spark environment

Launch spark-shell with the prerequisites downloaded from the previous section.

spark-shell --jars spark-redshift_2.10-2.0.0.jar,/usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar,minimal-json-0.9.4.jar,spark-avro_2.11-3.0.0.jar

Once in spark-shell, we run through the following steps to connect to our Amazon Redshift cluster.

/**
  * Example to demonstrate combining tables in Hive and Amazon Redshift for data enrichment.
  */

import org.apache.spark.sql._
import com.amazonaws.auth._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.redshift.AmazonRedshiftClient
import _root_.com.amazon.redshift.jdbc41.Driver
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SQLContext



// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;

// Read weather table from hive
val rawWeatherDF = sqlContext.table("weather")

// Retrieve the header
val header = rawWeatherDF.first()

// Remove the header from the dataframe
val noHeaderWeatherDF = rawWeatherDF.filter(row => row != header)

// UDF to convert the air temperature from celsius to fahrenheit
val toFahrenheit = udf {(c: Double) => c * 9 / 5 + 32}

// Apply the UDF to maximum and minimum air temperature
val weatherDF = noHeaderWeatherDF.withColumn("new_tmin", toFahrenheit(noHeaderWeatherDF("tmin")))
                                 .withColumn("new_tmax", toFahrenheit(noHeaderWeatherDF("tmax")))
                                 .drop("tmax")
                                 .drop("tmin")
                                 .withColumnRenamed("new_tmax","tmax")
                                 .withColumnRenamed("new_tmin","tmin")

// Provide the jdbc url for Amazon Redshift
val jdbcURL = "jdbc:redshift://<redshift-cluster-name>:<port/<database-name>?user=<dbuser>&password=<dbpassword>"

// Create and declare an S3 bucket where the temporary files are written
val s3TempDir = "s3://<S3TempBucket>/"

Reading from Amazon Redshift

To read from Amazon Redshift, spark-redshift executes an Amazon Redshift UNLOAD command that copies an Amazon Redshift table or the results of a query to a temporary S3 bucket that you provide. Then spark-redshift reads the temporary S3 input files and generates a DataFrame instance that you can manipulate in your application.

In this case, we issue a SQL query against the flights table in the Amazon Redshift cluster from Spark to read only the columns we are interested in. Notice that during this process, we compute the days_to_holiday column using the Amazon Redshift UDF that we discussed in the earlier section.

// Query against the flights table in Amazon Redshift
val flightsQuery = """
                    select ID, YEAR, DAY_OF_MONTH, DAY_OF_WEEK, FL_DATE, f_days_from_holiday(year, month, day_of_month) as DAYS_TO_HOLIDAY, UNIQUE_CARRIER, FL_NUM, substring(DEP_TIME, 1, 2) as DEP_HOUR, cast(DEP_DEL15 as smallint),
                    cast(AIR_TIME as integer), cast(FLIGHTS as smallint), cast(DISTANCE as smallint)
                    from flights where origin='ORD' and cancelled = 0
                   """

// Create a Dataframe to hold the results of the above query
val flightsDF = sqlContext.read.format("com.databricks.spark.redshift")
                                          .option("url", jdbcURL)
                                          .option("tempdir", s3TempDir)
                                          .option("query", flightsQuery)
                                          .option("temporary_aws_access_key_id", awsAccessKey)
                                          .option("temporary_aws_secret_access_key", awsSecretKey)
                                          .option("temporary_aws_session_token", token).load()

It’s join time!

// Join the two dataframes
val joinedDF = flightsDF.join(weatherDF, flightsDF("fl_date") === weatherDF("dt"))

Now that we have unified the data, we can write it back to another table in our Amazon Redshift cluster. After that is done, the table can serve as a datasource for advanced analytics, such as for developing predictive models using the Amazon Machine Learning service.

Writing to Amazon Redshift

To write to Amazon Redshift, the spark-redshift library first creates a table in Amazon Redshift using JDBC. Then it copies the partitioned DataFrame as AVRO partitions to a temporary S3 folder that you specify. Finally, it executes the Amazon Redshift COPY command to copy the S3 contents to the newly created Amazon Redshift table. You can also use the append option with spark-redshift to append data to an existing Amazon Redshift table. In this example, we will write the data to a table named ‘ord_flights’ in Amazon Redshift.

joinedDF.write
  .format("com.databricks.spark.redshift")
  .option("temporary_aws_access_key_id", awsAccessKey)
  .option("temporary_aws_secret_access_key", awsSecretKey)
  .option("temporary_aws_session_token", token)
  .option("url", jdbcURL)
  .option("dbtable", "ord_flights")
  .option("aws_iam_role", "arn:aws:iam::<AWS-account>:role/<redshift-role>")
  .option("tempdir", s3TempDir)
  .mode("error")
  .save()

Let us derive a couple of tables in Amazon Redshift from ord_flights to serve as the training and test input datasets for Amazon ML.

-- Derived table for training data
create table train_ord_flights 
as 
        (select * from ord_flights where year = 2013);

select count(*) from train_ord_flights;

-- Derived table for test data
create table test_ord_flights 
as 
        (select * from ord_flights where year = 2014);

select count(*) from test_ord_flights;

Connecting Amazon Redshift to Amazon ML

With processed data on hand, we can perform advanced analytics, such as forecasting future events or conducting what-if analysis to understand their effect on business outcomes.

We started with a goal of predicting flight delays for future flights. However, because future flight data is not available, we will use the unified flight and weather data for December 2013 to create and train a predictive model using Amazon ML. We’ll then use December 2014 data as our test dataset, on which we apply the predictive model to get the batch predictions. We can then compare the generated batch predictions against the actual delays (because we already know the 2014 delays) and determine the accuracy of the model that we have developed.

The predictive model creation here will use the AWS Management Console, but you can also use the AWS CLI or one of the AWS SDKs to accomplish the same thing.
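
As a rough sketch of the SDK route, the following boto3 calls create the training datasource from an Amazon Redshift query and then a binary classification model from it. The IDs, role ARN, credentials, and staging location are placeholders, and depending on your setup you may also need to pass an explicit data schema that marks dep_del15 as the target attribute.

import boto3

ml = boto3.client("machinelearning", region_name="us-east-1")

# Datasource backed by a Redshift query over the December 2013 training table
ml.create_data_source_from_redshift(
    DataSourceId="ds-ord-flights-train",
    DataSourceName="ORD flight delays (Dec 2013)",
    DataSpec={
        "DatabaseInformation": {"DatabaseName": "demo", "ClusterIdentifier": "demo"},
        "SelectSqlQuery": "select * from train_ord_flights",
        "DatabaseCredentials": {"Username": "master", "Password": "<REDSHIFT-MASTER-PASSWORD>"},
        "S3StagingLocation": "s3://<your-staging-bucket>/amazon-ml/",
    },
    RoleARN="arn:aws:iam::<YOUR-AWS-ACCOUNT>:role/<amazon-ml-redshift-role>",
    ComputeStatistics=True,
)

# Binary classification model trained on that datasource
ml.create_ml_model(
    MLModelId="ml-ord-flight-delays",
    MLModelName="ORD flight delay predictor",
    MLModelType="BINARY",
    TrainingDataSourceId="ds-ord-flights-train",
)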

Create an Amazon ML Datasource

The first step in developing an Amazon ML model is creation of a datasource object. We will create a datasource object in Amazon ML by issuing a query against the Amazon Redshift cluster to use December 2013 data as our training dataset. Amazon ML automatically infers the schema based on the values of the columns in the query. You can optionally specify a schema if you want.

In our case, we select departure delay (dep_del15) as the target attribute to predict. When the datasource object is created, Amazon ML shows the target distribution, missing values, and statistics for each of the attributes of the datasource object you just created.


Create an Amazon Machine Learning Model

After creating the datasource object, we are ready to create and train an ML model. Select the datasource you just created in the previous section and follow the instructions to create the model.

When the model is successfully created, choose Latest evaluation result to learn how the model scored. Amazon ML provides an industry standard accuracy metric for binary classification models called “Area Under the (Receiver Operating Characteristic) Curve (AUC).”


For insight into prediction accuracy, see the performance visualization, which shows the effect of choosing the cut-off score (threshold). The prediction score for each record is a numeric value between 0 and 1. The closer the score is to 1, the more likely the prediction should be set to Yes; the further away, the more likely it is No. A prediction falls into one of four cases: a) true positive (TP), correctly classified as Yes; b) true negative (TN), correctly classified as No; c) false positive (FP), wrongly classified as Yes; d) false negative (FN), wrongly classified as No.

In this example, a false positive means a flight that is not delayed is reported as delayed, which may cause passengers to miss their flight. A false negative means a flight that is delayed is classified as not delayed, which is merely an annoyance. We want to minimize the false positives in this model, which we can do by adjusting the threshold to a higher value.


Generate batch predictions

To generate batch predictions, take the following two steps.

Create a Datasource object for the test dataset

Repeat the datasource object creation in Amazon ML as we did in the earlier section to create a test datasource to which the model will be applied. Make sure to use the December 2014 data (the test_ord_flights table) for the Amazon Redshift query.

Create batch predictions

From the Amazon Machine Learning console, choose Batch Predictions and follow the instructions to create batch predictions by applying the model we created in the last section to the test dataset. The predictions can be found in the “Output S3 URL” path on the summary page. The predictions can be copied back to an Amazon Redshift table and correlated with other flight data to produce visualizations. Such visualizations provide insights such as days and times when flights are most delayed, carriers that have had delays during holidays, and so on.
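
If you prefer to script this step as well, the equivalent boto3 call looks roughly like the following; the IDs assume the model and the December 2014 test datasource created earlier, and the output location is a placeholder.

import boto3

ml = boto3.client("machinelearning", region_name="us-east-1")

# Apply the trained model to the December 2014 test datasource and write the predictions to S3
ml.create_batch_prediction(
    BatchPredictionId="bp-ord-flight-delays-2014",
    BatchPredictionName="ORD flight delay predictions (Dec 2014)",
    MLModelId="ml-ord-flight-delays",
    BatchPredictionDataSourceId="ds-ord-flights-test",
    OutputUri="s3://<your-output-bucket>/predictions/",
)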


Congratulations! You have successfully generated a flight delay forecast for December 2014!

Conclusion

In this post, we built an end-to-end solution for predicting flight delays by combining multiple AWS services: Apache Spark on Amazon EMR, Amazon Redshift, and Amazon ML. We started out with raw data, combined data in Spark with data in Amazon Redshift using the spark-redshift package, processed the data using constructs such as UDFs, and developed a forecasting model for flight delays using Amazon ML. The ability to seamlessly integrate these services empowers users to build sophisticated analytical applications on the AWS Cloud. We encourage you to build your own and share your experience with us!


About the Authors

Radhika Ravirala is a solutions architect with Amazon Web Services.


Wangechi Doble is a Solutions Architect with a focus on Big Data & Analytics at AWS. She helps customers architect and build out high performance, scalable and secure cloud-based solutions on AWS. In her spare time, she enjoys spending time with her family, reading and traveling to exotic places.

