All posts by Saunak Chandra

Using the Amazon Redshift Data API to interact from an Amazon SageMaker Jupyter notebook

Post Syndicated from Saunak Chandra original https://aws.amazon.com/blogs/big-data/using-the-amazon-redshift-data-api-to-interact-from-an-amazon-sagemaker-jupyter-notebook/

The Amazon Redshift Data API makes it easy for any application written in Python, Go, Java, Node.js, PHP, Ruby, or C++ to interact with Amazon Redshift. Traditionally, these applications use JDBC connectors to connect, send a query to run, and retrieve results from the Amazon Redshift cluster. This requires extra steps like managing the cluster credentials and configuring the VPC subnet and security group.

In some use cases, you don’t want to manage connections or pass credentials on the wire. The Data API simplifies these steps so you can focus on data consumption instead of managing resources such as the cluster credentials, VPCs, and security groups.

This post demonstrates how you can connect an Amazon SageMaker Jupyter notebook to the Amazon Redshift cluster and run Data API commands in Python. The in-place analysis is an effective way to pull data directly into a Jupyter notebook object. We provide sample code to demonstrate in-place analysis by fetching Data API results into a Pandas DataFrame for quick analysis. For more information about the Data API, see Using the Amazon Redshift Data API to interact with Amazon Redshift clusters.

After exploring the mechanics of the Data API in a Jupyter notebook, we demonstrate how to implement a machine learning (ML) model in Amazon SageMaker, using data stored in the Amazon Redshift cluster. We use sample data to build, train, and test an ML algorithm in Amazon SageMaker. Finally, we deploy the model in an Amazon SageMaker instance and draw inference.

Using the Data API in a Jupyter notebook

Jupyter Notebook is a popular data exploration tool primarily used for ML. To work with ML-based analysis, data scientists pull data from sources like websites, Amazon Simple Storage Service (Amazon S3), and databases using Jupyter notebooks. Many Jupyter Notebook users prefer to work with data from Amazon Redshift, their organization's primary data warehouse and source of truth, together with event data stored in an Amazon S3 data lake.

When you use Amazon Redshift as a data source in Jupyter Notebook, the aggregated data is visualized first for preliminary analysis, followed by extensive ML model building, training, and deployment. Jupyter Notebook connects and runs SQL queries on Amazon Redshift using a Python-based JDBC driver. Data extraction via JDBC drivers poses the following challenges:

  • Dealing with driver installations, credentials and network security management, connection pooling, and caching the result set
  • Additional administrative overhead to bundle the drivers into the Jupyter notebook before sharing the notebook with others

The Data API simplifies these management steps. Jupyter Notebook is pre-loaded with libraries needed to access the Data API, which you import when you use data from Amazon Redshift.

Prerequisites

To provision the resources for this post, launch the provided AWS CloudFormation stack.

The CloudFormation template has been tested in the us-east-2 Region. It launches a two-node DC2.large Amazon Redshift cluster to work with in this post. It also launches an AWS Secrets Manager secret and an Amazon SageMaker Jupyter notebook instance.

The following screenshot shows the Outputs tab for the stack on the AWS CloudFormation console.

The Secrets Manager secret is updated with cluster details required to work with the Data API. An AWS Lambda function runs during the launch of the CloudFormation stack to update the secret; it receives input from the launched Amazon Redshift cluster. The following code updates the secret:

SecretsUpdateFunction:
    Type: AWS::Lambda::Function
    Properties:
      Role: !GetAtt 'LambdaExecutionRole.Arn'
      FunctionName: !Join ['-', ['update_secret', !Ref 'AWS::StackName']]
      MemorySize: 2048
      Runtime: python3.7
      Timeout: 900
      Handler: index.handler
      Code:
        ZipFile:
          Fn::Sub:
          - |-
           import json
           import boto3
           import os
           import logging
           import cfnresponse

           LOGGER = logging.getLogger()
           LOGGER.setLevel(logging.INFO)

           ROLE_ARN = '${Role}'
           SECRET = '${Secret}'
           CLUSTER_ENDPOINT = '${Endpoint}'
           CLUSTER_ID = CLUSTER_ENDPOINT.split('.')[0]
           DBNAME = 'nyctaxi'

           def handler(event, context):

               # Get CloudFormation-specific parameters, if they exist
               cfn_stack_id = event.get('StackId')
               cfn_request_type = event.get('RequestType')

               #update Secrets Manager secret with host and port
               sm = boto3.client('secretsmanager')
               sec = json.loads(sm.get_secret_value(SecretId=SECRET)['SecretString'])
               sec['dbClusterIdentifier'] = CLUSTER_ID
               sec['db'] = DBNAME
               newsec = json.dumps(sec)
               response = sm.update_secret(SecretId=SECRET, SecretString=newsec)


               # Send a response to CloudFormation pre-signed URL
               cfnresponse.send(event, context, cfnresponse.SUCCESS, {
                   'Message': 'Secrets updated'
                   },
                   context.log_stream_name)

               return {
                   'statusCode': 200,
                   'body': json.dumps('Secrets updated')
               }

          - {
            Role : !GetAtt RedshiftSagemakerRole.Arn,
            Endpoint: !GetAtt RedshiftCluster.Endpoint.Address,
            Secret: !Ref RedshiftSecret
            }

Working with the Data API in Jupyter Notebook

In this section, we walk through the details of working with the Data API in a Jupyter notebook.

  1. On the Amazon SageMaker console, under Notebook, choose Notebook instances.
  2. Locate the notebook you created with the CloudFormation template.
  3. Choose Open Jupyter.

This opens up an empty Amazon SageMaker notebook page.

  1. Download the file RedshiftDeepAR-DataAPI.ipynb to your local storage.
  2. Choose Upload.
  3. Upload RedshiftDeepAR-DataAPI.ipynb.

Importing Python packages

We first import the necessary boto3 package. A few other packages are also relevant for the analysis, which we import in the first cell. See the following code:

import botocore.session as s
from botocore.exceptions import ClientError
import boto3.session
import json
import boto3
import sagemaker
import operator
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from botocore.waiter import create_waiter_with_client

import s3fs
import time
import os
import base64
import random
import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Checkbox

Custom waiter

The Data API calls an HTTPS endpoint. Because ExecuteStatement Data API calls are asynchronous, we need a custom waiter. See the following code:

# Create a custom waiter for the Redshift Data API to wait for the current SQL statement to finish execution
waiter_name = 'DataAPIExecution'
delay=2
max_attempts=3

#Configure the waiter settings
waiter_config = {
  'version': 2,
  'waiters': {
    'DataAPIExecution': {
      'operation': 'DescribeStatement',
      'delay': delay,
      'maxAttempts': max_attempts,
      'acceptors': [
        {
          "matcher": "path",
          "expected": "FINISHED",
          "argument": "Status",
          "state": "success"
        },
        {
          "matcher": "pathAny",
          "expected": ["PICKED","STARTED","SUBMITTED"],
          "argument": "Status",
          "state": "retry"
        },
        {
          "matcher": "pathAny",
          "expected": ["FAILED","ABORTED"],
          "argument": "Status",
          "state": "failure"
        }
      ],
    },
  },
}

Retrieving information from Secrets Manager

We need to retrieve the following information from Secrets Manager for the Data API to use:

  • Cluster identifier
  • Secrets ARN
  • Database name

Retrieve the above information using the following code:

secret_name='redshift-dataapidemo' ## replace the secret name with yours
session = boto3.session.Session()
region = session.region_name

client = session.client(
        service_name='secretsmanager',
        region_name=region
    )

try:
    get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    secret_arn=get_secret_value_response['ARN']

except ClientError as e:
    print("Error retrieving secret. Error: " + e.response['Error']['Message'])
    
else:
    # Depending on whether the secret is a string or binary, one of these fields will be populated.
    if 'SecretString' in get_secret_value_response:
        secret = get_secret_value_response['SecretString']
    else:
        secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            
secret_json = json.loads(secret)

cluster_id=secret_json['dbClusterIdentifier']
db=secret_json['db']
print("Cluster_id: " + cluster_id + "\nDB: " + db + "\nSecret ARN: " + secret_arn)

We now create the Data API client. For the rest of the notebook, we use the Data API client client_redshift. See the following code:

bc_session = s.get_session()

session = boto3.Session(
        botocore_session=bc_session,
        region_name=region,
    )

# Setup the client
client_redshift = session.client("redshift-data")
print("Data API client successfully loaded")

Listing the schema and tables

To list the schema, enter the following code:

client_redshift.list_schemas(
    Database= db, 
    SecretArn= secret_arn, 
    ClusterIdentifier= cluster_id)["Schemas"]

The following screenshot shows the output.

To list the tables, enter the following code:

client_redshift.list_tables(
    Database= db, 
    SecretArn= secret_arn, 
    ClusterIdentifier= cluster_id)["Tables"]

The following screenshot shows the output.

Creating the schema and table

Before issuing any SQL statement through the Data API, we instantiate the custom waiter. See the following code:

waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

query_str = "create schema taxischema;"

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)
id=res["Id"]

# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)    
except WaiterError as e:
    print (e)
    
desc=client_redshift.describe_statement(Id=id)
print("Status: " + desc["Status"] + ". Excution time: %d miliseconds" %float(desc["Duration"]/pow(10,6)))

query_str = 'create table taxischema.nyc_greentaxi(\
vendorid varchar(10),\
lpep_pickup_datetime timestamp,\
lpep_dropoff_datetime timestamp,\
store_and_fwd_flag char(1),\
ratecodeid int,\
pulocationid int,\
dolocationid int,\
passenger_count int,\
trip_distance decimal(8,2),\
fare_amount decimal(8,2),\
extra decimal(8,2),\
mta_tax decimal(8,2),\
tip_amount decimal(8,2),\
tolls_amount decimal(8,2),\
ehail_fee varchar(100),\
improvement_surcharge decimal(8,2),\
total_amount decimal(8,2),\
payment_type varchar(10),\
trip_type varchar(10),\
congestion_surcharge decimal(8,2)\
)\
sortkey (vendorid);'

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)
id=res["Id"]

try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)
    
desc=client_redshift.describe_statement(Id=id)
print("Status: " + desc["Status"] + ". Excution time: %d miliseconds" %float(desc["Duration"]/pow(10,6)))

Loading data into the cluster

After we create the table, we’re ready to load some data into it. The following code loads Green taxi cab data from two different Amazon S3 locations using individual COPY statements that run in parallel:

redshift_iam_role = sagemaker.get_execution_role() 
print("IAM Role: " + redshift_iam_role)
source_s3_region='us-east-1'

# Increase the 'delay' attribute of the waiter to 20 seconds for the long-running COPY statements.
waiter_config["waiters"]["DataAPIExecution"]["delay"] = 20
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

query_copystr1 = "COPY taxischema.nyc_greentaxi FROM 's3://nyc-tlc/trip data/green_tripdata_2020' IAM_ROLE '" + redshift_iam_role + "' csv ignoreheader 1 region '" + source_s3_region + "';"

query_copystr2 = "COPY taxischema.nyc_greentaxi FROM 's3://nyc-tlc/trip data/green_tripdata_2019' IAM_ROLE '" + redshift_iam_role + "' csv ignoreheader 1 region '" + source_s3_region + "';"

## Execute 2 COPY statements in parallel
res1 = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_copystr1, ClusterIdentifier= cluster_id)
res2 = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_copystr2, ClusterIdentifier= cluster_id)

print("Redshift COPY started ...")

id1 = res1["Id"]
id2 = res2["Id"]
print("\nID: " + id1)
print("\nID: " + id2)

# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id1)
    print("Done waiting to finish Data API for the 1st COPY statement.")
    custom_waiter.wait(Id=id2)
    print("Done waiting to finish Data API for the 2nd COPY statement.")
except WaiterError as e:
    print (e)

desc=client_redshift.describe_statement(Id=id1)
print("[1st COPY] Status: " + desc["Status"] + ". Excution time: %d miliseconds" %float(desc["Duration"]/pow(10,6)))
desc=client_redshift.describe_statement(Id=id2)
print("[2nd COPY] Status: " + desc["Status"] + ". Excution time: %d miliseconds" %float(desc["Duration"]/pow(10,6)))

Performing in-place analysis

We can use the Data API to fetch the query result directly into a Pandas DataFrame. This simplifies in-place analysis of data in the Amazon Redshift cluster because we bypass unloading the data to Amazon S3 first and then loading it into a Pandas DataFrame.

The following query lists records loaded in the table nyc_greentaxi by year and month:

query_str = "select to_char(lpep_pickup_datetime, 'YYYY-MM') as Pickup_YearMonth, count(*) as Ride_Count from taxischema.nyc_greentaxi group by 1 order by 1 desc;"

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)

id = res["Id"]
# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)

output=client_redshift.get_statement_result(Id=id)
nrows=output["TotalNumRows"]
ncols=len(output["ColumnMetadata"])
#print("Number of columns: %d" %ncols)
resultrows=output["Records"]

col_labels=[]
for i in range(ncols): col_labels.append(output["ColumnMetadata"][i]['label'])
                                              
records=[]
for i in range(nrows): records.append(resultrows[i])

df = pd.DataFrame(np.array(resultrows), columns=col_labels)

df[col_labels[0]]=df[col_labels[0]].apply(operator.itemgetter('stringValue'))
df[col_labels[1]]=df[col_labels[1]].apply(operator.itemgetter('longValue'))

df

The following screenshot shows the output.

Now that you’re familiar with the Data API in Jupyter Notebook, let’s proceed with ML model building, training, and deployment in Amazon SageMaker.

ML models with Amazon Redshift

The following diagram shows the ML model building, training, and deployment process. The source of data for ML training and testing is Amazon Redshift.

The workflow includes the following steps:

  1. Launch a Jupyter notebook instance in Amazon SageMaker. You make the Data API call from the notebook instance that runs a query in Amazon Redshift.
  2. The query result is unloaded into an S3 bucket. The output data is formatted as CSV, GZIP, or Parquet.
  3. Read the query result from Amazon S3 into a Pandas DataFrame within the Jupyter notebook. This DataFrame is split between train and test data accordingly.
  4. Build the model using the DataFrame, then train and test the model.
  5. Deploy the model into a dedicated instance in Amazon SageMaker. End users and other systems can call this endpoint directly to get inferences by providing input data.

Building and training the ML model using data from Amazon Redshift

In this section, we review the steps to build and train an Amazon SageMaker model from data in Amazon Redshift. For this post, we use the Amazon SageMaker built-in forecasting algorithm DeepAR and the DeepAR example code on GitHub.

The source data is in an Amazon Redshift table. We build a forecasting ML model to predict the number of Green taxi rides in New York City.

Before building the model with Amazon SageMaker DeepAR, we need to transform the raw table data into the format the algorithm expects, which we do with SQL. The following screenshot shows the original format.

The following screenshot shows the converted format.

We convert the raw table data into the preceding format by running the following SQL query. We run the UNLOAD statement using this SQL to unload the transformed data into Amazon S3.

query_str = "UNLOAD('select
   coalesce(v1.pickup_timestamp_norm, v2.pickup_timestamp_norm) as pickup_timestamp_norm,
   coalesce(v1.vendor_1, 0) as vendor_1,
   coalesce(v2.vendor_2, 0) as vendor_2 
from
   (
      select
         case
            when
               extract(minute 
      from
         lpep_dropoff_datetime) between 0 and 14 
      then
         dateadd(minute, 0, date_trunc(''hour'', lpep_dropoff_datetime)) 
      when
         extract(minute 
      from
         lpep_dropoff_datetime) between 15 and 29 
      then
         dateadd(minute, 15, date_trunc(''hour'', lpep_dropoff_datetime)) 
      when
         extract(minute 
      from
         lpep_dropoff_datetime) between 30 and 44 
      then
         dateadd(minute, 30, date_trunc(''hour'', lpep_dropoff_datetime)) 
      when
         extract(minute 
      from
         lpep_dropoff_datetime) between 45 and 59 
      then
         dateadd(minute, 45, date_trunc(''hour'', lpep_dropoff_datetime)) 
         end
         as pickup_timestamp_norm , count(1) as vendor_1 
      from
         taxischema.nyc_greentaxi 
      where
         vendorid = 1 
      group by
         1
   )
   v1 
   full outer join
      (
         select
            case
               when
                  extract(minute 
         from
            lpep_dropoff_datetime) between 0 and 14 
         then
            dateadd(minute, 0, date_trunc(''hour'', lpep_dropoff_datetime)) 
         when
            extract(minute 
         from
            lpep_dropoff_datetime) between 15 and 29 
         then
            dateadd(minute, 15, date_trunc(''hour'', lpep_dropoff_datetime)) 
         when
            extract(minute 
         from
            lpep_dropoff_datetime) between 30 and 44 
         then
            dateadd(minute, 30, date_trunc(''hour'', lpep_dropoff_datetime)) 
         when
            extract(minute 
         from
            lpep_dropoff_datetime) between 45 and 59 
         then
            dateadd(minute, 45, date_trunc(''hour'', lpep_dropoff_datetime)) 
            end
            as pickup_timestamp_norm , count(1) as vendor_2 
         from
            taxischema.nyc_greentaxi 
         where
            vendorid = 2 
         group by
            1
      )
      v2 
      on v1.pickup_timestamp_norm = v2.pickup_timestamp_norm 
;') to '" + redshift_unload_path + "' iam_role '" + redshift_iam_role + "' format as CSV header ALLOWOVERWRITE GZIP"
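
We submit this UNLOAD through the same Data API pattern used for the COPY statements. The following is a minimal sketch of that step, assuming redshift_unload_path is an s3:// prefix the cluster's IAM role can write to, and reusing the long-delay custom waiter configured earlier:

# Submit the UNLOAD through the Data API (asynchronous, like the earlier statements)
res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)
id = res["Id"]

# Reuse the long-delay custom waiter; raise maxAttempts if the UNLOAD needs more time
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API for the UNLOAD statement.")
except WaiterError as e:
    print(e)

desc = client_redshift.describe_statement(Id=id)
print("Status: " + desc["Status"] + ". Execution time: %d milliseconds" %float(desc["Duration"]/pow(10,6)))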

After we unload the data into Amazon S3, we load the CSV data into a Pandas DataFrame and visualize the dataset. The following plots show the number of rides aggregated per 15 minutes for each of the vendors.
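
A minimal sketch of that loading step follows, assuming the unloaded gzip CSV parts sit under the redshift_unload_path prefix and using the s3fs, pandas, and matplotlib packages imported earlier; the exact notebook cell may differ:

# List the gzip CSV parts produced by UNLOAD and read them into a single DataFrame
fs = s3fs.S3FileSystem()
part_files = ["s3://" + key for key in fs.glob(redshift_unload_path + "*")]

rides = pd.concat(
    [pd.read_csv(f, compression="gzip") for f in part_files],
    ignore_index=True)

# Parse and order the 15-minute timestamps, then plot ride counts per vendor
rides["pickup_timestamp_norm"] = pd.to_datetime(rides["pickup_timestamp_norm"])
rides = rides.sort_values("pickup_timestamp_norm").set_index("pickup_timestamp_norm")

rides[["vendor_1", "vendor_2"]].plot(figsize=(15, 5))
plt.show()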

We now train our model using this time series data to forecast the number of rides.

The attached Jupyter notebook contains three steps:

  1. Split the train and test data. Unlike classification and regression tasks, where the train/test split is done by randomly dividing the entire dataset, for this forecasting algorithm we split the data based on time (a minimal sketch of this step follows the list):
    1. Start date of training data – 2019-01-01
    2. End date of training data – 2020-07-31
  2. Train the model by setting values for the mandatory hyperparameters.
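
The following is a minimal sketch of the split-and-format step from step 1, assuming the rides DataFrame from the earlier sketch and the DeepAR JSON Lines input format (one JSON object per series with "start" and "target" fields); the S3 output location is a placeholder:

# Time-based split boundaries taken from the post
train_start = "2019-01-01"
train_end = "2020-07-31"
test_start = "2020-08-01"   # everything after the training window is held out

train_df = rides.loc[train_start:train_end]
test_df = rides.loc[test_start:]

# DeepAR expects JSON Lines: one object per time series with "start" and "target"
def to_deepar_json_lines(df, columns):
    lines = []
    for col in columns:
        series = {
            "start": str(df.index.min()),
            "target": df[col].fillna(0).tolist(),
        }
        lines.append(json.dumps(series))
    return "\n".join(lines)

train_json = to_deepar_json_lines(train_df, ["vendor_1", "vendor_2"])

# Write the training channel to S3 for the DeepAR estimator (bucket and prefix are placeholders)
with s3fs.S3FileSystem().open("s3://<your-bucket>/deepar/train/train.json", "w") as f:
    f.write(train_json)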

The training job takes around 15 minutes, and the training progress is displayed on the screen. When the job is complete, you see output like the following:

#metrics {"Metrics": {"model.score.time": {"count": 1, "max": 3212.099075317383, "sum": 3212.099075317383, "min": 3212.099075317383}}, "EndTime": 1597702355.281733, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "AWS/DeepAR"}, "StartTime": 1597702352.069719}

[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, RMSE): 24.8660570151
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, mean_absolute_QuantileLoss): 20713.306262554062
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, mean_wQuantileLoss): 0.18868379682658334
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.1]): 0.13653619964790314
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.2]): 0.18786255278771358
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.3]): 0.21525202142165195
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.4]): 0.2283095901515685
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.5]): 0.2297682531655401
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.6]): 0.22057919827603453
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.7]): 0.20157691985194473
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.8]): 0.16576246442811773
[08/17/2020 22:12:35 INFO 140060425148224] #test_score (algo-1, wQuantileLoss[0.9]): 0.11250697170877594
[08/17/2020 22:12:35 INFO 140060425148224] #quality_metric: host=algo-1, test mean_wQuantileLoss <loss>=0.188683796827
[08/17/2020 22:12:35 INFO 140060425148224] #quality_metric: host=algo-1, test RMSE <loss>=24.8660570151
#metrics {"Metrics": {"totaltime": {"count": 1, "max": 917344.633102417, "sum": 917344.633102417, "min": 917344.633102417}, "setuptime": {"count": 1, "max": 10.606050491333008, "sum": 10.606050491333008, "min": 10.606050491333008}}, "EndTime": 1597702355.338799, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "AWS/DeepAR"}, "StartTime": 1597702355.281794}


2020-08-17 22:12:56 Uploading - Uploading generated training model
2020-08-17 22:12:56 Completed - Training job completed
Training seconds: 991
Billable seconds: 991
CPU times: user 3.48 s, sys: 210 ms, total: 3.69 s
Wall time: 18min 56s

 

  3. Deploy the trained model in an Amazon SageMaker endpoint.

We use the endpoint to make predictions on the fly. In this post, we create an endpoint on an ml.m4.xlarge instance class. To display prediction results, we provide an interactive time series graph (a minimal deployment and inference sketch follows the list). You can adjust four control values:

  • vendor_id – The vendor ID.
  • forecast_day – The offset from the training end date. This is the first date of the forecast prediction.
  • confidence – The confidence interval.
  • history_weeks_plot – The number of weeks in the plot prior to the forecast day.
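
The following is a minimal sketch of the deployment and a single inference call, assuming estimator is the trained DeepAR estimator from the notebook; the endpoint name, forecast start timestamp, and recent_target series are placeholders:

# Deploy the trained model to a real-time endpoint (instance class from the post; endpoint name is a placeholder)
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    endpoint_name="deepar-greentaxi-demo")

# Build a DeepAR JSON inference request: quantile forecasts for one series
request = {
    "instances": [
        {"start": "2020-08-01 00:00:00", "target": recent_target}   # recent_target: recent 15-minute ride counts
    ],
    "configuration": {
        "num_samples": 100,
        "output_types": ["quantiles"],
        "quantiles": ["0.1", "0.5", "0.9"],
    },
}

# Invoke the endpoint through the SageMaker runtime and read the median forecast
runtime = boto3.client("sagemaker-runtime")
response = runtime.invoke_endpoint(
    EndpointName="deepar-greentaxi-demo",
    ContentType="application/json",
    Body=json.dumps(request))
forecast = json.loads(response["Body"].read())
print(forecast["predictions"][0]["quantiles"]["0.5"][:5])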

The prediction plot looks like the following screenshot.

Conclusion

In this post, we walked through the steps to interact with Amazon Redshift from an Amazon SageMaker Jupyter notebook using the Data API. We provided sample code in the notebook to wait for the Data API to finish specific steps, and showed how to configure the wait time for different SQL statements.

The length of the wait time depends on the type of query you submit. A COPY command, which loads a large number of Amazon S3 objects, usually takes longer than a SELECT query.

You can retrieve query results directly into a Pandas DataFrame by calling the GetStatementResult API. This approach simplifies in-place analysis by delegating complex SQL queries to Amazon Redshift and visualizing the data by fetching the query result into the Jupyter notebook.

We further explored building and deploying an ML model on Amazon SageMaker using train and test data from Amazon Redshift.

For more information about the Data API, watch the video Introducing the Amazon Redshift Data API on YouTube and see Using the Amazon Redshift Data API.


About the Authors

Saunak Chandra is a senior partner solutions architect for Redshift at AWS. Saunak likes to experiment with new products in the technology space alongside his day-to-day work. He loves exploring nature in the Pacific Northwest; a short hike or bike ride on the trails is his favorite weekend morning routine. He also likes to do yoga when he gets time away from his kid.

Debu Panda, a senior product manager at AWS, is an industry leader in analytics, application platform, and database technologies. He has more than 20 years of experience in the IT industry, has published numerous articles on analytics, enterprise Java, and databases, and has presented at multiple conferences. He is the lead author of EJB 3 in Action (Manning Publications 2007, 2014) and Middleware Management (Packt).

Chao Duan is a software development manager at Amazon Redshift, where he leads the development team focusing on enabling self-maintenance and self-tuning with comprehensive monitoring for Redshift. Chao is passionate about building high-availability, high-performance, and cost-effective databases to empower customers with data-driven decision-making.

How the ZS COVID-19 Intelligence Engine helps Pharma & Med device manufacturers understand local healthcare needs & gaps at scale

Post Syndicated from Saunak Chandra original https://aws.amazon.com/blogs/big-data/how-the-zs-covid-19-intelligence-engine-helps-pharma-med-device-manufacturers-understand-local-healthcare-needs-gaps-at-scale/

This post is co-written by Parijat Sharma (Principal, Strategy & Transformation), Wenhao Xia (Manager, Data Science), and Vineeth Sandadi (Manager, Business Consulting) of ZS Associates, Inc., along with Arianna Tousi (Strategy, Insights and Planning Consultant) and Gopi Vikranth (Associate Principal) of ZS. In their own words, “We’re passionately committed to helping our clients and their customers thrive, working side by side to drive customer value and results”.

The COVID-19 trajectory across the US continues to remain unstable and heterogeneous. Although certain cities and counties were able to tame the adverse effects of the pandemic by applying stricter controls on social life, newer hotspots are emerging in different locations sporadically.

Organizations in healthcare, pharma, and biotech are looking to adapt to a rapidly evolving and diverse local market landscape, and restart parts of their operations that are significantly impacted, such as patient support functions, sales, and key account management. Real-time insights into the rapidly evolving COVID-19 situation and its impact on all key stakeholders in the healthcare supply chain, including patients, physicians, and health systems, are a key asset in helping companies adapt based on local market dynamics and remain resilient to future disruptions. However, several life-science companies don’t have these insights because they lack the infrastructure to integrate and manage the relevant datasets at scale and the analytical capabilities to mine the data for relevant insights.

ZS stepped into this critical situation and built a data lake on AWS to address these challenges. A primary characteristic of this data lake is that it’s built largely on open-source technologies, which gave ZS a head start in meeting the product launch SLA on AWS. This post describes how ZS developed the data lake and brought their proprietary machine learning (ML) models to run on AWS, providing intelligent insights on COVID-19.

What is the ZS COVID-19 Intelligence Engine?

The ZS COVID-19 Intelligence Engine was designed as a customizable capability that does the following:

  • Integrates diverse public and proprietary healthcare datasets in a scalable data warehouse that stores data in a secure and compliant manner
  • Provides advanced descriptive and predictive analytical modules to forecast COVID-19 evolution and its impact on key stakeholders and the treatment journey
  • Packages insights into intuitive preconfigured reports and dashboards for dissemination across an organization

AWS Cloud data and analytics infrastructure

In this section, we dive into the infrastructure components of the ZS COVID-19 Intelligence Engine. The objective was to quickly set up a data lake with an accompanying ingestion mechanism to allow rapid ingestion of public datasets, third-party data, and datasets from AWS Data Exchange.

The overall data processing solution is based on ZS’s REVO™ data management product, which uses Apache Spark on Amazon EMR. The Spark engine processes and transforms raw data into structured data that is ready for interactive analysis. The raw data comes in compressed, delimited text format, ranging from 100 MB to 15 GB. After the data is cleansed and rules are applied, the processed data is staged in Amazon Simple Storage Service (Amazon S3) buckets in Apache Parquet format. This data is selectively loaded into an Amazon Redshift cluster for fast interactive querying and repetitive analysis on subsets of data.
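
The exact REVO™ transformation logic is proprietary and not shown in this post; purely as an illustration of this stage, the following PySpark sketch reads compressed delimited input, applies a couple of placeholder cleansing rules, and stages the result as Parquet on Amazon S3 (bucket names and column names are assumptions):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("covid-intel-staging").getOrCreate()

# Read compressed, delimited raw files (Spark decompresses gzip transparently)
raw = (spark.read
       .option("header", "true")
       .option("delimiter", "|")
       .csv("s3://<raw-bucket>/incidence/*.gz"))

# Placeholder cleansing rules: trim keys, drop rows without a county identifier, cast counts
cleansed = (raw
            .withColumn("county_fips", F.trim(F.col("county_fips")))
            .filter(F.col("county_fips").isNotNull())
            .withColumn("case_count", F.col("case_count").cast("int")))

# Stage the curated data as Parquet for selective loads into Amazon Redshift downstream
(cleansed.write
 .mode("overwrite")
 .partitionBy("report_date")
 .parquet("s3://<curated-bucket>/incidence_parquet/"))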

The Intelligence Engine also uses a powerful Amazon Elastic Compute Cloud (Amazon EC2) instance to run ML workloads, which predict future COVID-19 caseloads at the county level. The prediction models run daily on a compute-optimized EC2 C5.24xlarge On-Demand Instance, allowing rapid turnaround of prediction results and saving overall cost by using On-Demand Instances.

ZS uses Amazon Redshift as the data warehouse in this architecture. Amazon Redshift is easy to launch and maintain and can quickly run analytical queries on large normalized datasets using standard ANSI SQL. After the raw data gets processed using ZS’s REVO™, the curated data is loaded into Amazon Redshift to run interactive analytical queries. The queries generate insights specific to local geography, county, and healthcare systems, and run on Amazon Redshift tables consisting of anonymized patient data. The Amazon Redshift cluster uses On-Demand Instances and is sized to accommodate 25 TB of data at the time of this product launch. Typical interactive queries include joining data across large tables, up to 1.5 billion rows in the main table.
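
The post doesn't show the load step itself; one way to load the curated Parquet data into Amazon Redshift is a COPY with FORMAT AS PARQUET, sketched below using the Redshift Data API (the table name, S3 path, IAM role, and cluster details are placeholders):

import boto3

client_redshift = boto3.client("redshift-data")

# COPY Parquet files from S3 into a Redshift table; the target columns must match the Parquet schema
copy_sql = (
    "COPY analytics.covid_incidence "
    "FROM 's3://<curated-bucket>/incidence_parquet/' "
    "IAM_ROLE '<redshift-copy-role-arn>' "
    "FORMAT AS PARQUET;")

response = client_redshift.execute_statement(
    ClusterIdentifier="<cluster-id>",
    Database="<database>",
    SecretArn="<secret-arn>",
    Sql=copy_sql)
print("COPY submitted, statement id: " + response["Id"])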

The following diagram illustrates this architecture:

The ZS COVID-19 data lake has several benefits and applicable use cases:

  • Streamlined data procurement processes – Eliminates the need for multiple ZS teams to procure, ingest, and process the same datasets separately
  • Optimized common usage across clients and business questions – ZS uses this capability to publish common derivations of data that can then be utilized across different ZS teams and use cases to create a single version of truth
  • Cross-functional processes and requirements – Some analytics use cases require cross-functional data and are significantly hampered when users can’t access the various data sources in one place; a data lake enables this by design
  • Connected healthcare data – By developing common standards and integrating with MDM and ontologies, data from the public domain can be compliantly integrated with pharma manufacturer-specific data sources to enable seamless analytics on the data lake

Comprehensive healthcare data lake

At its core, the Intelligence Engine builds a scalable and integrated repository of diverse public and proprietary data sources. These datasets range in variety, volume, and velocity:

  • COVID-19 incidence – There are several COVID-specific datasets that the public has become accustomed to viewing over the past several months, such as Johns Hopkins incidence tracking and IHME predictive data, which describes how the disease has been progressing over time and even into the future. This data tends to be at either the state or county level and is often refreshed daily. The data lake solution contains the entire history for these datasets, which, taken together, spans into the hundreds of gigabytes in size. In addition to these sources, ZS’ proprietary predictive models add an additional element of accuracy and are customized with ZS-specific insights.
  • Government policies – Government policy data, sourced mostly from AWS Data Exchange and provided by the New York Times, explains the current state of government mandates and recommendations for varying degrees of lockdown or reopening as they pertain to the pandemic. This data is much smaller in volume, well under 1 GB total.
  • Insurance claims at patient level – Thanks to the partnership with Symphony Health, ZS has had the opportunity to analyze and expose patient claims data that can be attributed to the specific hospital account or healthcare provider where that claim took place. The insurance claims data is the largest volume of data contributing to the ZS COVID-19 Intelligence Engine, at close to 15 TB. ZS’ data engineering team has wrangled these large datasets with the help of Amazon EMR for aggregating and processing metrics, which are then stored in Amazon Redshift in a transformed version that is much smaller and more easily understood than the original raw datasets.
  • HCP to site of care affiliations – Thanks to the partnership with Definitive Healthcare, ZS is in the process of integrating best-in-class physician-hospital and clinic affiliations from Definitive Healthcare with patient claims from Symphony to help assess available healthcare capacity and evolving approaches to care delivery, along with the type of care being delivered by disease area.
  • Other Intelligence engine data sources
    • State testing rates
    • Mobility
    • Demographics and social determinants of health
    • Provider access and affinity for pharma commercial engagement (from ZS affinity/access monitor)
    • Automated data ingestors for a variety of pharma manufacturer-specific data sources including specialty pharmacy and hub transactions, sales force activity, customer digital engagement, and more

Predictive models for COVID-19 projections and healthcare demand-supply gaps at a local level

To drive decision-making at a local level, ZS required more granular projections of COVID-19 disease spread than what’s publicly available at a state or national level. Therefore, as part of the Intelligence Engine, the ZS data science team aimed to develop an ensemble model of COVID-19 projections at the county level to identify emerging local healthcare gaps along different phases of the treatment process.

Developing a locally predictive model has many challenges, and ZS believes that no single model can capture all the virtually infinite drivers and factors contributing to disease spread within a specific geographic area. Therefore, the ZS data science team behind the COVID-19 projections has implemented multiple projection models, each with its own set of input data sources, assumptions, and parameters. This approach increases the accuracy of the projections while retaining a level of stability and interpretability in the model. These models include the following (an illustrative sketch of one approach appears after the list):

  • Statistical curve fitting model – A disease progression curve using a Generalized Gaussian Cumulative Distribution Function, optimized to minimize prediction error of COVID-19 cases and deaths
  • SEIR model – Traditional epidemiological disease progression model (pathway of Susceptible – Exposed – Infectious – Recovered) combined with traditional ML on model parameters
  • Agent-based simulation – County-level simulation of individual interactions between people within the county
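
ZS’s fitted models and parameters aren’t published here; purely as an illustration of the second approach, the following is a minimal SEIR simulation with made-up parameters:

import numpy as np
from scipy.integrate import odeint

def seir(y, t, beta, sigma, gamma):
    # Classic SEIR compartments: Susceptible, Exposed, Infectious, Recovered
    S, E, I, R = y
    N = S + E + I + R
    dS = -beta * S * I / N
    dE = beta * S * I / N - sigma * E
    dI = sigma * E - gamma * I
    dR = gamma * I
    return [dS, dE, dI, dR]

# Illustrative county-sized population and parameters (not ZS's fitted values)
N = 100000
y0 = [N - 10, 10, 0, 0]               # start with 10 exposed people
beta, sigma, gamma = 0.4, 1/5.2, 1/10

t = np.linspace(0, 180, 181)          # simulate 180 days
S, E, I, R = odeint(seir, y0, t, args=(beta, sigma, gamma)).T
print("Peak infectious count: %d on day %d" % (I.max(), I.argmax()))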

Obtaining a more granular view of future virus spread at a local level is critical in order to provide support for challenges in specific sites of care. Accurately projecting cases at the county level can be difficult for many reasons. For counties with low current case counts, the model has little historical data to learn from (both in time since first infection and in magnitude of cases). Additionally, forecasts can be sensitive to many variables, and the current second wave of COVID-19 infections adds additional complications to tracking the spread of the virus.

To combat some of these difficulties, ZS implemented a two-phased approach to generate county-level projections. Counties with a long enough history of virus spread are projected independently using the three disease progression models we outlined, whereas counties with limited history are projected using a combination of state-level projections and social determinants of health factors that are predictive of disease spread (for example, age distribution in a certain county).

As the world around us continues to evolve and the COVID-19 situation with it, the ZS data science team is also working to adapt the model alongside the current situation. Currently, model adaptability and its self-learning ability are continuing to improve to better adapt to the onset of the second wave of the virus. Additional parameters and re-optimizations are happening daily as the situation develops.

The following image shows the input data sources, modeling techniques, and outputs from the ZS COVID-19 projection models:

Analyzing and predicting local non-COVID-19 treatment gaps and their drivers

Several flexible analytical tools can be used to evaluate barriers along the disease treatment journey for non-COVID-19 diseases at the local geography level, their evolution over time with COVID-19, and their underlying drivers. These tools summarize local changes in and the underlying drivers of the following:

  • New patient diagnosis
  • Changes in treatment approaches and drugs used
  • Patient affordability and access to medications
  • Persistency and compliance to treatment
  • Healthcare demand (patients needing care) and supply (provider capacity to offer care)

The following image shows output from the Intelligence Engine, illustrating local variations in healthcare gaps:

Intuitive visualization capabilities

The solution has two intuitive visualization capabilities:

  • COVID-19 monitor – A public access dashboard with insights on historical and predicted future trajectories of COVID-19 incidence and hospital capacity. These insights are available at the state level and allow you to drill down into individual counties. The individual county-level view allows you to not only understand the severity of COVID-19 in that area, but also better understand how that county compares to other counties within the same state and observe what policies their local governments have set for the shutdown and reopening process.
  • Treatment finder – A second public access dashboard with near-real-time insights into individual hospital and physician group availability to treat patients for prominent non-COVID-19 diseases. This dashboard allows you to select a specific non-COVID-19 disease and identify the estimated number of COVID-19-infected people with that disease in their geography, mortality rates, and the individual providers that are accepting patients with a specific disease and health insurance.

The following image shows the Intelligence Engine screen with COVID-19 insights for a selected county:

The following image shows the Intelligence Engine screen that allows patients to find hospitals and physician offices that are open and accepting patients:

Conclusion

At its core, the ZS Intelligence Engine is a real-time planning tool. The rich set of AWS services and technologies makes it possible to ingest data from various third-party sources, both public and proprietary. The AWS services used to build the architecture can run on open technologies. For example, building the data lake would not have been possible without Amazon EMR and Amazon EC2. ZS had already been using Apache Spark-based EMR instances (the service behind the REVO™ tool) prior to the COVID-19 pandemic. ZS can run its ML models cost-effectively by using EC2 On-Demand Instances. Finally, using Amazon Redshift as a data warehouse solution allows ZS to provide COVID-19 analytical insights efficiently and cost-effectively.

Since the project went live, ZS has catered this product to at least six customers in the pharma, biotech, and medical device spaces. They are using this product in a variety of ways, including but not limited to:

  • Refining forecasts of the COVID-19 trajectory to estimate demand for their products
  • Assessing the level of openness of healthcare facilities to understand where patients across therapy areas are being treated
  • Determining which patients and communities to support, because COVID-19 impacts attitudes and concerns regarding immunity and drug use, and greater unemployment means more reimbursement support requirements
  • Readying the education and engagement field force for a mix of in-person and virtual interactions
  • Preparing the supply chain to ensure continuity of care

To try out the analysis yourself, see ZS’s COVID-19 Intelligence Engine.


About the Authors

Saunak is a Sr. Solutions Architect with AWS, helping customers and partners build data warehouses and scalable data platforms on AWS.

Parijat is the current lead of strategy and transformation at ZS. He focuses on mid-size to small clients that are ready for a transformational process to commercialize new products and portfolios, purchase or sell assets, or expand into new markets.

Wenhao has over 10 years of experience in various data science and advanced analytics fields. During his time at ZS, he has helped build and popularize data science capabilities across many organizations.

Vineeth works with pharmaceutical and biotech manufacturers on a broad spectrum of commercial issues, including commercial analytics, organized provider strategy, and resource planning and deployment.

Arianna is a Strategy, Insights and Planning Consultant in ZS’ High Tech practice. Arianna has extensive experience working with clients across industries on go-to-market strategy and commercial effectiveness issues.

Gopi Vikranth is an Associate Principal in ZS’ High Tech Practice. He has extensive experience helping clients across the retail, high tech, hospitality, pharmaceutical, and insurance sectors leverage big data and analytics to drive top-line growth.