Boosting your data lake insights using the Amazon Athena Query Federation SDK

Post Syndicated from Adir Sharabi original https://aws.amazon.com/blogs/big-data/boosting-your-data-lake-insights-using-the-amazon-athena-query-federation-sdk/

Modern applications use multiple purpose-built database engines, including relational, key-value, document, and in-memory databases. This purpose-built approach improves the way applications use data by providing better performance and reducing cost. However, it raises challenges for data teams that need to provide a holistic view on top of these database engines, especially when they need to merge the data with datasets in the organization’s data lake.

In this post, we show how to use the Amazon Athena Query Federation SDK to enrich your data in Amazon Simple Storage Service (Amazon S3) with data from external datastores, apply complex transformations, and get predictive insights by running inference against machine learning (ML) models.

We start by running a query to enrich an S3-backed table that holds features extracted from breast cancer images with fictional patient personal information stored in Amazon DynamoDB. We then use the Athena UDF functionality to decrypt sensitive information stored in the table. Next, we select these features and use the Athena integration with Amazon SageMaker to pass them to a linear learner model to predict whether breast cancer is present. Lastly, we show an Amazon QuickSight dashboard to visualize the results.

For this post, we use the following resources:

  • A dataset of features computed from a digitized image of a fine needle aspirate of a breast mass. Such features include radius, texture, perimeter, area, and smoothness. The dataset is stored as a CSV file in Amazon S3.
  • Patient’s personal information (such as age range, email, and country) stored in DynamoDB.
  • A linear learner model deployed into a SageMaker endpoint to predict breast cancer. For more information, see Call an Amazon SageMaker model endpoint using Amazon API Gateway and AWS Lambda.

Prerequisites

Athena Query Federation is now generally available in US East (Ohio), US East (N. Virginia), and US West (Oregon). To use this feature, upgrade your engine version to Athena engine version 2 in your workgroup settings. To enable this feature in other Regions, you need to create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup. Workgroups allow us to:

  • Isolate users, teams, applications, or workloads into groups
  • Enforce cost constraints per query or workgroup
  • Track query-related metrics for all workgroup queries in Amazon CloudWatch

For more information, see Managing Workgroups.

Creating a DynamoDB table and SageMaker endpoint

For the post, we create a new DynamoDB table with synthetic patient data. For the inference requests, we create a new SageMaker endpoint.

  1. Deploy the following AWS CloudFormation stack in us-east-1:

The stack performs the following:

  • Creates a DynamoDB table
  • Creates and triggers an AWS Lambda function to load the table with data
  • Creates a SageMaker endpoint for inference requests

It can take up to 10 minutes for the CloudFormation stack to create the resources.

  2. After the resource creation is complete, navigate to the AWS CloudFormation console.
  3. Choose the boost-your-datalake-insights stack.
  4. On the Outputs tab, copy the values for DynamoTableName and SMEndpointName. You use these values later in the post.
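
If you prefer to read the stack outputs programmatically rather than copying them from the console, the following is a minimal sketch. It assumes a response shaped like the one the CloudFormation DescribeStacks API returns (a Stacks list whose entries carry Outputs with OutputKey and OutputValue pairs). The function name stack_outputs and the sample values are hypothetical; in practice you would pass in the result of a DescribeStacks call for the boost-your-datalake-insights stack.

```python
def stack_outputs(describe_stacks_response):
    """Return the first stack's outputs as a {key: value} dict."""
    stacks = describe_stacks_response.get("Stacks", [])
    if not stacks:
        raise ValueError("response contains no stacks")
    return {
        output["OutputKey"]: output["OutputValue"]
        for output in stacks[0].get("Outputs", [])
    }


# Hypothetical response: the values are placeholders, not real resource names.
sample_response = {
    "Stacks": [
        {
            "StackName": "boost-your-datalake-insights",
            "Outputs": [
                {"OutputKey": "DynamoTableName", "OutputValue": "example-table"},
                {"OutputKey": "SMEndpointName", "OutputValue": "example-endpoint"},
            ],
        }
    ]
}

outputs = stack_outputs(sample_response)
print(outputs["DynamoTableName"], outputs["SMEndpointName"])
```

Keeping the parsing in a small pure function makes it easy to reuse the two values when filling in the <DynamoTableName> and <SMEndpointName> placeholders in the queries later in the post.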

Downloading the dataset

In a new or existing S3 bucket, create a folder named breast_cancer_features. Download the breast_cancer_raw_data.csv file and upload it to the breast_cancer_features folder in your bucket. This file contains the Breast Cancer Wisconsin (Diagnostic) Data Set, available at https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Diagnostic%29.

The file holds 570 records with 30 columns of values computed for each cell nucleus. Such features include radius, texture, perimeter, area, smoothness, compactness, concavity, concave points, symmetry, and fractal dimension. The following screenshot displays a few records from the file.

Creating the features table in Athena

To query this dataset from Athena, you need to create an external table that points to that file. There are several ways to create a table in Athena. For this post, we use a Hive data definition language (DDL) statement.

  1. On the Athena console, if using a non-GA Region, switch to the AmazonAthenaPreviewFunctionality workgroup created earlier.
  2. Enter the following statement:
    CREATE EXTERNAL TABLE breast_cancer_features(
    id string, radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://<bucket>/breast_cancer_features/'
    TBLPROPERTIES (
      'classification'='csv', 
      'columnsOrdered'='true', 
      'compressionType'='none', 
      'delimiter'=',', 
      'skip.header.line.count'='1')

  3. Choose Run Query or press CTRL + Enter.
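
The table declares 31 comma-delimited columns (id plus 30 features) and skips one header line. As an optional sanity check before uploading a file, the following sketch counts the fields in each data row. The helper name check_rows and the tiny generated sample are hypothetical and only illustrate the idea; they are not part of the original walkthrough.

```python
import csv
import io

EXPECTED_COLUMNS = 31  # id plus the 30 feature columns declared in the table


def check_rows(csv_text, expected=EXPECTED_COLUMNS):
    """Return (data_row_count, bad_line_numbers) for comma-delimited text
    that starts with a single header line."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader, None)  # skip the header, as 'skip.header.line.count'='1' does
    count = 0
    bad_lines = []
    for line_number, row in enumerate(reader, start=2):
        count += 1
        if len(row) != expected:
            bad_lines.append(line_number)
    return count, bad_lines


# Generated sample data: one well-formed row and one row missing a field.
header = ",".join(["id"] + [f"f{i}" for i in range(30)])
good_row = ",".join(["1"] + ["0.5"] * 30)
short_row = ",".join(["2"] + ["0.5"] * 29)
sample = "\n".join([header, good_row, short_row]) + "\n"

result = check_rows(sample)
print(result)
```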

After the table is created successfully, you can run queries such as the following to profile your data and better understand how it’s distributed:

SELECT variance(radius_mean) AS variance,
       stddev(radius_mean) AS stddev,
       min(radius_mean) AS min,
       max(radius_mean) AS max,
       avg(radius_mean) AS avg
FROM "default"."breast_cancer_features"

The following screenshot shows our results.
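
To make the aggregates concrete, the following sketch computes the same five statistics locally with the Python standard library. It assumes that variance and stddev in Athena's Presto-based engine are the sample variants (n - 1 denominator), which is what statistics.variance and statistics.stdev compute. The input values are made up for illustration and are not taken from the dataset.

```python
import statistics

# Made-up radius values for illustration only.
sample = [10.0, 12.0, 14.0, 16.0, 18.0]

profile = {
    "variance": statistics.variance(sample),  # sample variance (n - 1 denominator)
    "stddev": statistics.stdev(sample),       # square root of the sample variance
    "min": min(sample),
    "max": max(sample),
    "avg": statistics.mean(sample),
}
print(profile)
```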

Joining the features table with data stored in DynamoDB

In this step, we enrich the features table by joining it with the personal information stored in DynamoDB. With Athena Query Federation, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources.

The following screenshot displays a few records from the DynamoDB table.

To join the features table stored in Amazon S3 with the personal data stored in DynamoDB, we need to create a data source connector that runs on Lambda to run the federated query. A data source connector is a piece of code that can translate between your target data source and Athena. You can think of a connector as an extension of the query engine in Athena.

Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, DynamoDB, Amazon DocumentDB (with MongoDB compatibility), and Amazon Relational Database Service (Amazon RDS), as well as JDBC-compliant relational data sources such as MySQL and PostgreSQL. You can also use the Athena Query Federation SDK to write custom connectors.

Preparing to create federated queries is a two-part process: deploying a Lambda function data source connector, and connecting the Lambda function to a data source.

Deploying a Lambda function data source connector

To deploy the data source connector, complete the following steps:

  1. On the Athena console, choose Data sources.
  2. Choose Connect data source.
  3. Choose Query a data source.
  4. For Choose a data source, choose the data source that you want to query with Athena, such as Amazon DynamoDB.
  5. Choose Next.
  6. For Lambda function, choose Configure new AWS Lambda function.

The function page for the connector that you chose opens on the Lambda console. The page includes detailed information about the connector.

  7. Under Application settings, specify the required information. At a minimum, this includes:
    • AthenaCatalogName – A name for the Lambda function that indicates the data source that it targets, such as athena_dynamodb_connector.
    • SpillBucket – An S3 bucket in your account to store data that exceeds Lambda function response size limits.

For more information about the remaining configurable options, see Available Connectors on GitHub.

  8. Select I acknowledge that this app creates custom IAM roles.
  9. Choose Deploy.

The Resources section on the Lambda console shows the deployment status of the connector and informs you when the deployment is complete.

Connecting to a data source

After you deploy the data source connector to your account, you can connect it to a data source from Athena.

  1. On the Athena console, choose Connect data source.
  2. Choose Query a data source.
  3. Choose the data source for the connector that you just deployed, such as Amazon DynamoDB. If you used the Athena Query Federation SDK to create your own connector and have deployed it to your account, choose All other data sources.
  4. Choose Next.
  5. For Choose Lambda function, choose the function that you named previously (athena_dynamodb_connector).
  6. For Catalog name, enter a unique name to use for the data source in your SQL queries, such as dynamo_db.

The name can be up to 127 characters and must be unique within your account. It can’t be changed after creation. Valid characters are a–z, A–Z, 0–9, _, @, and -. The names awsdatacatalog, hive, jmx, and system are reserved by Athena and can’t be used for custom catalog names.

  7. Choose Connect.

The Data sources page shows your connector in the list of catalog names. You can now use the connector in your queries.
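
The catalog naming rules described in the preceding steps can be checked locally before you create the data source. The following is a minimal sketch; the function name is_valid_catalog_name is hypothetical, and treating the reserved names as case-insensitive is our assumption. Uniqueness within your account can't be verified this way.

```python
import re

# Names reserved by Athena, as listed in the naming rules.
RESERVED_NAMES = {"awsdatacatalog", "hive", "jmx", "system"}

# 1 to 127 characters drawn from a-z, A-Z, 0-9, _, @, and -.
VALID_NAME = re.compile(r"[A-Za-z0-9_@-]{1,127}")


def is_valid_catalog_name(name):
    """Check length, allowed characters, and reserved names."""
    if name.lower() in RESERVED_NAMES:  # case-insensitive match is an assumption
        return False
    return VALID_NAME.fullmatch(name) is not None


for candidate in ["dynamo_db", "hive", "my catalog"]:
    print(candidate, is_valid_catalog_name(candidate))
```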

Querying the external data source

To query your external data source, complete the following steps:

  1. In the Athena Query editor, for Data source, choose dynamo_db.

The DynamoDB table appears in the Tables list.

  2. Choose the three-dot icon next to the table and choose Preview table.

Now we can run queries like the following to enrich and get more insights on our data by joining it with additional data that was stored in DynamoDB:

SELECT age, count(*)
FROM breast_cancer_features d
JOIN "dynamo_db"."default"."<DynamoTableName>" p
ON d.id = p.patient_id
GROUP BY age

The following screenshot shows our results.
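
To see what the join and aggregation compute without touching AWS, the following sketch runs the same query shape against two in-memory SQLite tables. This only mimics the logic: the local patients table stands in for the federated "dynamo_db"."default"."<DynamoTableName>" table, and all rows, including the age range labels, are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE breast_cancer_features (id TEXT, radius_mean REAL)")
conn.execute("CREATE TABLE patients (patient_id TEXT, age TEXT)")

# Made-up rows for illustration only.
conn.executemany(
    "INSERT INTO breast_cancer_features VALUES (?, ?)",
    [("1", 10.0), ("2", 12.0), ("3", 14.0)],
)
conn.executemany(
    "INSERT INTO patients VALUES (?, ?)",
    [("1", "30-40"), ("2", "30-40"), ("3", "50-60")],
)

# Same join key and grouping as the Athena query; ORDER BY added for stable output.
rows = conn.execute(
    """
    SELECT age, count(*)
    FROM breast_cancer_features d
    JOIN patients p ON d.id = p.patient_id
    GROUP BY age
    ORDER BY age
    """
).fetchall()
conn.close()
print(rows)
```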

Using custom UDFs to decrypt the patient’s email

When looking at our patients’ personal information stored in the DynamoDB table, we can see that each patient’s email is encrypted. We want to allow our users to get the decrypted email while querying with Athena, without running a custom ETL job that would require us to store the email decrypted.

User Defined Functions (UDFs) in Athena allow you to create custom functions to process records or groups of records. A UDF accepts parameters, performs the work, and returns a result. However, UDFs in Athena have the following limitations:

  • Scalar UDFs only – Athena only supports scalar UDFs, which process one row at a time and return a single column value. Athena passes a batch of rows to the UDF each time it invokes a Lambda function.
  • Java runtime only – As of this writing, Athena UDFs support only the Java 8 runtime for Lambda.

To use this feature in preview, you must create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup, as specified in the Prerequisites section. For more information, see Querying with User Defined Functions.

Deploying a custom UDF

In this post, we use the decrypt method from the example UDF we published. The same encryption key that we used for the encryption must also be used for the decryption. We store that key in AWS Secrets Manager and use the Athena Query Federation SDK to retrieve it when the function is called.

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Select secret type, select Other type of secrets.
  3. Choose Plaintext.
  4. Remove all the JSON brackets and enter a base64 encoded string as the data key, such as AQIDBAUGBwgJAAECAwQFBg==.
  5. For Select the encryption key, choose DefaultEncryptionKey.
  6. Choose Next.

  7. Enter athena_encrypt_udf_key as the secret name.
  8. Choose Next.
  9. Choose Next again.
  10. Choose Store.
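
The example data key in the preceding steps decodes to 16 bytes. If you want your own key instead, the following sketch generates a random one of the same length and encodes it as base64. The function name generate_data_key is hypothetical, and the assumption that the example UDF expects a key of this length comes from the sample value, not from the UDF's documentation. Note that data already encrypted with a different key can't be decrypted with a new one.

```python
import base64
import secrets


def generate_data_key(num_bytes=16):
    """Return a random key of num_bytes bytes, base64 encoded as text."""
    return base64.b64encode(secrets.token_bytes(num_bytes)).decode("ascii")


example_key = "AQIDBAUGBwgJAAECAwQFBg=="  # sample value from the steps above
print(len(base64.b64decode(example_key)))  # key length in bytes

new_key = generate_data_key()
print(new_key)
```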

Next, we deploy the Lambda function that runs the UDF.

  1. On the AWS Serverless Application Repository console, in the navigation pane, choose Available applications.
  2. Select Show apps that create custom IAM roles or resource policies.
  3. In the search box, enter AthenaUserDefinedFunctions.
  4. Choose the application from the result pane.

The Lambda function’s Application details page opens on the Lambda console.

  5. For SecretNameOrPrefix, enter the name or prefix of a set of names within Secrets Manager that this function should have access to, such as athena_encrypt_udf_key* (make sure to include an asterisk at the end).
  6. For LambdaFunctionName, enter the name of the Lambda function that runs your UDFs, such as athena_udf.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

The Resources section of the Lambda console shows the deployment status of the function and informs you when the deployment is complete.

Querying with UDFs

The USING FUNCTION clause specifies a UDF or multiple UDFs that can be referenced by a subsequent SELECT statement in the query. You need the method name for the UDF and the name of the Lambda function that hosts the UDF.

In our example, the decrypt method gets two parameters: encrypted_col and secretName. We define the function and run the following query:

USING FUNCTION decrypt(encrypted_col VARCHAR, secretName VARCHAR)
RETURNS VARCHAR
TYPE LAMBDA_INVOKE WITH (lambda_name ='athena_udf')

SELECT encrypted_email, 
    decrypt(encrypted_email,'athena_encrypt_udf_key') AS decrypted_email
FROM "dynamo_db"."default"."<DynamoTableName>"

The following screenshot shows the query results.

Using Athena ML to predict breast cancer

To get predictive insights from our data using ML models, we would normally need to write code to run inference against a deployed model. Data analysts are often skilled in SQL but may not have programming expertise. ML with Athena bridges this gap and lets you run inference on models deployed on SageMaker by writing SQL statements in Athena. This feature simplifies access to ML models for use cases such as anomaly detection, customer cohort analysis, and sales predictions, and it eliminates the need to use complex programming methods or to offload data and orchestrate jobs to run inference.

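SageMaker is a fully managed ML service, where data scientists and developers can quickly and easily build and train ML models. In addition, SageMaker allows you to deploy your model to get predictions in one of two ways:

  • SageMaker hosting services – Set up a persistent endpoint to get one prediction at a time
  • SageMaker batch transform – Get predictions for an entire dataset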

Running a SQL statement with SageMaker inference to predict breast cancer

To perform inference from Athena, you need to train the model based on historical labeled data and deploy it into the SageMaker hosting service. In this post, we use a linear learner model to predict breast cancer from the features we used earlier. The model was already deployed as part of the CloudFormation stack, as you can see in the following screenshot from the SageMaker console.

To use ML with Athena, you define a function with the USING FUNCTION clause. The function points to the SageMaker model endpoint that you want to use and specifies the variable names and data types to pass to the model. Subsequent clauses in the query reference the function to pass values to the model. The model runs inference based on the values that the query passes and then returns inference results.

In our example, the model endpoint is SMEndpoint-odPIqmf9LjUh (replace <SMEndpointName> in the query with the value you copied from the stack outputs), and the variables are the feature columns from the breast_cancer_features table. We define the function and run the following query:

USING FUNCTION predict_breast_cancer(radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) 
RETURNS DOUBLE 
TYPE SAGEMAKER_INVOKE_ENDPOINT WITH (sagemaker_endpoint = '<SMEndpointName>')

SELECT id, prediction, CASE WHEN round(prediction)=1 THEN 'B' ELSE 'M' END AS diagnosis
FROM( SELECT id, predict_breast_cancer(radius_mean, texture_mean, perimeter_mean, area_mean, smoothness_mean, compactness_mean, concavity_mean, concave_points_mean, symmetry_mean, fractal_dimension_mean, radius_se, texture_se, perimeter_se, area_se, smoothness_se, compactness_se, concavity_se, concave_points_se, symmetry_se, fractal_dimension_se, radius_worst, texture_worst, perimeter_worst, area_worst, smoothness_worst, compactness_worst, concavity_worst, concave_points_worst, symmetry_worst, fractal_dimension_worst) AS prediction FROM breast_cancer_features)a

In the following query results, you can see the prediction column returned by the model. The diagnosis column that derives from it is either B for benign or M for malignant.
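
Because the 30 feature columns appear twice in the statement (once with types in the function signature and once as call arguments), it can be less error-prone to generate the query text. The following is a minimal sketch under that assumption; the helper names feature_columns and build_prediction_query are hypothetical, and the endpoint name is interpolated as-is, so only pass values you trust.

```python
FEATURES = [
    "radius", "texture", "perimeter", "area", "smoothness",
    "compactness", "concavity", "concave_points", "symmetry",
    "fractal_dimension",
]
STATISTICS = ["mean", "se", "worst"]


def feature_columns():
    """Return the 30 column names in table order: all _mean, then _se, then _worst."""
    return [f"{feature}_{stat}" for stat in STATISTICS for feature in FEATURES]


def build_prediction_query(endpoint_name, table="breast_cancer_features"):
    """Build the USING FUNCTION statement shown above for the given endpoint."""
    columns = feature_columns()
    signature = ", ".join(f"{name} double" for name in columns)
    arguments = ", ".join(columns)
    return (
        f"USING FUNCTION predict_breast_cancer({signature})\n"
        "RETURNS DOUBLE\n"
        f"TYPE SAGEMAKER_INVOKE_ENDPOINT WITH (sagemaker_endpoint = '{endpoint_name}')\n"
        "\n"
        "SELECT id, prediction, "
        "CASE WHEN round(prediction)=1 THEN 'B' ELSE 'M' END AS diagnosis\n"
        f"FROM (SELECT id, predict_breast_cancer({arguments}) AS prediction FROM {table}) a"
    )


query = build_prediction_query("<SMEndpointName>")
print(query)
```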

Visualizing the data using QuickSight

Because many decision-makers aren’t familiar with SQL syntax, they want to consume the data in a more graphical way, such as through charts and dashboards. Data scientists also need visualizations to identify trends and anomalies and to understand how the data behaves.

Before we visualize our data with QuickSight, to get the best performance, we create a new dataset in Amazon S3 that holds all the patients’ personal information joined with the breast cancer diagnosis we got from the ML model inference. We use Athena’s CREATE TABLE AS SELECT (CTAS) statement to create the table and populate it with the joined data:

CREATE TABLE breast_cancer_final
WITH (
format = 'PARQUET',
parquet_compression = 'SNAPPY',
external_location = 's3://<bucket>/breast_cancer_final/')
AS
USING FUNCTION predict_breast_cancer(radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) 
RETURNS DOUBLE 
TYPE SAGEMAKER_INVOKE_ENDPOINT WITH (sagemaker_endpoint = '<SMEndpointName>')

SELECT *, CASE WHEN round(prediction)=1 THEN 'B' ELSE 'M' END AS diagnosis
FROM( SELECT id, p.*, predict_breast_cancer(radius_mean, texture_mean, perimeter_mean, area_mean, smoothness_mean, compactness_mean, concavity_mean, concave_points_mean, symmetry_mean, fractal_dimension_mean, radius_se, texture_se, perimeter_se, area_se, smoothness_se, compactness_se, concavity_se, concave_points_se, symmetry_se, fractal_dimension_se, radius_worst, texture_worst, perimeter_worst, area_worst, smoothness_worst, compactness_worst, concavity_worst, concave_points_worst, symmetry_worst, fractal_dimension_worst) AS prediction FROM breast_cancer_features d JOIN "dynamo_db"."default"."<DynamoTableName>" p
ON d.id = p.patient_id)a

When the query successfully finishes, we can start building our dashboards using QuickSight.

First, we create a dataset in QuickSight for our table in Athena. For instructions, see Creating a Dataset Using Amazon Athena Data.

Next, we create QuickSight visuals.

The following dashboard shows the distribution of age range using a pie visual, a patient count by diagnostic per week, the top five countries, and patient distribution over time with forecast enabled.

Clean up

Now to the final step, cleaning up the resources.

To avoid unnecessary charges on your AWS account, do the following:

  1. After you’re done experimenting, delete the CloudFormation stack that you deployed in the Creating a DynamoDB table and SageMaker endpoint section. Deleting the stack destroys all the resources it created. For instructions, see the AWS CloudFormation documentation on deleting a stack.
  2. Manually delete the S3 bucket you created, including the data you uploaded and the data generated by Athena.

Conclusions

This post demonstrated how the Athena Query Federation SDK allows you to implement serverless ETL to get more out of your data in your Amazon S3 data lake. We showed how simple Athena SQL syntax allows you to enrich your data with an external datastore, perform business logic using custom UDFs, and get insights by running ML inference. We also visualized our enriched dataset by creating a dashboard in QuickSight.

If you have feedback about this post, please share it in the comments. If you have questions about implementing the solution used in this post, comment or open a thread on the Developer Tools forum.


About the Authors

Adir Sharabi is a Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable, and high-performance applications in the cloud. He is also passionate about data and helping customers get the most out of it.

Eitan Sela is a Solutions Architect with Amazon Web Services. He works with AWS customers to provide guidance and technical assistance, helping them improve the value of their solutions when using AWS. Eitan also helps customers build and operate machine learning solutions on AWS. In his spare time, Eitan enjoys jogging and reading the latest machine learning articles.