Tag Archives: Amazon Athena

Monitoring and management with Amazon QuickSight and Athena in your CI/CD pipeline

Post Syndicated from Umair Nawaz original https://aws.amazon.com/blogs/devops/monitoring-and-management-with-amazon-quicksight-and-athena-in-your-ci-cd-pipeline/

One of the many ways to monitor and manage required CI/CD metrics is to use Amazon QuickSight to build customized visualizations. Additionally, by applying Lean management to software delivery processes, organizations can deliver features faster, pivot when needed, respond to compliance and security changes, and take advantage of instant feedback to improve the customer delivery experience. This blog post demonstrates how organizations can use AWS resources and tools to monitor their CI/CD pipelines and report on them.

This solution enables and contributes to three principles of Lean management:

  • Limiting work in progress by establishing constraints that drive process improvement and increase throughput.
  • Creating and maintaining dashboards displaying key quality information, productivity metrics, and current status of work (including defects).
  • Using data from development performance and operations monitoring tools to enable business decisions more frequently.

Overview

The following architectural diagram shows how to use AWS services to collect metrics from a CI/CD pipeline and deliver insights through Amazon QuickSight dashboards.

Architecture diagram showing an overview of how CI/CD metrics are extracted and transformed to create a dynamic QuickSight dashboard

In this example, the orchestrator for the CI/CD pipeline is AWS CodePipeline with the entry point as an AWS CodeCommit Git repository for source control. When a developer pushes a code change into the CodeCommit repository, the change goes through a series of phases in CodePipeline. AWS CodeBuild is responsible for performing build actions and, upon successful completion of this phase, AWS CodeDeploy kicks off the actions to execute the deployment.

For each action in CodePipeline, the following series of events occurs:

  • An Amazon CloudWatch rule creates a CloudWatch event containing the action’s metadata.
  • The CloudWatch event triggers an AWS Lambda function.
  • The Lambda function extracts relevant reporting data and writes it to a CSV file in an Amazon S3 bucket.
  • Amazon Athena queries the Amazon S3 bucket and loads the query results into SPICE (an in-memory engine for Amazon QuickSight).
  • Amazon QuickSight obtains data from SPICE to build dashboard displays for the management team.
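To make the event flow above concrete, here is a minimal sketch of the event-to-CSV transformation. The sample event is hypothetical and trimmed to the fields used in this post. It assumes the CodePipeline event layout in which the execution ID appears under `execution-id`, and `event_to_csv` is an illustrative name rather than part of the solution.

```python
import csv
import io

# Hypothetical, trimmed action-level event; real CodePipeline events carry more fields.
SAMPLE_EVENT = {
    "source": "aws.codepipeline",
    "time": "2020-01-15T10:30:00Z",
    "detail": {
        "pipeline": "demo-pipeline",
        "execution-id": "11111111-2222-3333-4444-555555555555",
        "stage": "Build",
        "action": "CodeBuild",
        "state": "SUCCEEDED",
    },
}

FIELDS = ["pipeline", "time", "state", "execution", "stage", "action"]


def event_to_csv(event):
    """Return a two-line CSV document (header plus one row) for a single event."""
    detail = event["detail"]
    row = {
        "pipeline": detail["pipeline"],
        "time": event["time"],
        "state": detail["state"],
        "execution": detail["execution-id"],
        # Pipeline-level events have no stage or action, so fall back to "NA".
        "stage": detail.get("stage", "NA"),
        "action": detail.get("action", "NA"),
    }
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS, lineterminator="\n")
    writer.writeheader()
    writer.writerow(row)
    return buffer.getvalue()


if __name__ == "__main__":
    print(event_to_csv(SAMPLE_EVENT))
```

Each event becomes its own small CSV object, which is why the Athena table defined later skips one header line per file.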

Note: This solution is for an AWS account with one or more existing pipelines in CodePipeline. If you do not have a pipeline, no metrics are collected.

Getting started

To get started, follow these steps:

  • Create a Lambda function and copy the following code snippet. Be sure to replace the bucket name with the one used to store your event data. This Lambda function takes the payload from a CloudWatch event, extracts the fields pipeline, time, state, execution, stage, and action, and writes them to a CSV file.

Note: Athena’s performance can be improved by compressing, partitioning, or converting data into columnar formats such as Apache Parquet. In this use case, the dataset size is negligible; therefore, a transformation from CSV to Parquet is not required.


import datetime

import boto3

# Analyze payload from CloudWatch Event
def pipeline_execution(data):
    print(data)
    detail = data['detail']
    # Specify data fields to deliver to S3 (header row first)
    row = ['pipeline,time,state,execution,stage,action']

    # Pipeline- and stage-level events carry no stage and/or action
    stage = detail.get('stage', 'NA')
    action = detail.get('action', 'NA')

    # CodePipeline events carry the execution ID under 'execution-id'
    row.append(','.join([detail['pipeline'], data['time'], detail['state'],
                         detail['execution-id'], stage, action]))
    return '\n'.join(row)

# Upload CSV file to S3 bucket
def upload_data_to_s3(data):
    s3 = boto3.client('s3')
    run_date = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S:%f")
    csv_key = run_date + '.csv'
    s3.put_object(
        Body=data,
        Bucket='<example-bucket>',
        Key=csv_key
    )

def lambda_handler(event, context):
    upload_data_to_s3(pipeline_execution(event))
  • Create an Athena table to query the data stored in the Amazon S3 bucket. Execute the following SQL in the Athena query console and provide the bucket name that will hold the data.
CREATE EXTERNAL TABLE `devops`(
   `pipeline` string, 
   `time` string, 
   `state` string, 
   `execution` string, 
   `stage` string, 
   `action` string)
 ROW FORMAT DELIMITED 
   FIELDS TERMINATED BY ',' 
 STORED AS INPUTFORMAT 
   'org.apache.hadoop.mapred.TextInputFormat' 
 OUTPUTFORMAT 
   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
 LOCATION
   's3://<example-bucket>/'
 TBLPROPERTIES (
   'areColumnsQuoted'='false', 
   'classification'='csv', 
   'columnsOrdered'='true', 
   'compressionType'='none', 
   'delimiter'=',', 
   'skip.header.line.count'='1',  
   'typeOfData'='file')  
  • Create a CloudWatch event rule that passes events to the Lambda function created in Step 1. In the event rule configuration, set the Service Name as CodePipeline and, for Event Type, select All Events.
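If you prefer to script the rule instead of using the console, the following is a minimal sketch with boto3. The rule name, statement ID, and target ID are placeholders chosen for illustration, and the event pattern assumes that matching on the `aws.codepipeline` source is equivalent to selecting All Events for CodePipeline in the console. The clients are passed in as parameters so the function can be exercised without AWS access.

```python
import json

# Matches every event whose source is CodePipeline.
EVENT_PATTERN = {"source": ["aws.codepipeline"]}


def create_pipeline_rule(events_client, lambda_client, function_arn,
                         rule_name="codepipeline-metrics"):
    """Create the rule, allow it to invoke the function, and attach the function as target."""
    rule_arn = events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(EVENT_PATTERN),
        State="ENABLED",
    )["RuleArn"]
    # Without this permission the rule matches events but cannot invoke the function.
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId=f"{rule_name}-invoke",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "metrics-lambda", "Arn": function_arn}],
    )
    return rule_arn

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   create_pipeline_rule(boto3.client("events"), boto3.client("lambda"), "<function-arn>")
```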

Sample Dataset view from Athena.

Sample Athena query and the results

Amazon QuickSight visuals

After the initial setup is done, you are ready to create your QuickSight dashboard. Be sure to check that the Athena permissions are properly set before creating an analysis to be published as an Amazon QuickSight dashboard.

Below are diagrams and figures from Amazon QuickSight that can be generated using the event data queried from Athena. In this example, you can see how many executions happened in the account and how many were successful.

The following screenshot shows that most pipeline executions are failing. A manager might be concerned that this points to a significant issue and prompt an investigation in which they can allocate resources to improve delivery and efficiency.

QuickSight Dashboard showing total execution successes and failures

The visual for this solution is dynamic in nature. If the pipeline gains or loses actions, the visual adjusts automatically to reflect all actions. After looking at the success and failure rates for each CodePipeline action in Amazon QuickSight, as shown in the following screenshot, users can take targeted actions quickly. For example, if the team sees a lot of failures due to vulnerability scanning, they can work on improving that problem area to drive value for future code releases.

QuickSight Dashboard showing the successes and failures of pipeline actions
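As a rough illustration of what such a visual aggregates, the following sketch computes the failure rate per action from CSV rows shaped like the Lambda output. The sample rows and the name `failure_rate_by_action` are made up for illustration; QuickSight performs its own aggregation on the Athena dataset.

```python
import csv
import io
from collections import Counter, defaultdict

# Hypothetical rows in the same column order as the Athena table.
SAMPLE_CSV = """pipeline,time,state,execution,stage,action
demo,2020-01-15T10:00:00Z,SUCCEEDED,e1,Build,CodeBuild
demo,2020-01-15T10:05:00Z,FAILED,e1,Scan,VulnerabilityScan
demo,2020-01-15T11:00:00Z,SUCCEEDED,e2,Build,CodeBuild
demo,2020-01-15T11:05:00Z,SUCCEEDED,e2,Scan,VulnerabilityScan
demo,2020-01-15T11:06:00Z,STARTED,e2,Deploy,CodeDeploy
"""


def failure_rate_by_action(csv_text):
    """Map each action to failed / (succeeded + failed), ignoring unfinished states."""
    counts = defaultdict(Counter)
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Skip pipeline- and stage-level rows and events that are not final outcomes.
        if row["action"] != "NA" and row["state"] in ("SUCCEEDED", "FAILED"):
            counts[row["action"]][row["state"]] += 1
    return {
        action: c["FAILED"] / (c["SUCCEEDED"] + c["FAILED"])
        for action, c in counts.items()
    }


if __name__ == "__main__":
    for action, rate in failure_rate_by_action(SAMPLE_CSV).items():
        print(f"{action}: {rate:.0%} failed")
```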

Day-over-day visuals reflect date-specific activity and enable teams to see their progress over a period of time.

QuickSight Dashboard showing day over day results of successful CI/CD executions and failures

Amazon QuickSight offers controls that can be configured to apply filters to visuals. For example, the following screenshot demonstrates how users can toggle between visuals for different applications.

QuickSight's control function to switch between different visualization options

Cleanup (optional)

In order to avoid unintended charges, delete the following resources:

  • Amazon CloudWatch event rule
  • Lambda function
  • Amazon S3 Bucket (the location in which CSV files generated by the Lambda function are stored)
  • Athena external table
  • Amazon QuickSight data sets
  • Analysis and dashboard

Conclusion

In this blog, we showed how metrics can be derived from a CI/CD pipeline. Utilizing Amazon QuickSight to create visuals from these metrics allows teams to continuously deliver updates on the deployment process to management. The aggregation of the captured data over time allows individual developers and teams to improve their processes. That is the goal of creating a Lean DevOps process: to oversee the meta-delivery pipeline and optimize all future releases by identifying weak spots and points of risk during the entire release process.

___________________________________________________________

About the Authors

Umair Nawaz is a DevOps Engineer at Amazon Web Services in New York City. He works on building secure architectures and advises enterprises on agile software delivery. He is motivated to solve problems strategically by utilizing modern technologies.
Christopher Flores is an Engagement Manager at Amazon Web Services in New York City. He leads AWS developers, partners, and client teams in using the customer engagement accelerator framework. Christopher expedites stakeholder alignment, enterprise cohesion and risk mitigation while ensuring feedback loops to close the engagement lifecycle.
Carol Liao is a Cloud Infrastructure Architect at Amazon Web Services in New York City. She enjoys designing and developing modern IT solutions in the cloud where there is always more to learn, more problems to solve, and more to build.

 

How Siemens built a fully managed scheduling mechanism for updates on Amazon S3 data lakes

Post Syndicated from Pedro Bento original https://aws.amazon.com/blogs/big-data/how-siemens-built-a-fully-managed-scheduling-mechanism-for-consistent-updates-on-amazon-s3-data-lakes/

Siemens is a global technology leader with more than 370,000 employees and 170 years of experience. To protect Siemens from cybercrime, the Siemens Cyber Defense Center (CDC) continuously monitors Siemens’ networks and assets. To handle the resulting enormous data load, the CDC built a next-generation threat detection and analysis platform called ARGOS. ARGOS is a hybrid-cloud solution that makes heavy use of fully managed AWS services for streaming, big data processing, and machine learning.

Users such as security analysts, data scientists, threat intelligence teams, and incident handlers continuously access data in the ARGOS platform. Further, various automated components update, extend, and remove data to enrich information, improve data quality, enforce PII requirements, or mutate data due to schema evolution or additional data normalization requirements. Keeping the data always available and consistent presents multiple challenges.

While object-based data lakes are highly beneficial from a cost perspective compared to traditional transactional databases in such scenarios, they hardly allow for atomic updates or require highly complex and costly extensions. To overcome this problem, Siemens designed a solution that enables atomic file updates on Amazon S3-based data lakes without compromising query performance and availability.

This post presents this solution, which is an easy-to-use scheduling service for S3 data update tasks. Siemens uses it for multiple purposes, including pseudonymization, anonymization, and removal of sensitive data. This post demonstrates how to use the solution to remove values from a dataset after a predefined amount of time. Adding further data processing tasks is straightforward because the solution has a well-defined architecture and the whole stack consists of fewer than 200 lines of source code. It is solely based on fully managed AWS services and therefore achieves minimal operational overhead.

Architecture overview

This post uses an S3-based data lake with continuous data ingestion and Amazon Athena as the query mechanism. The goal is to automatically remove certain values a predefined time after ingestion. Applications and users consuming the data via Athena are not impacted (for example, they do not observe downtime or data quality issues like duplication).

The following diagram illustrates the architecture of this solution.

Siemens built the solution with the following services and components:

  1. Scheduling trigger – New data (for example, in JSON format) is continuously uploaded to an S3 bucket.
  2. Task scheduling – As soon as new files land, an AWS Lambda function processes the resulting S3 bucket notification events. As part of the processing, it creates a new item on Amazon DynamoDB that specifies a Time to Live (TTL) and the path to that S3 object.
  3. Task execution trigger – When the TTL expires, the DynamoDB item is deleted from the table and the DynamoDB stream triggers a Lambda function that processes the S3 object at that path.
  4. Task execution – The Lambda function derives meta information (like the relevant S3 path) from the TTL expiration event and processes the S3 object. Finally, the new S3 object replaces the older version.
  5. Data usage – The updated data is available for querying from Athena without further manual processing, and uses S3’s eventual consistency on read operations.

About DynamoDB Streams and TTL

TTL for DynamoDB lets you define when items in a table expire so they can be deleted from the database automatically. TTL comes at no extra cost and deletes expired items without consuming provisioned throughput, which reduces storage use and the cost of storing irrelevant data. After you enable TTL on a table, you can set a deletion timestamp on a per-item basis, which limits storage usage to only those records that are relevant.

Solution overview

To implement this solution manually, complete the following steps:

  1. Create a DynamoDB table and configure DynamoDB Streams.
  2. Create a Lambda function to insert TTL records.
  3. Configure an S3 event notification on the target bucket.
  4. Create a Lambda function that performs data processing tasks.
  5. Use Athena to query the processed data.

If you want to deploy the solution automatically, you may skip these steps and use the AWS CloudFormation template provided.

Prerequisites

To complete this walkthrough, you must have the following:

  • An AWS account with access to the AWS Management Console.
  • A role with access to S3, DynamoDB, Lambda, and Athena.

Creating a DynamoDB table and configuring DynamoDB Streams

Start first with the time-based trigger setup. For this, you use S3 notifications, DynamoDB Streams, and a Lambda function to integrate both services. The DynamoDB table stores the items to process after a predefined time.

Complete the following steps:

  1. On the DynamoDB console, create a table.
  2. For Table name, enter objects-to-process.
  3. For Primary key, enter path and choose String.
  4. Select the table and click on Manage TTL next to “Time to live attribute” under table details.
  5. For TTL attribute, enter ttl.
  6. For DynamoDB Streams, choose Enable with view type New and old images.

Note that you can enable DynamoDB TTL on non-numeric attributes, but expiration only takes effect for items whose TTL attribute is numeric (a Unix epoch timestamp in seconds).

The DynamoDB TTL is not minute-precise. Expired items are typically deleted within 48 hours of expiration. However, you may experience shorter deviations of only 10–30 minutes from the actual TTL value. For more information, see Time to Live: How It Works.
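Because the TTL attribute must hold an absolute expiration time rather than a duration, it can help to see the arithmetic in isolation. The following is a small sketch; `build_ttl_item` is a hypothetical helper, and the attribute names `path` and `ttl` match the table configured above.

```python
import time


def build_ttl_item(s3_path, delay_seconds, now=None):
    """Return a DynamoDB item whose `ttl` is an absolute epoch timestamp in seconds."""
    if delay_seconds < 0:
        raise ValueError("delay_seconds must not be negative")
    # `now` can be injected for testing; by default the current time is used.
    current = int(time.time()) if now is None else int(now)
    return {"path": s3_path, "ttl": current + delay_seconds}


if __name__ == "__main__":
    # An object uploaded now becomes eligible for processing in five minutes.
    print(build_ttl_item("s3://data-bucket/dataset/example.json", 300))
```

Given the deletion delay described above, treat the resulting timestamp as the earliest point at which processing can start, not as an exact schedule.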

Creating a Lambda function to insert TTL records

The first Lambda function you create is for scheduling tasks. It receives an S3 notification as input, recreates the S3 path (for example, s3://<bucket>/<key>), and creates a new item on DynamoDB with two attributes: the S3 path and the TTL (an expiration time in epoch seconds). For more information about a similar S3 notification event structure, see Test the Lambda Function.

To deploy the Lambda function, on the Lambda console, create a function named NotificationFunction with the Python 3.7 runtime and the following code:

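import time
from urllib.parse import unquote_plus

import boto3

# Delay, in seconds, between upload and processing (default: 300 s = 5 minutes)
default_ttl = 300

s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
    s3_record = s3_notif_event['Records'][0]['s3']
    # Object keys arrive URL-encoded in S3 notifications (for example, spaces become '+')
    return s3_record['bucket']['name'], unquote_plus(s3_record['object']['key'])

def lambda_handler(event, context):
    try:
        bucket_name, key = parse_bucket_and_key(event)
        head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
        tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
        # Schedule only non-empty objects that have not been tagged as processed
        if head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0:
            record_path = f"s3://{bucket_name}/{key}"
            # 'ttl' is an absolute expiration time in epoch seconds
            table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
    except Exception as err:
        # Log and skip events that cannot be scheduled instead of hiding the failure
        print(f"Skipping event: {err}")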

Configuring S3 event notifications on the target bucket

You can take advantage of the scalability, security, and performance of S3 by using it as a data lake for storing your datasets. Additionally, you can use S3 event notifications to capture S3-related events, such as the creation or deletion of objects within a bucket. You can forward these events to other AWS services, such as Lambda.

To configure S3 event notifications, complete the following steps:

  1. On the S3 console, create an S3 bucket named data-bucket.
  2. Choose the bucket and go to the “Properties” tab.
  3. Under Advanced Settings, choose Events and add a notification.
  4. For Name, enter MyEventNotification.
  5. For Events, select All object create events.
  6. For Prefix, enter dataset/.
  7. For Send to, choose Lambda Function.
  8. For Lambda, choose NotificationFunction.

This configuration restricts the scheduling to events that happen within your previously defined dataset. For more information, see How Do I Enable and Configure Event Notifications for an S3 Bucket?

Creating a Lambda function that performs data processing tasks

You have now created a time-based trigger for the deletion of the record in the DynamoDB table. However, when the system delete occurs and the change is recorded in DynamoDB Streams, no further action is taken. Lambda can poll the stream to detect these change records and trigger a function to process them according to the activity (INSERT, MODIFY, REMOVE).

This post is only concerned with deleted items because it uses the DynamoDB TTL feature, whose deletions appear in DynamoDB Streams, to trigger task executions. Lambda gives you the flexibility to either process the item by itself or to forward the processing effort to somewhere else (such as an AWS Glue job or an Amazon SQS queue).
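A REMOVE record in the stream can come from a TTL expiration or from an ordinary delete. The following sketch separates the two by checking the `userIdentity` field that DynamoDB attaches to TTL deletions. The sample records are trimmed and hypothetical, and `is_ttl_expiry` is an illustrative helper, not part of the function shown in this post.

```python
# Trimmed, hypothetical stream records for illustration.
TTL_DELETE = {
    "eventName": "REMOVE",
    "userIdentity": {"type": "Service", "principalId": "dynamodb.amazonaws.com"},
    "dynamodb": {"Keys": {"path": {"S": "s3://data-bucket/dataset/example.json"}}},
}
MANUAL_DELETE = {
    "eventName": "REMOVE",
    "dynamodb": {"Keys": {"path": {"S": "s3://data-bucket/dataset/example.json"}}},
}


def is_ttl_expiry(record):
    """Return True only for REMOVE records that the TTL process generated."""
    identity = record.get("userIdentity") or {}
    return (
        record.get("eventName") == "REMOVE"
        and identity.get("type") == "Service"
        and identity.get("principalId") == "dynamodb.amazonaws.com"
    )


if __name__ == "__main__":
    print(is_ttl_expiry(TTL_DELETE), is_ttl_expiry(MANUAL_DELETE))
```

Adding such a check is optional here, but it prevents a manual cleanup of the table from triggering processing of the referenced objects.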

This post uses Lambda directly to process the S3 objects. The Lambda function performs the following tasks:

  1. Gets the S3 object from the DynamoDB item’s S3 path attribute.
  2. Modifies the object’s data.
  3. Overwrites the old S3 object with the updated content and tags the object as processed.

Complete the following steps:

  1. On the Lambda console, create a function named JSONProcessingFunction with Python 3.7 as the runtime and the following code:
    import os, json, boto3
    from functools import partial
    from urllib.parse import urlparse
    
    s3 = boto3.resource('s3')
    
    def parse_bucket_and_key(s3_url_as_string):
        s3_path = urlparse(s3_url_as_string)
        return s3_path.netloc, s3_path.path[1:]
    
    def extract_s3path_from_dynamo_event(event):
        if event["Records"][0]["eventName"] == "REMOVE":
            return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]
    
    def modify_json(json_dict, column_name, value):
        json_dict[column_name] = value
        return json_dict
        
    def get_obj_contents(bucketname, key):
        obj = s3.Object(bucketname, key)
        return obj.get()['Body'].iter_lines()
    
    clean_column_2_func = partial(modify_json, column_name="file_contents", value="")
    
    def lambda_handler(event, context):
        s3_url_as_string = extract_s3path_from_dynamo_event(event)
        if s3_url_as_string:
            bucket_name, key = parse_bucket_and_key(s3_url_as_string)
            updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
            s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
        else:
            print(f"Invalid event: {str(event)}")

  2. On the Lambda function configuration webpage, click on Add trigger.
  3. For Trigger configuration, choose DynamoDB.
  4. For DynamoDB table, choose objects-to-process.
  5. For Batch size, enter 1.
  6. For Batch window, enter 0.
  7. For Starting position, choose Trim horizon.
  8. Select Enable trigger.

You use batch size = 1 because each S3 object represented on the DynamoDB table is typically large. If these files are small, you can use a larger batch size. The batch size is essentially the number of files that your Lambda function processes at a time.

Because any new object on S3 (in a versioning-enabled bucket) triggers an object creation event, even if its key already exists, you must make sure that your task scheduling Lambda function ignores any object creation events that your task execution function causes. Otherwise, the two functions trigger each other in an infinite loop. This post uses tags on S3 objects: when the task execution function processes an object, it adds a processed tag. The task scheduling function ignores those objects in subsequent executions.
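The effect of the tag check can be seen in a small, purely local simulation. It is a toy model under simplifying assumptions (one object, no AWS calls, hypothetical function names): each processing step overwrites the object and therefore emits a new creation event, and the scheduling decision mirrors the condition used in NotificationFunction.

```python
def should_schedule(content_length, tag_set):
    """Same condition as the scheduling function: non-empty and not yet tagged."""
    return content_length > 0 and len(tag_set) == 0


def simulate(tag_on_process=True, max_rounds=10):
    """Count how often one uploaded object gets scheduled for processing."""
    obj = {"size": 42, "tags": []}
    pending_events = 1  # the initial upload
    scheduled = 0
    while pending_events and scheduled < max_rounds:
        pending_events -= 1
        if should_schedule(obj["size"], obj["tags"]):
            scheduled += 1
            if tag_on_process:
                obj["tags"] = [{"Key": "PROCESSED", "Value": "True"}]
            # Overwriting the object emits another object creation event.
            pending_events += 1
    return scheduled


if __name__ == "__main__":
    print("with tag:", simulate())
    print("without tag:", simulate(tag_on_process=False))
```

With the tag, the object is scheduled once. Without it, the simulation only stops because of the `max_rounds` safety limit.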

Using Athena to query the processed data

The final step is to create a table for Athena to query the data. You can do this manually or by using an AWS Glue crawler that infers the schema directly from the data and automatically creates the table for you. This post uses a crawler because it can handle schema changes and add new partitions automatically. To create this crawler, use the following code:

aws glue create-crawler --name data-crawler \
--role <AWSGlueServiceRole-crawler> \
--database-name data_db \
--description 'crawl data bucket!' \
--targets \
"{\
  \"S3Targets\": [\
    {\
      \"Path\": \"s3://<data-bucket>/dataset/\"\
    }\
  ]\
}"

Replace <AWSGlueServiceRole-crawler> and <data-bucket> with the name of your AWSGlueServiceRole and S3 bucket, respectively.

When the crawling process is complete, you can start querying the data. You can use the Athena console to interact with the table while its underlying data is being transparently updated. See the following code:

SELECT * FROM data_db.dataset LIMIT 1000
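The same query can also be run programmatically. The following is a minimal sketch using the boto3 Athena client, which is passed in as a parameter. The output location is an assumption: Athena needs an S3 location for query results, and `<results-bucket>` is a placeholder. The function name and polling interval are illustrative choices.

```python
import time


def run_athena_query(athena, sql, output_location, poll_seconds=1.0):
    """Start a query, wait until it finishes, and return the first page of rows."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until Athena reports a terminal state.
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} ended in state {state}")

    result = athena.get_query_results(QueryExecutionId=query_id)
    # Cells without a value (NULL) have no "VarCharValue" key.
    return [
        [cell.get("VarCharValue") for cell in row["Data"]]
        for row in result["ResultSet"]["Rows"]
    ]

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   rows = run_athena_query(boto3.client("athena"),
#                           "SELECT * FROM data_db.dataset LIMIT 1000",
#                           "s3://<results-bucket>/athena/")
```

For SELECT statements, the first returned row typically holds the column names, and larger results need pagination, which this sketch leaves out.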

Automated setup

You can use the following AWS CloudFormation template to create the solution described in this post in your AWS account. To launch the template, choose the following link:

This CloudFormation stack requires the following parameters:

  • Stack name – A meaningful name for the stack, for example, data-updater-solution.
  • Bucket name – The name of the S3 bucket to use for the solution. The stack creation process creates this bucket.
  • Time to Live – The number of seconds after which items on the DynamoDB table expire. Referenced S3 objects are processed on item expiration.

Stack creation takes up to a few minutes. Check and refresh the AWS CloudFormation Resources tab to monitor the process while it is running.

When the stack shows the state CREATE_COMPLETE, you can start using the solution.

Testing the solution

To test the solution, download the mock_uploaded_data.json dataset created with the Mockaroo data generator. The use case is a web service in which users can upload files. The goal is to delete those files some predefined time after the upload to reduce storage and query costs. To this end, the provided code looks for the attribute file_contents and replaces its value with an empty string.
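To preview the expected result before uploading anything, you can apply the same transformation locally. The following sketch assumes the dataset is newline-delimited JSON, as the processing function expects. Apart from `file_contents`, the field names in the sample are hypothetical, and `blank_field` is an illustrative helper.

```python
import json


def blank_field(ndjson_text, field="file_contents"):
    """Set `field` to an empty string in every JSON line and return the new text."""
    processed = []
    for line in ndjson_text.splitlines():
        if not line.strip():
            continue  # tolerate blank lines in the input
        record = json.loads(line)
        record[field] = ""
        processed.append(json.dumps(record))
    return "\n".join(processed)


if __name__ == "__main__":
    sample = "\n".join([
        json.dumps({"id": 1, "user": "alice", "file_contents": "aGVsbG8="}),
        json.dumps({"id": 2, "user": "bob", "file_contents": "d29ybGQ="}),
    ])
    print(blank_field(sample))
```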

You can now upload new data into your data-bucket S3 bucket under the dataset/ prefix. Your NotificationFunction Lambda function processes the resulting bucket notification event for the upload, and a new item appears on your DynamoDB table. Shortly after the predefined TTL time, the JSONProcessingFunction Lambda function processes the data and you can check the resulting changes via an Athena query.

You can also confirm that an S3 object was processed successfully if the DynamoDB item corresponding to this S3 object is no longer present in the DynamoDB table and the S3 object has the processed tag.

Conclusion

This post showed how to automatically re-process objects on S3 after a predefined amount of time by using a simple and fully managed scheduling mechanism. Because you use S3 for storage, you automatically benefit from S3’s eventual consistency model, simply by using identical keys (names) both for the original and processed objects. This way, you avoid query results with duplicate or missing data. Also, incomplete or only partially uploaded objects do not result in data inconsistencies because S3 only creates new object versions for successfully completed file transfers.

You may have previously used Spark to process objects hourly. This requires you to monitor objects that must be processed, to move and process them in a staging area, and to move them back to their actual destination. The main drawback is the final step because, due to Spark’s parallel nature, files are generated with different names and contents. That prevents direct file replacement in the dataset and leads to downtime or potential data duplicates when data is queried during a move operation. Additionally, because each copy/delete operation could potentially fail, you have to deal with possible partially processed data manually.

From an operations perspective, AWS serverless services simplify your infrastructure. You can combine the scalability of these services with a pay-as-you-go plan to start with a low-cost POC and scale to production quickly—all with a minimal code base.

Compared to hourly Spark jobs, you could potentially reduce costs by up to 80%, which makes this solution both cheaper and simpler.

Special thanks to Karl Fuchs, Stefan Schmidt, Carlos Rodrigues, João Neves, Eduardo Dixo and Marco Henriques for their valuable feedback on this post’s content.

 


About the Authors

Pedro Completo Bento is a senior big data engineer working at Siemens CDC. He holds a Master in Computer Science from the Instituto Superior Técnico in Lisbon. He started his career as a full-stack developer, later specializing in big data challenges. Working with AWS, he builds highly reliable, performant and scalable systems on the cloud, while keeping the costs at bay. In his free time, he enjoys playing board games with his friends.

 

 

Arturo Bayo is a big data consultant at Amazon Web Services. He promotes a data-driven culture in enterprise customers around EMEA, providing specialized guidance on business intelligence and data lake projects while working with AWS customers and partners to build innovative solutions around data and analytics.

 

 

 

 

ICYMI: Serverless Q4 2019

Post Syndicated from Rob Sutter original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2019/

Welcome to the eighth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

The three months comprising the fourth quarter of 2019

AWS re:Invent

AWS re:Invent 2019

re:Invent 2019 dominated the fourth quarter at AWS. The serverless team presented a number of talks, workshops, and builder sessions to help customers increase their skills and deliver value more rapidly to their own customers.

Serverless talks from re:Invent 2019

Chris Munns presenting 'Building microservices with AWS Lambda' at re:Invent 2019

We presented dozens of sessions showing how customers can improve their architecture and agility with serverless. Here are some of the most popular.

Videos

Decks

You can also find decks for many of the serverless presentations and other re:Invent presentations on our AWS Events Content.

AWS Lambda

For developers needing greater control over performance of their serverless applications at any scale, AWS Lambda announced Provisioned Concurrency at re:Invent. This feature enables Lambda functions to execute with consistent start-up latency, making them ideal for building latency-sensitive applications.

As the following graph shows, provisioned concurrency reduces tail latency, directly improving response times and providing a more responsive end-user experience.

Graph showing performance enhancements with AWS Lambda Provisioned Concurrency

Lambda rolled out enhanced VPC networking to 14 additional Regions around the world. This change brings dramatic improvements to startup performance for Lambda functions running in VPCs due to more efficient usage of elastic network interfaces.

Illustration of AWS Lambda VPC to VPC NAT

New VPC to VPC NAT for Lambda functions

Lambda now supports three additional runtimes: Node.js 12, Java 11, and Python 3.8. Each of these new runtimes has new version-specific features and benefits, which are covered in the linked release posts. Like the Node.js 10 runtime, these new runtimes are all based on an Amazon Linux 2 execution environment.

Lambda released a number of controls for both stream and async-based invocations:

  • You can now configure error handling for Lambda functions consuming events from Amazon Kinesis Data Streams or Amazon DynamoDB Streams. It’s now possible to limit the retry count, limit the age of records being retried, configure a failure destination, or split a batch to isolate a problem record. These capabilities help you deal with potential “poison pill” records that would previously cause streams to pause in processing.
  • For asynchronous Lambda invocations, you can now set the maximum event age and retry attempts on the event. If either configured condition is met, the event can be routed to a dead letter queue (DLQ), Lambda destination, or it can be discarded.
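As a sketch of how these controls map to API parameters, the helpers below build keyword arguments for the boto3 Lambda client calls `update_event_source_mapping` (stream sources) and `put_function_event_invoke_config` (asynchronous invocations). The helper names and default values are illustrative assumptions, not recommendations.

```python
def stream_error_handling(mapping_uuid, failure_destination_arn,
                          max_retries=3, max_record_age_seconds=3600):
    """Keyword arguments for lambda_client.update_event_source_mapping(**kwargs)."""
    return {
        "UUID": mapping_uuid,
        # Stop retrying a failing batch after this many attempts ...
        "MaximumRetryAttempts": max_retries,
        # ... or once its records are older than this.
        "MaximumRecordAgeInSeconds": max_record_age_seconds,
        # Split a failing batch in two to isolate a problem record.
        "BisectBatchOnFunctionError": True,
        # Send details about discarded batches to an SQS queue or SNS topic.
        "DestinationConfig": {"OnFailure": {"Destination": failure_destination_arn}},
    }


def async_error_handling(function_name, failure_destination_arn,
                         max_retries=1, max_event_age_seconds=600):
    """Keyword arguments for lambda_client.put_function_event_invoke_config(**kwargs)."""
    if not 0 <= max_retries <= 2:
        raise ValueError("asynchronous invocations allow 0 to 2 retries")
    return {
        "FunctionName": function_name,
        "MaximumRetryAttempts": max_retries,
        "MaximumEventAgeInSeconds": max_event_age_seconds,
        "DestinationConfig": {"OnFailure": {"Destination": failure_destination_arn}},
    }

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("lambda")
#   client.update_event_source_mapping(**stream_error_handling("<mapping-uuid>", "<queue-arn>"))
#   client.put_function_event_invoke_config(**async_error_handling("<function-name>", "<queue-arn>"))
```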

AWS Lambda Destinations is a new feature that allows developers to designate an asynchronous target for Lambda function invocation results. You can set separate destinations for success and failure. This unlocks new patterns for distributed event-based applications and can replace custom code previously used to manage routing results.

Illustration depicting AWS Lambda Destinations with success and failure configurations

Lambda Destinations

Lambda also now supports setting a Parallelization Factor, which allows you to set multiple Lambda invocations per shard for Kinesis Data Streams and DynamoDB Streams. This enables faster processing without the need to increase your shard count, while still guaranteeing the order of records processed.

Illustration of multiple AWS Lambda invocations per Kinesis Data Streams shard

Lambda Parallelization Factor diagram

Lambda introduced Amazon SQS FIFO queues as an event source. “First in, first out” (FIFO) queues guarantee the order of record processing, unlike standard queues. FIFO queues support message grouping via the MessageGroupId attribute, which allows parallel Lambda consumers of a single FIFO queue and enables high throughput of record processing by Lambda.

Lambda now supports Environment Variables in the AWS China (Beijing) Region and the AWS China (Ningxia) Region.

You can now view percentile statistics for the duration metric of your Lambda functions. Percentile statistics show the relative standing of a value in a dataset, and are useful when applied to metrics that exhibit large variances. They can help you understand the distribution of a metric, discover outliers, and find hard-to-spot situations that affect customer experience for a subset of your users.
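A short local example shows why percentiles can reveal what an average hides. The durations below are invented, and the nearest-rank method used here is one common percentile definition; it is not necessarily the exact calculation CloudWatch performs.

```python
import math


def percentile(values, p):
    """Nearest-rank percentile: the smallest value with at least p% of samples at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


# Hypothetical invocation durations in milliseconds, with one slow outlier.
DURATIONS_MS = [12, 13, 12, 14, 13, 12, 15, 13, 12, 950]

if __name__ == "__main__":
    print("average:", sum(DURATIONS_MS) / len(DURATIONS_MS))
    for p in (50, 90, 99):
        print(f"p{p}:", percentile(DURATIONS_MS, p))
```

In this sample, the average is pulled far above what most invocations experience, while p50 stays close to the typical duration and p99 surfaces the outlier.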

Amazon API Gateway

Screen capture of creating an Amazon API Gateway HTTP API in the AWS Management Console

Amazon API Gateway announced the preview of HTTP APIs. In addition to significant performance improvements, most customers see an average cost savings of 70% when compared with API Gateway REST APIs. With HTTP APIs, you can create an API in four simple steps. Once the API is created, additional configuration for CORS and JWT authorizers can be added.

AWS SAM CLI

Screen capture of the new 'sam deploy' process in a terminal window

The AWS SAM CLI team simplified the bucket management and deployment process in the SAM CLI. You no longer need to manage a bucket for deployment artifacts – SAM CLI handles this for you. The deployment process has also been streamlined from multiple flagged commands to a single command, sam deploy.

AWS Step Functions

One powerful feature of AWS Step Functions is its ability to integrate directly with AWS services without you needing to write complicated application code. In Q4, Step Functions expanded its integration with Amazon SageMaker to simplify machine learning workflows. Step Functions also added a new integration with Amazon EMR, making EMR big data processing workflows faster to build and easier to monitor.

Screen capture of an AWS Step Functions step with Amazon EMR

Step Functions step with EMR

Step Functions now provides the ability to track state transition usage by integrating with AWS Budgets, allowing you to monitor trends and react to usage on your AWS account.

You can now view CloudWatch Metrics for Step Functions at a one-minute frequency. This makes it easier to set up detailed monitoring for your workflows. You can use one-minute metrics to set up CloudWatch Alarms based on your Step Functions API usage, Lambda functions, service integrations, and execution details.

Step Functions now supports higher throughput workflows, making it easier to coordinate applications with high event rates. This increases the limits to 1,500 state transitions per second and a default start rate of 300 state machine executions per second in US East (N. Virginia), US West (Oregon), and Europe (Ireland). Click the above link to learn more about the limit increases in other Regions.

Screen capture of choosing Express Workflows in the AWS Management Console

Step Functions released AWS Step Functions Express Workflows. With the ability to support event rates greater than 100,000 per second, this feature is designed for high-performance workloads at a reduced cost.

Amazon EventBridge

Illustration of the Amazon EventBridge schema registry and discovery service

Amazon EventBridge announced the preview of the Amazon EventBridge schema registry and discovery service. This service allows developers to automate discovery and cataloging event schemas for use in their applications. Additionally, once a schema is stored in the registry, you can generate and download a code binding that represents the schema as an object in your code.

Amazon SNS

Amazon SNS now supports the use of dead letter queues (DLQ) to help capture unhandled events. By enabling a DLQ, you can catch events that are not processed, then re-submit them or analyze them to locate processing issues.
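A DLQ is attached to an individual SNS subscription through its `RedrivePolicy` attribute, which points at an Amazon SQS queue. The sketch below builds the arguments for boto3's `set_subscription_attributes`; the helper name and ARNs are placeholders.

```python
import json


def build_redrive_policy_request(subscription_arn, dlq_arn):
    """Build keyword arguments for SNS set_subscription_attributes."""
    return {
        "SubscriptionArn": subscription_arn,
        "AttributeName": "RedrivePolicy",
        # The attribute value is a JSON string naming the SQS queue used as the DLQ.
        "AttributeValue": json.dumps({"deadLetterTargetArn": dlq_arn}),
    }


# Placeholder ARNs for illustration only.
request = build_redrive_policy_request(
    "arn:aws:sns:us-east-1:123456789012:orders:11111111-2222-3333-4444-555555555555",
    "arn:aws:sqs:us-east-1:123456789012:orders-dlq",
)
# With a real client: boto3.client("sns").set_subscription_attributes(**request)
```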

Amazon CloudWatch

Amazon CloudWatch announced Amazon CloudWatch ServiceLens to provide a “single pane of glass” to observe health, performance, and availability of your application.

Screenshot of Amazon CloudWatch ServiceLens in the AWS Management Console

CloudWatch ServiceLens

CloudWatch also announced a preview of a capability called Synthetics. CloudWatch Synthetics allows you to test your application endpoints and URLs using configurable scripts that mimic what a real customer would do. This enables the outside-in view of your customers’ experiences, and your service’s availability from their point of view.

CloudWatch introduced Embedded Metric Format, which helps you ingest complex high-cardinality application data as logs and easily generate actionable metrics. You can publish these metrics from your Lambda function by using the PutLogEvents API or using an open source library for Node.js or Python applications.
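For a sense of what an Embedded Metric Format record looks like without the libraries, the sketch below assembles one by hand. The namespace, dimension, and metric names are hypothetical; in a Lambda function, printing the resulting JSON line to standard output is enough for it to reach CloudWatch Logs.

```python
import json
import time


def emf_record(namespace, dimensions, metric_name, value, unit="Milliseconds"):
    """Return one Embedded Metric Format log line as a JSON string."""
    record = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),  # milliseconds since the epoch
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [list(dimensions)],  # one set of dimension keys
                    "Metrics": [{"Name": metric_name, "Unit": unit}],
                }
            ],
        },
        metric_name: value,
    }
    record.update(dimensions)  # dimension values live at the top level
    return json.dumps(record)


# Hypothetical names for illustration only.
line = emf_record("ExampleApp", {"Service": "checkout"}, "ProcessingLatency", 42)
print(line)
```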

Finally, CloudWatch announced a preview of Contributor Insights, a capability to identify who or what is impacting your system or application performance by identifying outliers or patterns in log data.

AWS X-Ray

AWS X-Ray announced trace maps, which enable you to map the end-to-end path of a single request. Identifiers show issues and how they affect other services in the request’s path. These can help you to identify and isolate service points that are causing degradation or failures.

X-Ray also announced support for Amazon CloudWatch Synthetics, currently in preview. CloudWatch Synthetics on X-Ray support tracing canary scripts throughout the application, providing metrics on performance or application issues.

Screen capture of AWS X-Ray Service map in the AWS Management Console

X-Ray Service map with CloudWatch Synthetics

Amazon DynamoDB

Amazon DynamoDB announced support for customer-managed customer master keys (CMKs) to encrypt data in DynamoDB. This allows you to bring your own key (BYOK), giving you full control over how you encrypt and manage the security of your DynamoDB data.

It is now possible to add global replicas to existing DynamoDB tables to provide enhanced availability across the globe.

Another new DynamoDB capability to identify frequently accessed keys and database traffic trends is currently in preview. With this, you can now more easily identify “hot keys” and understand usage of your DynamoDB tables.

Screen capture of Amazon CloudWatch Contributor Insights for DynamoDB in the AWS Management Console

CloudWatch Contributor Insights for DynamoDB

DynamoDB also released adaptive capacity. Adaptive capacity helps you handle imbalanced workloads by automatically isolating frequently accessed items and shifting data across partitions to rebalance them. This helps reduce cost by enabling you to provision throughput for a more balanced workload instead of over provisioning for uneven data access patterns.

Amazon RDS

Amazon Relational Database Service (RDS) announced a preview of Amazon RDS Proxy to help developers manage RDS connection strings for serverless applications.

Illustration of Amazon RDS Proxy

The RDS Proxy maintains a pool of established connections to your RDS database instances. This pool enables you to support a large number of application connections so your application can scale without compromising performance. It also increases security by enabling IAM authentication for database access and enabling you to centrally manage database credentials using AWS Secrets Manager.

AWS Serverless Application Repository

The AWS Serverless Application Repository (SAR) now offers Verified Author badges. These badges enable consumers to quickly and reliably know who you are. The badge appears next to your name in the SAR and links to your GitHub profile.

Screen capture of SAR Verified developer badge in the AWS Management Console

SAR Verified developer badges

AWS Developer Tools

AWS CodeCommit launched the ability for you to enforce rule workflows for pull requests, making it easier to ensure that code has passed through specific rule requirements. You can now create an approval rule specifically for a pull request, or create approval rule templates to be applied to all future pull requests in a repository.

AWS CodeBuild added beta support for test reporting. With test reporting, you can now view the detailed results, trends, and history for tests executed on CodeBuild for any framework that supports the JUnit XML or Cucumber JSON test format.

Screen capture of AWS CodeBuild

CodeBuild test trends in the AWS Management Console

Amazon CodeGuru

AWS announced a preview of Amazon CodeGuru at re:Invent 2019. CodeGuru is a machine learning based service that makes code reviews more effective and aids developers in writing code that is more secure, performant, and consistent.

AWS Amplify and AWS AppSync

AWS Amplify added iOS and Android as supported platforms. Now developers can build iOS and Android applications using the Amplify Framework with the same category-based programming model that they use for JavaScript apps.

Screen capture of 'amplify init' for an iOS application in a terminal window

The Amplify team has also improved offline data access and synchronization by announcing Amplify DataStore. Developers can now create applications that allow users to continue to access and modify data, without an internet connection. Upon connection, the data synchronizes transparently with the cloud.

For a summary of Amplify and AppSync announcements before re:Invent, read: “A round up of the recent pre-re:Invent 2019 AWS Amplify Launches”.

Illustration of AWS AppSync integrations with other AWS services

Q4 serverless content

Blog posts

October

November

December

Tech talks

We hold several AWS Online Tech Talks covering serverless topics throughout the year. These are listed in the Serverless section of the AWS Online Tech Talks page.

Here are the ones from Q4:

Twitch

October

There are also a number of other helpful video series covering Serverless available on the AWS Twitch Channel.

AWS Serverless Heroes

We are excited to welcome some new AWS Serverless Heroes to help grow the serverless community. We look forward to some amazing content to help you with your serverless journey.

AWS Serverless Application Repository (SAR) Apps

In this edition of ICYMI, we are introducing a section devoted to SAR apps written by the AWS Serverless Developer Advocacy team. You can run these applications and review their source code to learn more about serverless and to see examples of suggested practices.

Still looking for more?

The Serverless landing page has much more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials. We’re also kicking off a fresh series of Tech Talks in 2020 with new content providing greater detail on everything new coming out of AWS for serverless application developers.

Throughout 2020, the AWS Serverless Developer Advocates are crossing the globe to tell you more about serverless, and to hear more about what you need. Follow this blog to keep up on new launches and announcements, best practices, and examples of serverless applications in action.

You can also follow all of us on Twitter to see the latest news, follow conversations, and interact with the team.

Chris Munns: @chrismunns
Eric Johnson: @edjgeek
James Beswick: @jbesw
Moheeb Zara: @virgilvox
Ben Smith: @benjamin_l_s
Rob Sutter: @rts_rob
Julian Wood: @julian_wood

Happy coding!

Collect and distribute high-resolution crypto market data with ECS, S3, Athena, Lambda, and AWS Data Exchange

Post Syndicated from Jared Katz original https://aws.amazon.com/blogs/big-data/collect-and-distribute-high-resolution-crypto-market-data-with-ecs-s3-athena-lambda-and-aws-data-exchange/

This is a guest post by Floating Point Group. In their own words, “Floating Point Group is on a mission to bring institutional-grade trading services to the world of cryptocurrency.”

The need and demand for financial infrastructure designed specifically for trading digital assets may not be obvious. There’s a rather pervasive narrative that these coins and tokens are effectively natively digital counterparts to traditional assets such as currencies, commodities, equities, and fixed income. This narrative often manifests in the form of pithy one-liners recycled by pundits attempting to communicate the value proposition of various projects in the space (such as, “Bitcoin is just a currency with an algorithmically controlled, tamper-proof monetary policy,” or, “Ether is just a commodity like gasoline that you can use to pay for computational work on a global computer.”). Unsurprisingly, we at FPG often hear the question, “What’s so special about cryptocurrencies that they warrant dedicated financial services? Why do we need solutions for problems that have already been solved?”

The truth is that these assets and the widespread public interest surrounding them are entirely unprecedented. The decentralized ledger technology that serves as an immutable record of network transactions, the clever use of proof-of-work algorithms to economically incentivize rational actors to help uphold the security of the network (the proof-of-work concept dates back at least as far as 1993, but it was not until bitcoin that the technology showed potential for widespread adoption), the irreversible nature of transactions that poses unique legal challenges in cases such as human error or extortion, the precariousness of self-custody (third-party custody solutions don’t exactly have track records that inspire trust), the regulatory uncertainties that come with the difficulty of both classifying these assets as well as arbitrating their exchange which must ultimately be reconciled by entities like the IRS, SEC, and CFTC—it is all very new, and very weird. With 24-hour market volume regularly exceeding $100 billion, we decided to direct our focus towards problems related specifically to trading these assets. Granted, crypto trading has undoubtedly matured since the days of bartering for bitcoin in web forums and witnessing 10% price spreads between international exchanges. But there is still a long path ahead.

One major pain point we are aiming to address for institutional traders involves liquidity (or, more precisely, the lack thereof). Simply put, the buying and selling of cryptocurrencies occurs across many different trading venues (exchanges), and liquidity (the offers to buy or sell a certain quantity of an asset at a certain price) continues to become more fragmented as new exchanges emerge. So say you’re trying to buy 100 bitcoins. You must buy from people who are willing to sell. As you take the best (cheapest) offers, you’re left with increasingly expensive offers. By the time you fill your order (in this example, buy all 100 bitcoins), you may have paid a much higher average price than, say, the price you paid for the first bitcoin of your order. This phenomenon is referred to as slippage. One easy way to minimize slippage is by expanding your search for offers. So rather than looking at the offers on just one exchange, look at the offers across hundreds of exchanges. This process, traditionally referred to as smart order routing (SOR), is one of the core services we provide. Our SOR service allows traders to easily submit orders that our system can match against the best offers available across multiple trading venues by actively monitoring liquidity across dozens of exchanges.
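The effect of slippage, and of widening the search for offers, can be shown with a small worked example. The order books below are made-up numbers, and the function is only an illustrative sketch, not a description of our SOR implementation.

```python
def average_fill_price(asks, quantity):
    """Buy `quantity` by taking the cheapest asks first; return the average price paid."""
    remaining = quantity
    cost = 0
    for price, size in sorted(asks):  # cheapest offers first
        take = min(size, remaining)
        cost += take * price
        remaining -= take
        if remaining == 0:
            return cost / quantity
    raise ValueError("not enough liquidity to fill the order")


# Made-up offers as (price in USD, size in BTC).
exchange_a = [(7400, 40), (7450, 30), (7500, 30)]
exchange_b = [(7405, 50), (7420, 30)]

single_venue = average_fill_price(exchange_a, 100)
routed = average_fill_price(exchange_a + exchange_b, 100)

print(f"one exchange: {single_venue}, two exchanges: {routed}")
```

Filling the 100 BTC order on one exchange costs an average of 7445.0 against a best offer of 7400, while drawing on both books lowers the average to 7404.5.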

Fanning out large orders in search of the best prices is a rather intuitive and widely applicable concept—roughly 75% of equities are purchased and sold via SOR. But the value of such a service for crypto markets is particularly salient: a perpetual cycle of new exchanges surging in popularity while incumbents falter has resulted in a seemingly incessant fragmentation of liquidity across trading venues—yet traders tend to assume an exchange-agnostic mindset, concerned exclusively with finding the best price for a given quantity of an asset.

Access to both real-time and historical market data is essential to the functionality of our SOR service. The highest resolution data we could hope to obtain for a given market would include every trade and every change applied to the order book, effectively allowing us to recreate the state of a market at any given point in time. The updates provided through the WebSocket streams are not sufficient for reconstructing order books. We also need to periodically fetch snapshots of the order books and store those, which we can do using an exchange’s REST API. We can fetch a snapshot and apply the corresponding updates from the streams to “replay” the order book.
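A minimal sketch of this replay idea follows. The message shapes (a `seq` number, price levels keyed by price string, and a size of 0 meaning removal) are assumptions made for illustration; real exchange formats vary.

```python
def apply_update(book, update):
    """Apply one update in place; a size of 0 removes the price level."""
    for side in ("bids", "asks"):
        for price, size in update.get(side, []):
            if size == 0:
                book[side].pop(price, None)
            else:
                book[side][price] = size


def replay(snapshot, updates):
    """Rebuild the book by applying, in order, every update newer than the snapshot."""
    book = {"bids": dict(snapshot["bids"]), "asks": dict(snapshot["asks"])}
    for update in sorted(updates, key=lambda u: u["seq"]):
        if update["seq"] <= snapshot["seq"]:
            continue  # already reflected in the snapshot
        apply_update(book, update)
    return book


# Hypothetical snapshot and stream updates.
snapshot = {"seq": 100, "bids": {"7400.00": 1.5}, "asks": {"7405.00": 2.0}}
updates = [
    {"seq": 102, "asks": [("7405.00", 0), ("7406.00", 1.0)]},
    {"seq": 99, "bids": [("7399.00", 9.0)]},
    {"seq": 101, "bids": [("7401.00", 0.5)]},
]
book = replay(snapshot, updates)
```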

Fortunately, this data is freely available, because many exchanges offer real-time feeds of market data via WebSocket APIs. We found several third-party vendors selling subscriptions to these data sets, typically in the form of CSV dumps delivered at a weekly or monthly cadence. This presented the question of build vs. buy. Given that we felt capable of building a robust and reliable system for ingesting real-time market data in a relatively short amount of time and at a fraction of the cost of purchasing the data from a vendor, we were already leaning in favor of building. Further investigation made buying look like an increasingly unattractive option. Disclaimers that multiple vendors issued about their inability to guarantee data quality and consistency did not inspire confidence. Inspecting sample data sets revealed that some essential fields provided in the original data streams were missing—fields necessary for achieving our goal of recreating the state of a market at an arbitrary point in time. We also recognized that a weekly or monthly delivery schedule would restrict our ability to explore relatively recent market data.

This post provides a high-level overview of how we ingest and store real-time market data and how we use the AWS Data Exchange API to organize and publish our data sets programmatically. Our system’s functionality extends well beyond data ingestion, normalization, and persistence; we run dedicated services for data validation, caching the most recent trade and order book for every market, computing and storing derivative metrics, and other services that help safeguard data accuracy and minimize the latency of our trading systems.

Data ingestion

The WebSocket streams we connect to for data consumption are often the same APIs responsible for providing real-time updates to an exchange’s trading dashboard.

WebSocket connections transmit data as discrete messages. We can inspect the content of individual messages as they stream into the browser. For example, the following screenshot shows a batch of order book updates.

The updates are expressed as arrays of bids and asks that were either added to the book or removed from it. Client-side code processes each update, resulting in a real-time rendering of the market’s order book. In practice, our data ingestion service (Ingester) does not read a single stream, but rather thousands of different streams, covering various data feeds for all markets across multiple exchanges. All the connections required for such broad coverage and the resulting flood of incoming data raise some obvious concerns about data loss. We’ve taken several measures to mitigate such concerns, including a redundant system design that allows us to spin up an arbitrary number of instances of the Ingester service. Like most of our microservices, Ingester is a Dockerized service run on Amazon ECS and deployed via Terraform.

All these instances consume the same data feeds as each other while a downstream mechanism handles deduplication (this is covered in more detail later in this post). We also set up Amazon CloudWatch alerts to notify us when we detect non-contiguous messages, indicating a gap in the incoming data. The alerts don’t directly mitigate data loss, but they do serve the important function of prompting an investigation.

Ingester builds up separate buffers of incoming messages, split out by data-type/exchange/market. Then, after a fixed time interval, each buffer is flushed into Amazon S3 as a gzipped JSON file. The buffer-flush cycle repeats.

The following screenshot shows a portion of the file content.

This code snippet is a single, pretty-printed JSON record from the file in the screenshot above.

{
   "event_type":"trade",
   "timestamp":1571980320422,
   "ticker_pair":"BTCUSDT",
   "trade_id":194230159,
   "price":"7405.69000000",
   "quantity":"3.20285300",
   "buyer_order_id":730178987,
   "seller_order_id":730178953,
   "trade_timestamp":1571980320417,
   "buyer_market_maker":false,
   "M":true
}

Ingester handles additional functionality, such as applying pre-defined mappings of venue-specific field names to our internal field names. Data normalization is one of many processes necessary to enable our systems to build a holistic understanding of market dynamics.
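A field mapping of this kind can be sketched as a simple dictionary lookup. The internal names below are taken from the sample record above; the venue-side short names are an assumption modeled on a typical exchange trade stream, and unmapped fields are passed through unchanged.

```python
# Assumed venue field names (left) mapped to the internal names from the sample record.
TRADE_FIELD_MAPPING = {
    "e": "event_type",
    "E": "timestamp",
    "s": "ticker_pair",
    "t": "trade_id",
    "p": "price",
    "q": "quantity",
    "b": "buyer_order_id",
    "a": "seller_order_id",
    "T": "trade_timestamp",
    "m": "buyer_market_maker",
}


def normalize(raw_message, mapping):
    """Rename venue-specific keys to internal names; keep unknown keys as they are."""
    return {mapping.get(key, key): value for key, value in raw_message.items()}


raw = {
    "e": "trade", "E": 1571980320422, "s": "BTCUSDT", "t": 194230159,
    "p": "7405.69000000", "q": "3.20285300", "b": 730178987, "a": 730178953,
    "T": 1571980320417, "m": False, "M": True,
}
normalized = normalize(raw, TRADE_FIELD_MAPPING)
```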

As with most distributed system designs, our services are written with horizontal scalability as a first-order priority. We took the same approach in designing our data ingestion service, but it has some features that make it a bit different than the archetypical horizontally scalable microservice. The most common motivations for adjusting the number of instances of a given service are load-balancing and throttling throughput. Either your system is experiencing backpressure and a consumer service scales to alleviate that pressure, or the consumer is over-provisioned and you scale down the number of instances for the sake of parsimony. For our data ingestion service, however, our motivation for running multiple instances is to minimize data loss via redundancy. The CPU usage for each instance is independent of instance count, because each instance does identical work.

For example, rather than helping alleviate backpressure by pulling messages from a single queue, each instance of our data ingestion service connects to the same WebSocket streams and performs the same amount of work.

Another somewhat unusual and confounding aspect of horizontally scaling our data ingestion service is related to state: we batch records in memory and flush them to S3 every minute. The minute boundaries are based on the incoming message’s timestamp rather than the system timestamp, because system timestamps would be inconsistent across instances. Redundancy is our primary measure for minimizing data loss, but we also need each instance to write the files to S3 in such a way that we don’t end up with duplicate records. Our first thought was that we’d need a mechanism for coordinating activity across the instances, such as maintaining a cache that would allow us to check if a record had already been persisted. But we realized that we could perform this deduplication without any coordination between instances at all.

Most of the message streams we consume publish messages with sequence IDs. We combine the sequence IDs with the incoming message timestamp to achieve deduplication. Our service code checks that each message added to a batch has the appropriate sequence ID relative to the previous message in the batch, and it uses the timestamp on the incoming message to determine the exact start and end of each batch (we typically get a UNIX timestamp and check when we’ve rolled over to the next clock minute). As a result, every instance deterministically generates the exact same file names containing the exact same data, which allows us to simply rely on a key collision in S3 for deduplication.

AWS suggests a similar solution for a slightly different problem, relating to Amazon Kinesis Data Streams. For more information, see Handling Duplicate Records.

With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.
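The deterministic naming can be sketched as follows. The key layout and helper names are hypothetical, and the sequence ID check is omitted for brevity; the point is only that the key and the bytes depend solely on the messages, never on the instance or its clock.

```python
import gzip
import json


def batch_key(data_type, exchange, market, message_ts_ms):
    """Derive the S3 key from the message timestamp, floored to the clock minute."""
    minute_start_ms = message_ts_ms - (message_ts_ms % 60_000)
    return f"{data_type}/{exchange}/{market}/{minute_start_ms}.json.gz"


def serialize_batch(records):
    """Serialize records so that identical batches produce identical bytes."""
    payload = "\n".join(json.dumps(record, sort_keys=True) for record in records)
    # mtime=0 keeps the gzip header free of the local clock (Python 3.8+).
    return gzip.compress(payload.encode("utf-8"), mtime=0)


# Two messages from the same clock minute map to the same key on any instance.
key_a = batch_key("trades", "binance", "BTCUSDT", 1571980320422)
key_b = batch_key("trades", "binance", "BTCUSDT", 1571980379999)
key_next = batch_key("trades", "binance", "BTCUSDT", 1571980380000)
```

Because redundant instances compute the same key and the same body, a second upload overwrites the object with identical content instead of creating a duplicate.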

After we store the data, we can perform simple analytics queries on the billions of records we’ve stored in S3 using Amazon Athena, a query service that requires minimal configuration and zero infrastructure overhead. Athena has a concept of partitions (inherited from one of its underlying services, Apache Hive). Partitions are mappings between virtual columns (in our case: pair, year, month, and day) and the S3 directories in which the corresponding data is stored.

S3 is not actually a hierarchical file system. Object keys begin with long prefixes that are rendered as directories in the AWS console when browsing a bucket’s contents. This has some non-trivial performance consequences when querying or filtering on large data sets.

The following screenshot illustrates a typical directory path.

By pointing Athena directly to a particular subset of data, a well-defined partitioning scheme can drastically reduce query run times and costs. Though the ability to perform ad hoc business analytics queries is primarily a convenience, taking time to choose a sane multi-level partitioning scheme for Athena based on some of our most common access patterns seemed worthwhile. A poorly designed partition structure can result in Athena unnecessarily scanning huge swaths of data and ultimately render the service unusable.
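As a rough sketch of how the partition columns named earlier (pair, year, month, and day) could map to S3 prefixes and query filters, the helpers below assume a Hive-style `key=value` layout with string-typed partition values. That layout is an assumption for illustration and is not necessarily the one shown in the screenshot.

```python
from datetime import datetime, timezone


def partition_prefix(pair, timestamp_ms):
    """Build a Hive-style S3 prefix for the UTC day containing the timestamp."""
    day = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return f"pair={pair}/year={day:%Y}/month={day:%m}/day={day:%d}/"


def partition_filter(pair, timestamp_ms):
    """Build a WHERE clause fragment that restricts a query to one partition."""
    day = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return (
        f"pair = '{pair}' AND year = '{day:%Y}' "
        f"AND month = '{day:%m}' AND day = '{day:%d}'"
    )


prefix = partition_prefix("BTCUSDT", 1571980320422)
where_clause = partition_filter("BTCUSDT", 1571980320422)
```

A query that includes such a filter only reads objects under the matching prefix, which is where the savings in run time and cost come from.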

Data publication

Our pipeline for transforming thousands of small gzipped JSON files into clean CSVs and loading them into AWS Data Exchange involves three distinct jobs, each expressed as an AWS Lambda function.

Job 1

Job 1 is initiated shortly after midnight UTC by a cron-scheduled CloudWatch event. As mentioned previously, our data ingestion service’s batching mechanism flushes each batch to S3 at a regular time interval. A timestamp on the incoming message (applied server-side) determines the rollover from one interval to the next, as opposed to the ingestion service’s system timestamp, so in the rare case that a non-trivial amount of time elapses between the consumption of the final message of batch n and the first message of batch n+1, we kick off the first Lambda function 20 minutes after midnight UTC to minimize the likelihood of omitting data pending write.

Job 1 formats values for the date and data source into an Athena query template and outputs the query results as a CSV to a specified prefix path in S3. (Every Athena query produces a .metadata file and a CSV file of the query results, though DDL statements do not output a CSV.) This PUT request to S3 triggers an S3 event notification.

We run a full replica data ingestion system as an additional layer of redundancy. Using the coalesce conditional expression, the Athena query in Job 1 merges data from our primary system with the corresponding data from our replica system, and fills in any gaps while deduplicating redundant records.
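The merge described above might look roughly like the following template. This is an illustrative sketch rather than our production query: the table names, column list, join key, and string-typed partition columns are all assumptions.

```python
from datetime import date

# Hypothetical template: join primary and replica on trade_id and take
# whichever side has the record, so gaps on one side are filled by the other.
MERGE_QUERY_TEMPLATE = """
WITH p AS (
    SELECT DISTINCT trade_id, price, quantity, trade_timestamp
    FROM {primary_table}
    WHERE pair = '{pair}' AND year = '{year}' AND month = '{month}' AND day = '{day}'
),
r AS (
    SELECT DISTINCT trade_id, price, quantity, trade_timestamp
    FROM {replica_table}
    WHERE pair = '{pair}' AND year = '{year}' AND month = '{month}' AND day = '{day}'
)
SELECT
    coalesce(p.trade_id, r.trade_id) AS trade_id,
    coalesce(p.price, r.price) AS price,
    coalesce(p.quantity, r.quantity) AS quantity,
    coalesce(p.trade_timestamp, r.trade_timestamp) AS trade_timestamp
FROM p
FULL OUTER JOIN r ON p.trade_id = r.trade_id
ORDER BY trade_timestamp, trade_id
"""


def render_merge_query(primary_table, replica_table, pair, day):
    """Fill the template for one pair and one UTC day (inputs come from trusted config)."""
    return MERGE_QUERY_TEMPLATE.format(
        primary_table=primary_table,
        replica_table=replica_table,
        pair=pair,
        year=f"{day.year:04d}",
        month=f"{day.month:02d}",
        day=f"{day.day:02d}",
    )


query = render_merge_query("primary_trades", "replica_trades", "BTCUSDT", date(2019, 10, 25))
```

The full outer join keeps records that appear in only one system, while `SELECT DISTINCT` and the join on `trade_id` collapse records that appear in both.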

We experimented fairly extensively with AWS Glue and PySpark for the ETL-related work performed in Job 1. When we realized that we could merge all the small source files into one, join the primary and replica data sets, and sort the results with a single Athena query, we decided to stick with this seemingly simpler and more elegant approach.

The following code shows one of our Athena query templates.

Job 2

Job 2 is triggered by the S3 event notification from Job 1. Job 2 simply copies the query results CSV file to a different key within the same S3 bucket.

The motivation for this step is twofold. First, we cannot dictate the name of an Athena query results CSV file; it is automatically set to the Athena query ID. Second, when adding an S3 object as an asset to an AWS Data Exchange revision, the asset’s name is automatically set to the S3 object’s key. So to dictate how the CSV file name appears in AWS Data Exchange, we must first rename it, which we accomplish by copying it to a specified S3 key.
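A sketch of this rename-by-copy step is shown below, using the S3 `copy_object` call on a boto3 client supplied by the caller. The naming scheme in `published_key` and the example values are hypothetical.

```python
def published_key(prefix, data_source, date_str):
    """Hypothetical naming scheme for the CSV as it should appear in AWS Data Exchange."""
    return f"{prefix}/{data_source}_{date_str}.csv"


def copy_query_result(s3_client, bucket, query_result_key, target_key):
    """Copy the Athena result (named after the query ID) to a meaningful key."""
    return s3_client.copy_object(
        Bucket=bucket,
        Key=target_key,
        CopySource={"Bucket": bucket, "Key": query_result_key},
    )


target = published_key("binance/trades", "binance_trades", "2019-10-25")
# With a real client:
# copy_query_result(boto3.client("s3"), "example-bucket", "athena-results/<query-id>.csv", target)
```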

Job 3

Job 3 handles all work related to AWS Data Exchange and AWS Marketplace Catalog via their respective APIs. We use boto3, AWS’s Python SDK, to interface with these APIs. The AWS Marketplace Catalog API is necessary for adding data set revisions to products that have already been published. For more information, see Tutorial: Adding New Data Set Revisions to a Published Data Product.

Our code explicitly defines mappings with the following structure:

data source / DataSet / Product

The following code shows how we configure relationships between data sources, data sets, and products.

Our data sources are typically represented by a trading venue and data type combination (such as Binance trades or CoinbasePro order books). Each new file for a given data source is delivered as a single asset within a single new revision for a particular data set.
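One way such mappings could be expressed is sketched below. Only the names `LambdaS3TriggerMappings`, `validate_object_name`, `lambda_s3_trigger_target_data_sets`, and the data set `id` attribute appear in our wrapper class; every other class, field, and value here is a hypothetical placeholder.

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class DataSet:
    id: str          # AWS Data Exchange data set ID (placeholder below)
    product_id: str  # AWS Marketplace product ID (hypothetical field)


@dataclass
class S3DataSource:
    name: str
    object_name_pattern: str  # hypothetical: regex that valid file names must match
    lambda_s3_trigger_target_data_sets: List[DataSet] = field(default_factory=list)

    def validate_object_name(self, obj_name):
        """Reject files whose names do not follow the expected convention."""
        if re.fullmatch(self.object_name_pattern, obj_name) is None:
            raise ValueError(f"Unexpected object name for {self.name}: {obj_name}")


# Keyed by (bucket, prefix), matching the scope of each S3 trigger.
LambdaS3TriggerMappings = {
    ("example-bucket", "binance/trades"): S3DataSource(
        name="binance-trades",
        object_name_pattern=r"binance_trades_\d{4}-\d{2}-\d{2}\.csv",
        lambda_s3_trigger_target_data_sets=[
            DataSet(id="example-data-set-id", product_id="example-product-id"),
        ],
    ),
}

source = LambdaS3TriggerMappings[("example-bucket", "binance/trades")]
source.validate_object_name("binance_trades_2019-10-25.csv")  # passes silently
```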

An S3 trigger kicks off the Lambda function. The trigger is scoped to a specified prefix that maps to a single data set. The function alias feature of AWS Lambda allows us to define the unique S3 triggers for each data set while reusing the same underlying Lambda function. Job 3 carries out the following steps (note that steps 1 through 5 refer to the AWS Data Exchange API while steps 6 and 7 refer to the AWS Marketplace Catalog API):

  1. Submits a request to create a new revision for the corresponding data set via CreateRevision.
  2. Adds the file that was responsible for triggering the Lambda function to the newly created revision via CreateJob using the IMPORT_ASSETS_FROM_S3 job type. To submit this job, we need to supply a few values: the S3 bucket and key values for the file are pulled from the Lambda event message, while the RevisionID argument comes from the response to the CreateRevision call in the previous step.
  3. Kicks off the job with StartJob, sourcing the JobID argument from the response to the CreateJob call in the previous step.
  4. Polls the job’s status via GetJob (using the same job ID returned by the CreateJob call) to check that our file (the asset) was successfully added to the revision.
  5. Finalizes the revision via UpdateRevision.
  6. Requests a description of the marketplace entity using DescribeEntity, passing in the product ID stored in our hardcoded mappings as the EntityID.
  7. Kicks off the entity ChangeSet via StartChangeSet, passing in the entity ID from the DescribeEntity response in the previous step as EntityID, the revision ARN parsed from the response to our earlier call to CreateRevision as RevisionArn, and the data set ARN as DataSetArn, which we fetch at the start of the code’s runtime using AWS Data Exchange API’s GetDataSet.

Here’s a thin wrapper class we wrote to carry out the steps detailed above:

from time import sleep
import logging
import json

import boto3

from config import (
    DATA_EXCHANGE_REGION,
    MARKETPLACE_CATALOG_REGION,
    LambdaS3TriggerMappings
)

logger = logging.getLogger()


class CustomDataExchangeClient:
    def __init__(self):
        self._de_client = boto3.client('dataexchange', region_name=DATA_EXCHANGE_REGION)
        self._mc_client = boto3.client('marketplace-catalog', region_name=MARKETPLACE_CATALOG_REGION)
    
    def _get_s3_data_source(self, bucket, prefix):
        return LambdaS3TriggerMappings[(bucket, prefix)]

    # Job State can be one of: WAITING | IN_PROGRESS | ERROR | COMPLETED | CANCELLED | TIMED_OUT
    def _wait_for_de_job_completion(self, job_id):
        while True:
            get_job_resp = self._de_client.get_job(JobId=job_id)
            if get_job_resp['State'] == 'COMPLETED':
                logger.info(f"Job '{job_id}' succeeded:\n\t{get_job_resp}")
                break
            elif get_job_resp['State'] in ('ERROR', 'CANCELLED', 'TIMED_OUT'):
                raise Exception(f"Job '{job_id}' failed:\n\t{get_job_resp}")
            else:
                sleep(5)
                logger.info(f"Still waiting on job {job_id}...")
        return get_job_resp

    # ChangeSet Status can be one of: PREPARING | APPLYING | SUCCEEDED | CANCELLED | FAILED
    def _wait_for_mc_change_set_completion(self, change_set_id):
        while True:
            describe_change_set_resp = self._mc_client.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=change_set_id
                )
            if describe_change_set_resp['Status'] == 'SUCCEEDED':
                logger.info(
                    f"ChangeSet '{change_set_id}' succeeded:\n\t{describe_change_set_resp}"
                )
                break
            elif describe_change_set_resp['Status'] in ('FAILED', 'CANCELLED'):
                raise Exception(
                    f"ChangeSet '{change_set_id}' failed:\n\t{describe_change_set_resp}"
                )
            else:
                sleep(1)
                logger.info(f"Still waiting on ChangeSet {change_set_id}...")
        return describe_change_set_resp

    def process_s3_event(self, s3_event):
        source_bucket = s3_event['Records'][0]['s3']['bucket']['name']
        source_key = s3_event['Records'][0]['s3']['object']['key']
        source_prefix = '/'.join(source_key.split('/')[0:-1])
        s3_data_source = self._get_s3_data_source(source_bucket, source_prefix)
        obj_name = source_key.split('/')[-1]
        
        s3_data_source.validate_object_name(obj_name)
        
        for data_set in s3_data_source.lambda_s3_trigger_target_data_sets:
            # Create revision
            create_revision_resp = self._de_client.create_revision(
                DataSetId=data_set.id,
                Comment=obj_name
            )
            logger.debug(create_revision_resp)
            revision_id = create_revision_resp['Id']
            revision_arn = create_revision_resp['Arn']

            # Create job
            create_job_resp = self._de_client.create_job(
                Type='IMPORT_ASSETS_FROM_S3',
                Details={
                    'ImportAssetsFromS3': {
                      'AssetSources': [
                          {
                              'Bucket': source_bucket,
                              'Key': source_key
                          },
                      ],
                      'DataSetId': data_set.id,
                      'RevisionId': revision_id
                    }
                }
            )
            logger.debug(create_job_resp)

            # Start job
            job_id = create_job_resp['Id']
            start_job_resp = self._de_client.start_job(JobId=job_id)
            logger.debug(start_job_resp)

            # Wait for Data Exchange job completion
            get_job_resp = self._wait_for_de_job_completion(job_id)
            logger.debug(get_job_resp)

            # Finalize revision
            update_revision_resp = self._de_client.update_revision(
                DataSetId=data_set.id,
                RevisionId=revision_id,
                Finalized=True
            )
            logger.debug(update_revision_resp)

            # Ensure revision finalization succeeded
            finalized_status = update_revision_resp['Finalized']
            if finalized_status is not True:
                raise Exception(f"Failed to finalize revision:\n{update_revision_resp}")

            # Publish the new revision to each product associated with the data set
            for product in data_set.products:
                # Describe the AWS Marketplace entity corresponding to the Data Exchange product
                describe_entity_resp = self._mc_client.describe_entity(
                    Catalog='AWSMarketplace',
                    EntityId=product.id
                )
                logger.debug(describe_entity_resp)

                entity_type = describe_entity_resp['EntityType']
                entity_id = describe_entity_resp['EntityIdentifier']

                # Isolate the target data set in the DescribeEntity response
                describe_entity_resp_data_sets = json.loads(describe_entity_resp['Details'])['DataSets']
                describe_entity_resp_data_set = list(
                    filter(lambda ds: ds['DataSetArn'] == data_set.arn, describe_entity_resp_data_sets)
                )
                # We should get the data set of interest in describe_entity_resp and only that data set
                assert len(describe_entity_resp_data_set) == 1

                # Start a ChangeSet to add the newly finalized revision to an existing product
                start_change_set_resp = self._mc_client.start_change_set(
                    Catalog='AWSMarketplace',
                    ChangeSet=[
                        {
                            "ChangeType": "AddRevisions",
                            "Entity": {
                                "Identifier": entity_id,
                                "Type": entity_type
                            },
                            "Details": json.dumps({
                                "DataSetArn": data_set.arn,
                                "RevisionArns": [revision_arn]
                            })
                        }
                    ]
                )
                logger.debug(start_change_set_resp)

                # Wait for the ChangeSet workflow to complete
                change_set_id = start_change_set_resp['ChangeSetId']
                describe_change_set_resp = self._wait_for_mc_change_set_completion(change_set_id)
                logger.debug(describe_change_set_resp)
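
The two wait helpers above share one pattern: call a status API repeatedly until it reports a terminal state. A minimal sketch of that pattern with an overall deadline follows. The names `poll_until_terminal` and `fake_get_job` and all parameters are hypothetical and are not part of boto3 or the original code.

```python
import time


def poll_until_terminal(fetch_state, success_states, failure_states,
                        interval_seconds=5.0, timeout_seconds=900.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Call fetch_state() until it returns a terminal state or the deadline passes.

    fetch_state is any zero-argument callable that returns a state string,
    for example a lambda wrapping get_job(JobId=...)['State'].
    """
    deadline = clock() + timeout_seconds
    while True:
        state = fetch_state()
        if state in success_states:
            return state
        if state in failure_states:
            raise RuntimeError(f"Reached failure state: {state}")
        if clock() >= deadline:
            raise TimeoutError(f"Still in state {state} after {timeout_seconds}s")
        sleep(interval_seconds)


# Hypothetical stand-in for a job that completes on the third poll.
_states = iter(["WAITING", "IN_PROGRESS", "COMPLETED"])


def fake_get_job():
    return next(_states)


final_state = poll_until_terminal(
    fake_get_job,
    success_states={"COMPLETED"},
    failure_states={"ERROR", "CANCELLED", "TIMED_OUT"},
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(final_state)
```

Passing the sleep and clock functions as parameters keeps the loop testable without real delays; the deadline guards against a job that never leaves a non-terminal state.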

The following screenshot shows the S3 trigger for Job 3.

The following screenshot shows an example of CloudWatch logs for Job 3.

The following screenshot shows a CloudWatch alarm for Job 3.

Finally, we can verify that our revisions were successfully added to their corresponding data sets and products through the AWS console.

AWS Data Exchange allows you to create private offers for your AWS account IDs, providing a convenient means of checking that revisions show up in each product as expected.

Conclusion

This post demonstrated how you can integrate AWS Data Exchange into an existing data pipeline frictionlessly. We’re pleased to have been invited to participate in the AWS Data Exchange private preview, and even more pleased with the service itself, which has proven to be a sophisticated yet natural extension of our system.

I want to offer special thanks to both Kyle Patsen and Rafic Melhem of the AWS Data Exchange team for generously fielding my questions (and patiently enduring my ramblings) for the better part of the past year. I also want to thank Lucas Adams for helping me design the system discussed in this post and, more importantly, for his unwavering vote of confidence.

If you are interested in learning more about FPG, don’t hesitate to contact us.

 

Extract, Transform and Load data into S3 data lake using CTAS and INSERT INTO statements in Amazon Athena

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/extract-transform-and-load-data-into-s3-data-lake-using-ctas-and-insert-into-statements-in-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze the data stored in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. You can reduce your per-query costs and get better performance by compressing, partitioning, and converting your data into columnar formats. To learn more about best practices to boost query performance and reduce costs, see Top 10 Performance Tuning Tips for Amazon Athena.

Overview

This blog post discusses how to use Athena for extract, transform and load (ETL) jobs for data processing. This example optimizes the dataset for analytics by partitioning it and converting it to a columnar data format using Create Table as Select (CTAS) and INSERT INTO statements.

CTAS statements create new tables using standard SELECT queries to filter data as required. You can also partition the data, specify compression, and convert the data into columnar formats like Apache Parquet and Apache ORC using CTAS statements. As part of the execution, the resultant tables and partitions are added to the AWS Glue Data Catalog, making them immediately available for subsequent queries.

INSERT INTO statements insert new rows into a destination table based on a SELECT query statement that runs on a source table. If the source table’s underlying data is in CSV format and destination table’s data is in Parquet format, then INSERT INTO can easily transform and load data into destination table’s format. CTAS and INSERT INTO statements can be used together to perform an initial batch conversion of data as well as incremental updates to the existing table.

Here is an overview of the ETL steps to be followed in Athena for data conversion:

  1. Create a table on the original dataset.
  2. Use a CTAS statement to create a new table in which the format, compression, partition fields and location of the new table can be specified.
  3. Add more data into the table using an INSERT INTO statement.
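
The steps above can be sketched as a pair of statement builders, which makes it easier to see that the CTAS and INSERT INTO statements share the same SELECT shape. This is an illustration under assumptions: the helper names `build_ctas` and `build_insert` are hypothetical, the table names and bucket path are placeholders, and the values are interpolated without escaping, so the sketch is suitable only for trusted input.

```python
def _select_list(columns, partition_expr, partition_column):
    # Athena expects partition columns to come last in the SELECT list.
    return ", ".join(columns + [f"{partition_expr} AS {partition_column}"])


def build_ctas(target_table, source_table, columns, partition_expr,
               partition_column, external_location, where_clause):
    """Build a CTAS statement that writes Snappy-compressed, partitioned Parquet."""
    select_list = _select_list(columns, partition_expr, partition_column)
    return (
        f"CREATE TABLE {target_table} "
        f"WITH (format='PARQUET', parquet_compression='SNAPPY', "
        f"partitioned_by=array['{partition_column}'], "
        f"external_location='{external_location}') "
        f"AS SELECT {select_list} FROM {source_table} WHERE {where_clause}"
    )


def build_insert(target_table, source_table, columns, partition_expr,
                 partition_column, where_clause):
    """Build an INSERT INTO statement that appends rows to the CTAS table."""
    select_list = _select_list(columns, partition_expr, partition_column)
    return (
        f"INSERT INTO {target_table} "
        f"SELECT {select_list} FROM {source_table} WHERE {where_clause}"
    )


COLUMNS = ["id", "date", "element", "datavalue", "mflag", "qflag", "sflag", "obstime"]
YEAR_EXPR = 'substr("date",1,4)'

ctas_sql = build_ctas(
    "new_parquet", "original_csv", COLUMNS, YEAR_EXPR, "year",
    "s3://your-bucket/optimized-data/",  # placeholder location
    f"cast({YEAR_EXPR} AS bigint) BETWEEN 2015 AND 2019",
)
insert_sql = build_insert(
    "new_parquet", "original_csv", COLUMNS, YEAR_EXPR, "year",
    f"cast({YEAR_EXPR} AS bigint) < 2015",
)
print(ctas_sql)
print(insert_sql)
```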

This example uses a subset of the NOAA Global Historical Climatology Network Daily (GHCN-D) dataset, which is publicly available on Amazon S3.

This subset of data is available at the following S3 location:

s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/
Total objects: 41727 
Size of CSV dataset: 11.3 GB
Region: us-east-1

Procedure

Follow these steps to use Athena for an ETL job.

Create a table based on original dataset

The original data is in CSV format with no partitions in Amazon S3. The following files are stored at the Amazon S3 location:

2019-10-31 13:06:57  413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000
2019-10-31 13:06:57  412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001
2019-10-31 13:06:57   34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002
2019-10-31 13:06:57  412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100
2019-10-31 13:06:57  412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101

Note that the files are small. Merging them into larger files and reducing the total number of files leads to faster query execution. CTAS and INSERT INTO can help achieve this.
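
A quick calculation shows how small the objects are on average, using the object count and dataset size quoted earlier. This is a rough sketch: it treats the quoted 11.3 GB as binary gigabytes, and the 128 MiB target file size is a hypothetical figure chosen only for illustration.

```python
import math

total_objects = 41727
dataset_size_gib = 11.3  # assumption: the quoted "GB" is treated as GiB

# Average size of one CSV object in KiB.
average_kib = dataset_size_gib * 1024 * 1024 / total_objects
print(f"Average object size: {average_kib:.0f} KiB")

# Number of files the same data would need at a hypothetical target size.
target_file_mib = 128
merged_file_count = math.ceil(dataset_size_gib * 1024 / target_file_mib)
print(f"Files needed at {target_file_mib} MiB each: {merged_file_count}")
```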

Execute the queries in the Athena console (preferably in us-east-1 to avoid inter-region Amazon S3 data transfer charges). First, create a database for this demo:

CREATE DATABASE blogdb

Now, create a table from the data above.

CREATE EXTERNAL TABLE `blogdb`.`original_csv` (
  `id` string, 
  `date` string, 
  `element` string, 
  `datavalue` bigint, 
  `mflag` string, 
  `qflag` string, 
  `sflag` string, 
  `obstime` bigint)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'

Use CTAS to partition data and convert into parquet format with snappy compression

Now, convert the data to Parquet format with Snappy compression and partition the data on a yearly basis. All these actions are performed using the CTAS statement. For the purpose of this blog, the initial table only includes data from 2015 to 2019. You can add new data to this table using the INSERT INTO command.

The table created in Step 1 has a date field with the date formatted as YYYYMMDD (for example, 20100104). The new table is partitioned on year. Extract the year value from the date field using the Presto function substr("date",1,4).

CREATE table new_parquet
WITH (format='PARQUET', 
parquet_compression='SNAPPY', 
partitioned_by=array['year'], 
external_location = 's3://your-bucket/optimized-data/') 
AS
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) >= 2015
        AND cast(substr("date",1,4) AS bigint) <= 2019

Once the query is successful, check the Amazon S3 location specified in the CTAS statement above. You should be able to see partitions and parquet files in each of these partitions, as shown in the following examples:

  1. Partitions:
    $ aws s3 ls s3://your-bucket/optimized-data/
                               PRE year=2015/
                               PRE year=2016/
                               PRE year=2017/
                               PRE year=2018/
                               PRE year=2019/

  2. Parquet files:
    $ aws s3 ls s3://your-bucket/optimized-data/ --recursive --human-readable | head -5
    
    2019-10-31 14:51:05    7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d
    2019-10-31 14:51:05    7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a
    2019-10-31 14:51:05    9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799
    2019-10-31 14:51:05    7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d
    2019-10-31 14:51:05    6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1
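
The partition prefixes in the listings above follow the key=value naming that the year column produces. The sketch below mirrors the year extraction in Python to show how a date value maps to its prefix; the function name `year_partition_prefix` and the default base prefix are hypothetical.

```python
def year_partition_prefix(date_value, base_prefix="optimized-data/"):
    """Return the partition prefix for a YYYYMMDD date string.

    SQL substr("date", 1, 4) is 1-based, so it corresponds to the
    0-based Python slice [0:4].
    """
    year = date_value[0:4]
    return f"{base_prefix}year={year}/"


print(year_partition_prefix("20100104"))  # optimized-data/year=2010/
```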

Add more data into table using INSERT INTO statement

Now, add more data and partitions into the new table created above. The original dataset has data from 2010 to 2019. Since you added 2015 to 2019 using CTAS, add the rest of the data now using an INSERT INTO statement:

INSERT INTO new_parquet
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) < 2015

List the Amazon S3 location of the new table:

 $ aws s3 ls s3://your-bucket/optimized-data/
                           PRE year=2010/
                           PRE year=2011/
                           PRE year=2012/
                           PRE year=2013/
                           PRE year=2014/
                           PRE year=2015/
                           PRE year=2016/
                           PRE year=2017/
                           PRE year=2018/
                           PRE year=2019/ 

You can see that INSERT INTO is able to determine that “year” is a partition column and writes the data to Amazon S3 accordingly. There is also a significant reduction in the total size of the dataset thanks to compression and columnar storage in the Parquet format:

Size of dataset after parquet with snappy compression - 1.2 GB

You can also run INSERT INTO statements if more CSV data is added to the original table. Assume you have new data for the year 2020 added to the original Amazon S3 dataset. In that case, you can run the following INSERT INTO statement to add this data and the relevant partition(s) to the new_parquet table:

INSERT INTO new_parquet
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) = 2020

Query the results

Now that you have transformed data, run some queries to see what you gained in terms of performance and cost optimization:

First, find the number of distinct IDs for every value of the year:

    1. Query on the original table:
      SELECT substr("date",1,4) as year, 
             COUNT(DISTINCT id) 
      FROM original_csv 
      GROUP BY 1 ORDER BY 1 DESC

    2. Query on the new table:
      SELECT year, 
        COUNT(DISTINCT id) 
      FROM new_parquet 
      GROUP BY  1 ORDER BY 1 DESC

                       Run time        Data scanned   Cost
      Original table   16.88 seconds   11.35 GB       $0.0567
      New table        3.79 seconds    428.05 MB      $0.002145
      Savings          77.5% faster and 96.2% cheaper

       

Next, calculate the average maximum temperature (Celsius), average minimum temperature (Celsius), and average rainfall (mm) for the Earth in 2018:

      1. Query on the original table:
        SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value
        FROM original_csv
        WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018'
        GROUP BY  1

      2. Query on the new table:
        SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value
        FROM new_parquet 
        WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018'
        GROUP BY  1

                         Run time        Data scanned   Cost
        Original table   18.65 seconds   11.35 GB       $0.0567
        New table        1.92 seconds    68.08 MB       $0.000345
        Savings          90% faster and 99.4% cheaper
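
The savings figures in the two comparisons above can be reproduced from the reported run times and costs. The sketch below uses only the numbers quoted in this post; the function name `percent_saved` and the dictionary labels are illustrative.

```python
def percent_saved(before, after):
    """Percentage reduction when going from `before` to `after`."""
    return (1 - after / before) * 100


# (original table, new table) pairs taken from the comparisons above.
comparisons = {
    "distinct IDs per year": {"seconds": (16.88, 3.79), "cost": (0.0567, 0.002145)},
    "2018 averages": {"seconds": (18.65, 1.92), "cost": (0.0567, 0.000345)},
}

savings = {}
for name, metrics in comparisons.items():
    faster = percent_saved(*metrics["seconds"])
    cheaper = percent_saved(*metrics["cost"])
    savings[name] = (faster, cheaper)
    print(f"{name}: {faster:.1f}% faster, {cheaper:.1f}% cheaper")
```

For the second query the run-time saving works out to about 89.7%, which the comparison reports rounded to 90%.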

         

Conclusion

This post showed you how to perform ETL operations using CTAS and INSERT INTO statements in Athena. You can perform the first set of transformations using a CTAS statement. When new data arrives, use an INSERT INTO statement to transform and load data to the table created by the CTAS statement. Using this approach, you converted data to the Parquet format with Snappy compression, converted a non-partitioned dataset to a partitioned dataset, reduced the overall size of the dataset and lowered the costs of running queries in Athena.

 


About the Author

 Pathik Shah is a big data architect for Amazon EMR at AWS.

 

 

 

 

 

Connect Amazon Athena to your Apache Hive Metastore and use user-defined functions

Post Syndicated from Janak Agarwal original https://aws.amazon.com/blogs/big-data/connect-amazon-athena-to-your-apache-hive-metastore-and-use-user-defined-functions/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. This post details the two new preview features that you can start using today: connecting to Apache Hive Metastore and using user-defined functions.

Connecting Athena to your Apache Hive Metastore

Several customers use the Hive Metastore as a common metadata catalog for their big data environments. Such customers run Apache Spark, Presto, and Apache Hive on Amazon EC2 and Amazon EMR clusters with a self-hosted Hive Metastore as the common catalog. AWS also offers the AWS Glue Data Catalog, which is a fully managed catalog and drop-in replacement for the Hive Metastore. With the release as of this writing, you can now use the Hive Metastore in addition to the Data Catalog with Athena. Athena now allows you to connect to multiple Hive Metastores along with the existing Data Catalog.

To connect to a self-hosted Hive Metastore, you need a metastore connector. You can download a reference implementation of this connector, which runs as a Lambda function in your account. The current version of the implementation supports only SELECT queries. DDL support is limited to basic metadata syntax. For more information, see Considerations and Limitations of this feature. You can also write your own Hive Metastore connector using the reference implementation as an example. You can deploy your implementation as a Lambda function, and subsequently use it with Athena. For more information about the feature, see the Using Athena Data Connector for External Hive Metastore (Preview) documentation.

Using user-defined functions in Athena

Athena also offers preview support for scalar user-defined functions (UDFs). UDFs enable you to write functions and invoke them in SQL queries. A scalar UDF is applied one row at a time and returns a single column value. Athena invokes your scalar UDF with batches of rows to limit the performance impact associated with making a remote call for the UDF itself.

With the latest release as of this writing, you can use the Athena Query Federation SDK to define your functions and invoke them inline in SQL queries. You can now compress and decompress row values, scrub personally identifiable information (PII) from columns, transform dates to a different format, read image metadata, and execute proprietary custom code in your queries. You can also execute UDFs in both the SELECT and FILTER phase of the query and invoke multiple UDFs in the same query.
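
To make the scalar, batch-invoked behavior concrete, here is a conceptual sketch in Python. It is not the Athena Query Federation SDK (which is written in Java) and does not reflect Athena's actual wire format; `scrub_email` and `apply_to_batch` are hypothetical names used only to show a one-value-in, one-value-out function applied to a batch of rows.

```python
def scrub_email(value):
    """Scalar function: takes one column value and returns one column value."""
    if value is None:
        return None
    _local, separator, domain = value.partition("@")
    # Mask the local part only when the value looks like an email address.
    return f"***@{domain}" if separator else value


def apply_to_batch(scalar_fn, batch):
    """Apply a scalar function to every value in a batch of rows."""
    return [scalar_fn(value) for value in batch]


batch = ["ana@example.com", None, "not-an-email"]
scrubbed = apply_to_batch(scrub_email, batch)
print(scrubbed)  # ['***@example.com', None, 'not-an-email']
```

Batching amortizes the cost of each remote call over many rows while the function itself still reasons about a single value at a time.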

For more information about UDFs, see our documentation. For common UDF example implementations, see the GitHub repo. For more information about writing functions using the Athena Query Federation SDK, please visit this link.

Testing the preview features

All Athena queries originating from the workgroup AmazonAthenaPreviewFunctionality are considered Preview test queries.

Create a new workgroup AmazonAthenaPreviewFunctionality using Athena APIs or the Athena console. For more information, see Create a Workgroup.

The following considerations are important when using preview features.

Do not edit the workgroup name. You can edit other workgroup properties, such as enabling Amazon CloudWatch metrics and requester pays. You can use the Athena console, JDBC/ODBC drivers, or APIs to submit your test queries. Specify the workgroup AmazonAthenaPreviewFunctionality when you submit test queries.

Preview functionality is available only in the us-east-1 Region. If you use Athena in any other Region and submit queries using AmazonAthenaPreviewFunctionality, your query fails. Cross-Region calls are not supported in preview mode.
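
Submitting a test query through the API means naming the preview workgroup explicitly. The sketch below wraps the Athena `start_query_execution` call and takes the client as a parameter; with boto3 you would pass `boto3.client("athena", region_name="us-east-1")`. The helper name, the stub client, and the output bucket are hypothetical and exist only so the example runs without AWS credentials.

```python
PREVIEW_WORKGROUP = "AmazonAthenaPreviewFunctionality"


def submit_preview_query(athena_client, sql, output_location):
    """Start a query in the preview workgroup and return its execution ID."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        WorkGroup=PREVIEW_WORKGROUP,
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


class FakeAthenaClient:
    """Hypothetical stand-in that records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def start_query_execution(self, **kwargs):
        self.calls.append(kwargs)
        return {"QueryExecutionId": "example-query-id"}


client = FakeAthenaClient()
query_id = submit_preview_query(
    client,
    "SELECT 1",
    "s3://your-bucket/athena-results/",  # placeholder results location
)
print(query_id)
```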

During the preview, you do not incur charges for the data scanned from federated data sources. However, you are charged standard Athena rates for data scanned from S3. Additionally, you are charged standard rates for the AWS services that you use with Athena, such as S3, AWS Lambda, AWS Glue, Amazon SageMaker, and AWS SAM. For example, you are charged S3 rates for storage, requests, and inter-Region data transfer. By default, query results are stored in an S3 bucket of your choice and are billed at standard S3 rates. If you use Lambda, you are charged based on the number of requests for your functions and the duration (the time it takes for your code to execute).

It is not recommended to onboard your production workload to AmazonAthenaPreviewFunctionality.

Query performance may vary between the preview workgroup and the other workgroups in your account. Additionally, new features and bug fixes may not be backwards compatible.

Summary

In summary, we introduced two new Athena features that were released today in Preview.

Customers who use the Apache Hive Metastore for metadata management, and were previously unable to use Athena, can now connect their Hive Metastore to Athena to run queries. Also, customers can now use Athena’s Query Federation SDK to define and invoke their own functions in their SQL queries in Athena.

Both these features are available in Preview in the AWS us-east-1 region. Begin your Preview now by following these steps in the Athena FAQ. We welcome your feedback at [email protected]

 


About the Author

Janak Agarwal is a product manager for Athena at AWS.

 

 

 

Prepare data for model-training and invoke machine learning models with Amazon Athena

Post Syndicated from Janak Agarwal original https://aws.amazon.com/blogs/big-data/prepare-data-for-model-training-and-invoke-machine-learning-models-with-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon Athena has announced a public preview of a new feature that provides an easy way to run inference using machine learning (ML) models deployed on Amazon SageMaker directly from SQL queries. The ability to use ML models in SQL queries makes complex tasks such as anomaly detection, customer cohort analysis, and sales predictions as simple as writing a SQL query.

Overview

Users now have the flexibility to use ML models trained on proprietary datasets or to use out-of-the-box, pre-trained ML models deployed on Amazon SageMaker. You can now easily invoke a variety of ML algorithms across text analysis, statistical tools, and any algorithm deployed on Amazon SageMaker. There is no additional setup required. Users can invoke ML models in SQL queries from the Athena console, Athena APIs, and through use of Athena’s JDBC and ODBC drivers in tools such as Amazon QuickSight. Within seconds, analysts can run inferences to forecast sales, detect suspicious logins to an application, or categorize all users into customer cohorts.

During the preview, you are not charged for the data scanned from federated data sources. However, you are charged standard Athena rates for data scanned from Amazon S3. Additionally, you are charged standard rates for the AWS services that you use with Athena, such as Amazon S3, AWS Lambda, AWS Glue, Amazon SageMaker, and AWS Serverless Application Repository. For example, you are charged S3 rates for storage, requests, and inter-region data transfer. By default, query results are stored in an S3 bucket of your choice and are also billed at standard Amazon S3 rates. If you use AWS Lambda, you are charged based on the number of requests for your functions and the duration, the time it takes for your code to execute.

Athena has also added support for federated queries and user-defined functions (UDFs). This blog demonstrates how to:

  1. Ingest, clean, and transform a dataset using Athena SQL queries to ready it for model training purposes
  2. Use Amazon SageMaker to train the model
  3. Use Athena’s ML inference capabilities to run prediction queries in SQL using the model created in Step 2

For illustration purposes, consider an imaginary video game company whose goal is to predict which games will be successful.

Assume that the video game company has a raw dataset in Amazon S3 with the schema represented by the following SQL statement.

CREATE EXTERNAL TABLE `video_game_data`.`video_games`(
  `gameid` int COMMENT '', 
  `name` string COMMENT '', 
  `platform` string COMMENT '',
  `year_of_release` int COMMENT '',
  `genre` string COMMENT '',
  `publisher` string COMMENT '',
  `sales` int COMMENT '',
  `critic_score` int COMMENT '',
  `critic_count` int COMMENT '',
  `user_score` int COMMENT '',
  `user_count` int COMMENT '',
  `developer` string COMMENT '',
  `rating` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
WITH SERDEPROPERTIES ( 
  'separatorChar'=',') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://video-game-demo/'
TBLPROPERTIES (
  'has_encrypted_data'='false')

A screenshot of the sample dataset in Athena follows.

Diagram 1: Screenshot of the sample dataset in our example.

Data Analysis and Cleaning

To prepare the dataset for training the model using Amazon SageMaker, you need to select the relevant data required to train the ML model. You don’t need columns that are not relevant for training, such as gameid, name, year_of_release, sales, critic_count, user_count, and developer.

Additionally, this example defines success as the scenario in which sales of a particular game exceed 1,000,000. You can create a success column that is either 0 (denoting no success) or 1 (denoting success) in the dataset to reflect success.

A sample query and screenshot showing the success column follow:

SELECT platform,
         genre,
         critic_score,
         user_score,
         rating,
         (sales > 1000000) AS success
FROM "video_game_data"."video_games";

Diagram 2: Screenshot of our sample dataset with the columns that are irrelevant for training our ML model deleted.

ML models typically do not work well with String Enums. To optimize performance and improve model accuracy, convert the platform, genre, publisher, and rating to integer Enum constants. You can use Athena’s UDF functionality to accomplish this.

The following sample code creates an AWS Lambda function that you can invoke in the Athena SQL query as a UDF.

import com.amazonaws.athena.connector.lambda.handlers.UserDefinedFunctionHandler;
import com.google.common.collect.ImmutableMap;

public class AthenaUDFHandler
        extends UserDefinedFunctionHandler
{
    private static final String SOURCE_TYPE = "athena_common_udfs";
    private final ImmutableMap<String, Integer> genreMap;

    public AthenaUDFHandler()
    {
        super(SOURCE_TYPE);
        // Fixed integer code for each known genre
        genreMap = ImmutableMap.<String, Integer>builder()
                .put("Action", 1)
                .put("Adventure", 2)
                .put("Fighting", 3)
                .put("Misc", 4)
                .put("Platform", 5)
                .put("Puzzle", 6)
                .put("Racing", 7)
                .put("Role-Playing", 8)
                .put("Shooter", 9)
                .put("Simulation", 10)
                .put("Sports", 11)
                .put("Strategy", 12).build();
    }

    // Map a genre name to its integer code; unknown genres map to 0
    public Integer normalize_genre(String genre)
    {
        return genreMap.getOrDefault(genre, 0);
    }

    // Use the string hash code as the integer code for a rating
    public Integer normalize_rating(String rating)
    {
        return rating.hashCode();
    }

    // Use the string hash code as the integer code for a platform
    public Integer normalize_platform(String platform)
    {
        return platform.hashCode();
    }
}
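
To check locally which integers the handler above would produce, the same encodings can be mirrored in Python. This is a sketch for experimentation and not part of the Lambda function: `java_string_hash` reimplements the documented Java `String.hashCode` formula and assumes the input contains only Basic Multilingual Plane characters. Python's built-in `hash()` is not a substitute, because string hashes are salted per process by default.

```python
GENRE_CODES = {
    "Action": 1, "Adventure": 2, "Fighting": 3, "Misc": 4,
    "Platform": 5, "Puzzle": 6, "Racing": 7, "Role-Playing": 8,
    "Shooter": 9, "Simulation": 10, "Sports": 11, "Strategy": 12,
}


def normalize_genre(genre):
    """Map a genre name to its integer code; unknown genres map to 0."""
    return GENRE_CODES.get(genre, 0)


def java_string_hash(value):
    """Compute s[0]*31^(n-1) + ... + s[n-1] as a signed 32-bit integer."""
    result = 0
    for character in value:
        result = (31 * result + ord(character)) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as signed, as Java's int would be.
    return result - 0x100000000 if result >= 0x80000000 else result


print(normalize_genre("Racing"))   # 7
print(normalize_genre("Unknown"))  # 0
print(java_string_hash("ab"))      # 3105
```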

A sample Athena Query that normalizes the dataset using the functions created above follows:

USING 
FUNCTION normalize_genre(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_platform(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_rating(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization')

SELECT CAST((sales > 1000000) AS integer) AS success,
         normalize_platform(platform) AS platform, 
         normalize_genre(genre) AS genre, 
         critic_score, 
         user_score, 
         normalize_rating(rating) AS rating
FROM video_game_data.video_games;

The following screenshot shows the normalized dataset:

Diagram 3: Screenshot after defining, and invoking our UDFs that help to normalize our dataset.

Creating the machine learning model

Next, create the ML model in Amazon SageMaker.

Open an Amazon SageMaker notebook instance that has permissions to execute Athena queries, and run the following sample code. The code first installs PyAthena and imports the required Amazon SageMaker libraries, executes an Athena query to retrieve the training dataset, invokes the training algorithm on this dataset, and deploys the resulting model on the selected Amazon SageMaker instance. PyAthena allows you to invoke Athena SQL queries from within your Amazon SageMaker notebook.

Note that you can also train your model using a different algorithm and evaluate your model. For more details on using Amazon SageMaker, please visit the SageMaker documentation.

import sys
!{sys.executable} -m pip install PyAthena
import boto3, os
import sagemaker
import pandas as pd 

from sagemaker import get_execution_role
from pyathena import connect 

# Create the training dataset
athena_output_bucket = 'athena-results'
region = 'us-east-1'
connection = connect(s3_staging_dir='s3://{}/'.format(athena_output_bucket),
                     region_name=region,
                     work_group='AmazonAthenaPreviewFunctionality')

# Triple quotes allow the query to span several lines
query = """
USING FUNCTION normalize_genre(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE WITH (lambda_name='VideoNormalization'),
FUNCTION normalize_platform(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE WITH (lambda_name='VideoNormalization'),
FUNCTION normalize_rating(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE WITH (lambda_name='VideoNormalization')
SELECT CAST((sales > 1000000) AS integer) AS success, normalize_platform(platform) AS platform, normalize_genre(genre) AS genre, critic_score, user_score, normalize_rating(rating) AS rating FROM video_game_data.video_games
"""
results = pd.read_sql(query, connection)

training_data_output_location = 'video-games-sales-prediction'

results.to_csv('training_data.csv', index=False, header=False)
# The object key matches the 'train' prefix used for the training input below
boto3.Session().resource('s3').Bucket(training_data_output_location).Object('train/train.csv').upload_file('training_data.csv')
s3_input = sagemaker.s3_input(s3_data='s3://{}/train'.format(training_data_output_location), content_type='csv')

#Model Training
role = get_execution_role()
container = '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest'

sess = sagemaker.Session()
xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/model'.format(training_data_output_location),
                                    sagemaker_session=sess)
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        num_round=100)

xgb.fit({'train': s3_input})

# Model Deployment
xgb_predictor = xgb.deploy(initial_instance_count=1,instance_type='ml.m4.xlarge')
xgb_predictor.endpoint_name

Using the model in SQL queries

Having trained the ML model and deployed it on Amazon SageMaker, your next task is to use this model in your Athena SQL queries to predict whether a given video game will be a success.

A sample prediction query follows. You can run this query in your Amazon SageMaker notebook after loading PyAthena, or using the Athena Console. You can also submit this query using Athena’s APIs or using the Athena Preview JDBC driver.

-- Prediction

USING FUNCTION predict(platform int, genre int, critic_score int, user_score int, rating int) returns DOUBLE type SAGEMAKER_INVOKE_ENDPOINT
WITH (sagemaker_endpoint='xgboost-2019-11-22-00-52-22-742'),

FUNCTION normalize_genre(value VARCHAR) RETURNS int TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_platform(value VARCHAR) RETURNS int TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_rating(value VARCHAR) RETURNS int TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization')

SELECT predict(platform,
         genre,
         critic_score,
         user_score,
         rating),
         name
FROM 
    (SELECT name,
         normalize_platform(platform) AS platform, 
         normalize_genre(genre) AS genre, 
         critic_score, 
         user_score, 
         normalize_rating(rating) AS rating
FROM video_game_data.video_games);
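
For the API route mentioned above, a minimal sketch follows. It assumes a client that behaves like boto3's Athena client (`start_query_execution`, `get_query_execution`). The helper names, the polling interval, and the output location are illustrative assumptions, not part of the original post. The client is passed in so that the logic can be exercised without AWS credentials.

```python
import time


def build_query_request(sql, output_location, work_group):
    """Assemble keyword arguments for the Athena StartQueryExecution API."""
    return {
        "QueryString": sql,
        "ResultConfiguration": {"OutputLocation": output_location},
        "WorkGroup": work_group,
    }


def run_query(client, request, poll_seconds=1.0, max_polls=60):
    """Submit a query and poll until it reaches a terminal state.

    `client` is expected to behave like boto3.client("athena").
    Returns (query_execution_id, last_observed_state).
    """
    query_id = client.start_query_execution(**request)["QueryExecutionId"]
    state = "QUEUED"
    for _ in range(max_polls):
        response = client.get_query_execution(QueryExecutionId=query_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    return query_id, state
```

In a notebook, one would typically create the client with `boto3.client("athena", region_name="us-east-1")` and pass the prediction query above as `sql`, together with the workgroup used earlier in this post.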

Conclusion

In this blog, we introduced Athena’s new functionality that enables you to invoke your machine learning models in SQL queries to run inference. Using an example, we saw how to use Athena’s UDF functionality. We defined and invoked our functions to prepare our dataset for model training. To train our model, we used PyAthena to run Athena SQL queries from Amazon SageMaker and, finally, invoked our ML model in a SQL query to run inference.

Amazon Athena’s ML functionality is available today in preview in the us-east-1 (N. Virginia) region. Begin your preview now by following the steps in the Athena FAQ.
To learn more about the feature, please visit our querying ML model documentation.

About the Authors

Janak Agarwal is a product manager for Athena at AWS.

Ronak Shah is a software development engineer for Athena at AWS.

Query any data source with Amazon Athena’s new federated query

Post Syndicated from Janak Agarwal original https://aws.amazon.com/blogs/big-data/query-any-data-source-with-amazon-athenas-new-federated-query/

Organizations today use data stores that are the best fit for the applications they build. For example, for an organization building a social network, a graph database such as Amazon Neptune is likely the best fit when compared to a relational database. Similarly, for workloads that require flexible schema for fast iterations, Amazon DocumentDB (with MongoDB compatibility) is likely a better fit. As Werner Vogels, CTO and VP of Amazon.com, said: “Seldom can one database fit the needs of multiple distinct use cases.” Developers today build highly distributed applications using a multitude of purpose-built databases. In a sense, developers are doing what they do best: dividing complex applications into smaller pieces, which allows them to choose the right tool for the right job. As the number of data stores and applications increases, running analytics across multiple data sources can become challenging.

Today, we are happy to announce a Public Preview of Amazon Athena support for federated queries.

Federated Query in Amazon Athena

Federated query is a new Amazon Athena feature that enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources. With Athena federated query, customers can submit a single SQL query and analyze data from multiple sources running on-premises or hosted on the cloud. Athena executes federated queries using Data Source Connectors that run on AWS Lambda. AWS has open-sourced Athena Data Source connectors for Amazon DynamoDB, Apache HBase, Amazon DocumentDB, Amazon Redshift, Amazon CloudWatch Logs, Amazon CloudWatch Metrics, and JDBC-compliant relational data sources such as MySQL and PostgreSQL under the Apache 2.0 license. Customers can use these connectors to run federated SQL queries in Athena across these data sources. Additionally, using the Query Federation SDK, customers can build connectors to any proprietary data source and enable Athena to run SQL queries against the data source. Since connectors run on Lambda, customers continue to benefit from Athena’s serverless architecture and do not have to manage infrastructure or scale for peak demands.

Running analytics on data spread across applications can be complex and time consuming. Application developers pick a data store based on the application’s primary function. As a result, data required for analytics is often spread across relational, key-value, document, in-memory, search, graph, time-series, and ledger databases. Event and application logs are often stored in object stores such as Amazon S3. To analyze data across these sources, analysts have to learn new programming languages and data access constructs, and build complex pipelines to extract, transform and load into a data warehouse before they can easily query the data. Data pipelines introduce delays and require custom processes to validate data accuracy and consistency across systems. Moreover, when source applications are modified, data pipelines have to be updated and data has to be re-stated for corrections. Federated queries in Athena eliminate this complexity by allowing customers to query data in-place wherever it resides. Analysts can use familiar SQL constructs to JOIN data across multiple data sources for quick analysis or use scheduled SQL queries to extract and store results in Amazon S3 for subsequent analysis.

The Athena Query Federation SDK extends the benefits of federated querying beyond AWS provided connectors. With fewer than 100 lines of code, customers can build connectors to proprietary data sources and share them across the organization. Connectors are deployed as Lambda functions and registered for use in Athena as data sources. Once registered, Athena invokes the connector to retrieve databases, tables, and columns available from the data source. A single Athena query can span multiple data sources. When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Since connectors are executed in Lambda, they can be used to access data from any data source on the cloud or on-premises that is accessible from Lambda.
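
To make this division of labor concrete, here is a toy, in-memory sketch of the responsibilities described above: reporting metadata, selecting the parts of a table to read, and reading records with a pushed-down filter. It does not use the Athena Query Federation SDK. `ToyConnector`, its methods, and the sample data are hypothetical stand-ins, and plain Python lists take the place of Apache Arrow batches.

```python
class ToyConnector:
    """Hypothetical connector over an in-memory, partitioned data source."""

    def __init__(self, partitions):
        # partitions maps a partition value to a list of row dictionaries.
        self.partitions = partitions

    def list_columns(self):
        """Metadata step: report the columns available to the query engine."""
        for rows in self.partitions.values():
            if rows:
                return sorted(rows[0].keys())
        return []

    def plan_partitions(self, wanted=None):
        """Planning step: keep only the partitions the query needs to read."""
        if wanted is None:
            return sorted(self.partitions)
        return sorted(p for p in self.partitions if p in wanted)

    def read_records(self, partition, predicate=None):
        """Record-reading step: apply the pushed-down filter at the source."""
        for row in self.partitions[partition]:
            if predicate is None or predicate(row):
                yield row


source = ToyConnector({
    "2019-11-01": [{"order_id": 1, "status": "PENDING"},
                   {"order_id": 2, "status": "DELIVERED"}],
    "2019-11-02": [{"order_id": 3, "status": "PENDING"}],
})

# Each planned partition could be read by a separate invocation in parallel.
pending = [
    row
    for part in source.plan_partitions(wanted={"2019-11-02"})
    for row in source.read_records(part, lambda r: r["status"] == "PENDING")
]
```

Only one partition is read and only matching rows leave the source, which is the point of partition pruning and predicate pushdown: less data crosses the boundary between the data source and the query engine.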

Data Source Connectors

You can run SQL queries against new data sources by registering the data source with Athena. When registering the data source, you associate an Athena Data Connector specific to the data source. You can use AWS-provided open-source connectors, build your own, contribute to existing connectors, or use community or marketplace-built connectors. Depending on the type of data source, a connector manages metadata information; identifies specific parts of the tables that need to be scanned, read or filtered; and manages parallelism. Athena Data Source Connectors run as Lambda functions in your account.

Each data connector is composed of two Lambda functions, each specific to a data source: one for metadata and one for record reading. The connector code is open source and should be deployed as Lambda functions. You can also deploy Lambda functions to AWS Serverless Application Repository and use them with Athena. Once deployed, each Lambda function has a unique Amazon Resource Name (ARN). You must register these ARNs with Athena. Registering an ARN tells Athena which Lambda function to call during query execution. Once both ARNs are registered, you can query the registered data source.

When a query runs on a federated data source, Athena fans out the Lambda invocations reading metadata and data in parallel. The number of parallel invocations depends on the Lambda concurrency limits enforced in your account. For example, if you have a limit of 300 concurrent Lambda invocations, Athena can invoke 300 parallel Lambda functions for record reading. For two queries running in parallel, Athena invokes twice the number of concurrent executions.
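
The effect of a concurrency limit can be sketched with a bounded worker pool. This is only an analogy for the fan-out described above, not Athena's scheduler. `read_split`, the number of splits, and the row counts are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def read_split(split_id):
    """Stand-in for one record-reading Lambda invocation."""
    return {"split": split_id, "rows": split_id * 10}


def fan_out(split_ids, concurrency_limit):
    """Read all splits, never running more than `concurrency_limit` at once."""
    with ThreadPoolExecutor(max_workers=concurrency_limit) as pool:
        # map() preserves input order even though the work runs in parallel.
        return list(pool.map(read_split, split_ids))


results = fan_out(range(1000), concurrency_limit=300)
total_rows = sum(r["rows"] for r in results)
```

With 1,000 splits and a limit of 300, at most 300 reads are in flight at any moment and the remaining splits wait for a free slot.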

Diagram 1 shows how Athena Federated Queries work. When you submit a federated query to Athena, Athena will invoke the right Lambda-based connector to connect with your Data Source. Athena will fan out Lambda invocations to read metadata and data in parallel.

Diagram 1: Athena Federated Query Architecture

Example

This blog post demonstrates how data analysts can query data in multiple databases for faster analysis in one SQL query. For illustration purposes, consider an imaginary e-commerce company whose architecture leverages the following purpose-built data sources:

  1. Payment transaction records stored in Apache HBase running on AWS.
  2. Active orders, defined as customer orders not yet delivered, stored in Redis so that the processing engine can retrieve such orders quickly.
  3. Customer data such as email addresses, shipping information, etc., stored in DocumentDB.
  4. Product Catalog stored in Aurora.
  5. Order Processor’s log events housed in Amazon CloudWatch Logs.
  6. Historical orders and analytics in Redshift.
  7. Shipment tracking data in DynamoDB.
  8. A fleet of drivers performing last-mile delivery while using IoT-enabled tablets.

Customers of this imaginary e-commerce company have a problem. They have complained that their orders are stuck in a weird state. Some orders show as pending even though they have actually been delivered while other orders show as delivered but have not actually been shipped.

The company management has tasked the customer service analysts to determine the true state of all orders.

Using Athena federated queries

Using Athena’s query federation, the analysts can quickly analyze records from different sources. Additionally, they can set up a pipeline that extracts data from these sources, stores it in Amazon S3, and uses Athena to query it.

Diagram 2 shows Athena invoking Lambda-based connectors to connect, in the same query, with data sources that are on premises and in the cloud. In this diagram, Athena is scanning data from S3 and executing the Lambda-based connectors to read data from HBase in EMR, DynamoDB, MySQL, Redshift, ElastiCache (Redis), and Amazon Aurora.

Diagram 2: Federated Query Example.

 

Analysts can register and use the following connectors found in this repository and run a query that:

  1. Grabs all active orders from Redis. (see athena-redis)
  2. Joins against any orders with ‘WARN’ or ‘ERROR’ events in CloudWatch Logs by using regex matching and extraction. (see athena-cloudwatch)
  3. Joins against our EC2 inventory to get the hostname(s) and status of the Order Processor(s) that logged the ‘WARN’ or ‘ERROR’. (see athena-cmdb)
  4. Joins against DocumentDB to obtain customer contact details for the affected orders. (see athena-docdb)
  5. Joins against a scatter-gather query sent to the Driver Fleet via Android Push notification. (see athena-android)
  6. Joins against DynamoDB to get shipping status and tracking details. (see athena-dynamodb)
  7. Joins against HBase to get payment status for the affected orders. (see athena-hbase)
  8. Joins against the advertising conversion data in BigQuery to see which promotions need to be applied if a re-order is needed. (see athena-bigquery)

Data Source Connector Registration

Analysts can register a data source connector using the Connect data source flow in the Athena Query Editor.

  1. Choose Connect data source or Data sources on the Query Editor.
  2. Select the data source to which you want to connect, as shown in the following screenshot. You can also choose to write your own data source connector using the Query Federation SDK.
  3. Follow the rest of the steps in the UX to complete the registration. They involve configuring the connector function for your data source (as shown in the following screenshot), selecting a Name as the Catalog Name to use in your query, and providing a description.

Sample Analyst Query

Once the registration of the data source connectors is complete, the customer service analyst can write the following sample query to identify the affected orders in one SQL query, thus increasing the organization’s business velocity.

A sample federated query follows:

WITH logs 
     AS (SELECT log_stream, 
                message                                          AS 
                order_processor_log, 
                Regexp_extract(message, '.*orderId=(\d+) .*', 1) AS orderId, 
                Regexp_extract(message, '(.*):.*', 1)            AS log_level 
         FROM 
     "lambda:cloudwatch"."/var/ecommerce-engine/order-processor".all_log_streams 
         WHERE  Regexp_extract(message, '(.*):.*', 1) IN ('WARN', 'ERROR')), 
     active_orders 
     AS (SELECT * 
         FROM   redis.redis_db.redis_customer_orders), 
     order_processors 
     AS (SELECT instanceid, 
                publicipaddress, 
                state.NAME 
         FROM   awscmdb.ec2.ec2_instances), 
     customer 
     AS (SELECT id, 
                email 
         FROM   docdb.customers.customer_info), 
     addresses 
     AS (SELECT id, 
                is_residential, 
                address.street AS street 
         FROM   docdb.customers.customer_addresses),
     drivers
     AS ( SELECT name as driver_name, 
                 result_field as driver_order, 
                 device_id as truck_id, 
                 last_updated 
         FROM android.android.live_query where query_timeout = 5000 and query_min_results=5),
     impressions 
     AS ( SELECT path as advertisement, 
                 conversion
         FROM bigquery.click_impressions.click_conversions),
     shipments 
     AS ( SELECT order_id, 
                 shipment_id, 
                 from_unixtime(cast(shipped_date as double)) as shipment_time,
                 carrier
        FROM lambda_ddb.default.order_shipments),
     payments
     AS ( SELECT "summary:order_id", 
                 "summary:status", 
                 "summary:cc_id", 
                 "details:network" 
        FROM "hbase".hbase_payments.transactions)

SELECT _key_            AS redis_order_id, 
       customer_id, 
       customer.email   AS cust_email, 
       "summary:cc_id"  AS credit_card,
       "details:network" AS CC_type,
       "summary:status" AS payment_status,
       impressions.advertisement as advertisement,
       status           AS redis_status, 
       addresses.street AS street_address, 
       shipments.shipment_time as shipment_time,
       shipments.carrier as shipment_carrier,
       driver_name     AS driver_name,
       truck_id       AS truck_id,
       last_updated AS driver_updated,
       publicipaddress  AS ec2_order_processor, 
       NAME             AS ec2_state, 
       log_level, 
       order_processor_log 
FROM   active_orders 
       LEFT JOIN logs 
              ON logs.orderid = active_orders._key_ 
       LEFT JOIN order_processors 
              ON logs.log_stream = order_processors.instanceid 
       LEFT JOIN customer 
              ON customer.id = customer_id 
       LEFT JOIN addresses 
              ON addresses.id = address_id 
       LEFT JOIN drivers 
              ON drivers.driver_order = active_orders._key_ 
       LEFT JOIN impressions
              ON impressions.conversion = active_orders._key_
       LEFT JOIN shipments
              ON shipments.order_id = active_orders._key_
       LEFT JOIN payments
              ON payments."summary:order_id" = active_orders._key_
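
The two regular expressions in the logs CTE can be checked outside Athena before running the full query. The sketch below uses Python's `re` module, which behaves the same as Athena's regex engine for these simple patterns. The sample log lines are hypothetical, since the post does not show the order processor's real log format.

```python
import re

ORDER_ID = re.compile(r".*orderId=(\d+) .*")
LOG_LEVEL = re.compile(r"(.*):.*")


def extract(pattern, message):
    """Mimic regexp_extract(message, pattern, 1): group 1, or None if no match."""
    match = pattern.search(message)
    return match.group(1) if match else None


# Hypothetical log line, used only to exercise the patterns.
line = "WARN: payment retry scheduled for orderId=1042 after timeout"

order_id = extract(ORDER_ID, line)    # digits that follow "orderId="
log_level = extract(LOG_LEVEL, line)  # text before the last colon

# The order pattern requires a space after the digits, so this one does not match.
no_trailing_space = extract(ORDER_ID, "ERROR: failed orderId=7")

# The level pattern is greedy: with several colons it captures up to the last one.
greedy_level = extract(LOG_LEVEL, "12:30:01 WARN: slow response")
```

Two properties are worth knowing when adapting the query: an order ID at the very end of a message is not extracted, and a message that contains more than one colon yields a log level that includes everything before the last colon.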

Additionally, Athena writes all query results to an S3 bucket that you specify. If your use case requires you to ingest data into S3, you can use Athena’s query federation capabilities to register your data source, ingest the data to S3, and use CTAS or INSERT INTO statements to create partitions and metadata in the AWS Glue Data Catalog as well as convert the data to a supported format.

Conclusion

In this blog, we introduced Athena’s new federated query feature. Using an example, we saw how to register and use Athena data source connectors to write federated queries that connect Athena to any data source accessible by AWS Lambda from your account. Finally, we learnt that we can use federated queries not only to enable faster analytics, but also to extract, transform, and load data into a data lake in S3.

Athena federated query is available in Preview in the us-east-1 (N. Virginia) region. Begin your Preview now by following these steps in the Athena FAQ.
To learn more about the feature, please see the Connect to a Data Source documentation.
To get started with using an existing connector, please follow this Connect to a Data Source guide.
To learn how to build your own data source connector using the Athena Query Federation SDK, please visit this Athena example in GitHub.


About the Author

Janak Agarwal is a product manager for Athena at AWS.

Simplify ETL data pipelines using Amazon Athena’s federated queries and user-defined functions

Post Syndicated from Manny Wald original https://aws.amazon.com/blogs/big-data/simplify-etl-data-pipelines-using-amazon-athenas-federated-queries-and-user-defined-functions/

Amazon Athena recently added support for federated queries and user-defined functions (UDFs), both in Preview. See Query any data source with Amazon Athena’s new federated query for more details. Jornaya helps marketers intelligently connect consumers who are in the market for major life purchases such as homes, mortgages, cars, insurance, and education.

Jornaya collects data from a variety of data sources. Our main challenge is to clean and ingest this data into Amazon S3 to enable access for data analysts and data scientists.

Legacy ETL and analytics solution

In 2012, Jornaya moved from MySQL to Amazon DynamoDB for our real-time database. DynamoDB allowed a company of our size to receive the benefits of create, read, update, and delete (CRUD) operations with predictable low latency, high availability, and excellent data durability without the administrative burden of having to manage the database. This allowed our technology team to focus on solving business problems and rapidly building new products that we could bring to market.

Running analytical queries on NoSQL databases can be tough. We decided to extract data from DynamoDB and run queries on it. This was not simple.

Here are a few methods we use at Jornaya to get data from DynamoDB:

  • Leveraging EMR: We temporarily provision additional read capacity with DynamoDB tables and create transient EMR clusters to read data from DynamoDB and write to Amazon S3.
    • Our Jenkins jobs trigger pipelines that spin up a cluster, extract data using EMR, and use the Amazon Redshift copy command to load data into Amazon Redshift. This is an expensive process because we use excess read capacity. To lower EMR costs, we use spot instances.
  • Enabling DynamoDB Streams: We use a homegrown Python AWS Lambda function named Dynahose to consume data from the stream and write it to an Amazon Kinesis Firehose delivery stream. We then configure the Kinesis Firehose delivery stream to write the data to an Amazon S3 location. Finally, we use another homegrown Python Lambda function named Partition to ensure that the partitions corresponding to the locations of the data written to Amazon S3 are added to the AWS Glue Data Catalog so that the data can be read using tools like AWS Glue, Amazon Redshift Spectrum, EMR, etc.
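
The Partition function itself is not shown in this post. As a rough sketch of the idea, the following derives partition values from an S3 object key, assuming the delivery stream writes under the default Kinesis Firehose `YYYY/MM/DD/HH` (UTC) prefix. The key layout, the partition column names, and the function names are assumptions made for illustration.

```python
import re

# Matches ".../YYYY/MM/DD/HH/<object name>" at the end of a key.
KEY_PATTERN = re.compile(r"(?:^|/)(\d{4})/(\d{2})/(\d{2})/(\d{2})/[^/]+$")


def partition_from_key(key):
    """Return partition values for an S3 object key, or None if it does not match."""
    match = KEY_PATTERN.search(key)
    if match is None:
        return None
    year, month, day, hour = match.groups()
    return {"year": year, "month": month, "day": day, "hour": hour}


def partition_location(bucket, key):
    """Return the S3 prefix that holds every object of the key's partition."""
    return "s3://{}/{}/".format(bucket, key.rsplit("/", 1)[0])
```

A Lambda function triggered by S3 object-created events could pass these values and the location to the AWS Glue partition APIs so that newly written data becomes queryable.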

The process is shown in the following diagram.

We go through such pipelines because we want to ask questions about our operational data in a natural way, using SQL.

Using Amazon Athena to simplify ETL workflows and enable quicker analytics

Athena, a fully managed serverless interactive service for querying data in Amazon S3 using SQL, has been rapidly adopted by multiple departments across our organization. For our use case, we did not require an always-on EMR cluster waiting for an analytics query. Athena’s serverless nature is perfect for our use case. Along the way we discovered that we could use Athena to run extract, transform, and load (ETL) jobs.

However, Athena is a lot more than an interactive service for querying data in Amazon S3. We also found Athena to be a robust, powerful, reliable, scalable, and cost-effective ETL tool. The ability to schedule SQL statements, along with support for Create Table As Select (CTAS) and INSERT INTO statements, helped us accelerate our ETL workloads.

Before Athena, business users in our organization had to rely on engineering resources to build pipelines. The release of Athena changed that in a big way. Athena enabled software engineers and data scientists to work with data that would have otherwise been inaccessible or required help from data engineers.

With the addition of query federation and UDFs to Athena, Jornaya has been able to replace many of our unstable data pipelines with Athena to extract and transform data from DynamoDB and write it to Amazon S3. The product and engineering teams at Jornaya noticed our reduced ETL failure rates. The finance department took note of lower EMR and DynamoDB costs. And the members of our on-call rotation (as well as their spouses) have been able to enjoy uninterrupted sleep.

For instance, the build history of one ETL pipeline using EMR looked like this (the history of ETL pipeline executions is shown in this chart with the job execution id on the x-axis and the execution time in minutes on the y-axis):

After migrating this pipeline to Athena and using federated queries to query DynamoDB, we could easily access data sources that were previously out of reach, using queries like the following:

CREATE TABLE "__TABLE_NAME__"
WITH (
  external_location = '__S3_LOCATION__'
, format = 'PARQUET'
, parquet_compression = 'SNAPPY'
, partitioned_by = ARRAY['create_day']
) AS
SELECT DISTINCT
  d.key.s AS device_id
, CAST(d.created.n AS DECIMAL(14, 4)) AS created
, d.token.s AS token
, c.industry AS industry_code
, CAST(CAST(FROM_UNIXTIME(CAST(d.created.n AS DECIMAL(14, 4))) AS DATE) AS VARCHAR) AS create_day
FROM "rdl"."device_frequency_stream" d
  LEFT OUTER JOIN "lambda:dynamodb"."default"."campaigns" c ON c.key = d.campaign_key
WHERE d.insert_ts BETWEEN TIMESTAMP '__PARTITION_START_DATE__' AND TIMESTAMP '__PARTITION_END_DATE__'
  AND d.created.n >= CAST(CAST(TO_UNIXTIME(DATE '__START_DATE__') AS DECIMAL(14, 4)) AS VARCHAR)
  AND d.created.n < CAST(CAST(TO_UNIXTIME(DATE '__END_DATE__') AS DECIMAL(14, 4)) AS VARCHAR);
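
The double-underscore tokens in this statement read like placeholders that a scheduler fills in before submitting the query. The post does not show how that is done. The following is a minimal sketch, assuming plain string substitution; the `render` helper, the shortened template, and the sample values are hypothetical.

```python
import re

# Matches tokens such as __TABLE_NAME__ or __PARTITION_START_DATE__.
PLACEHOLDER = re.compile(r"__([A-Z0-9]+(?:_[A-Z0-9]+)*)__")


def render(template, values):
    """Replace every __NAME__ token; raise KeyError if a value is missing."""
    return PLACEHOLDER.sub(lambda m: values[m.group(1)], template)


# Shortened stand-in for the CTAS statement above.
template = (
    'CREATE TABLE "__TABLE_NAME__" '
    "WITH (external_location = '__S3_LOCATION__') AS "
    "SELECT 1 AS x WHERE DATE '__START_DATE__' < DATE '__END_DATE__'"
)

sql = render(template, {
    "TABLE_NAME": "device_frequency_2019_11_22",
    "S3_LOCATION": "s3://example-bucket/device_frequency/2019-11-22/",
    "START_DATE": "2019-11-22",
    "END_DATE": "2019-11-23",
})
```

Raising on a missing value keeps a half-rendered statement from reaching Athena. Because this is plain text substitution, the values should come only from trusted job configuration.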

We achieved a much more performant process with a build history shown in the following diagram:

Conclusion

Using one SQL query, we were able to process data from DynamoDB, convert that data to Parquet, apply Snappy compression, create the correct partitions in our AWS Glue Data Catalog, and ingest data to Amazon S3. Our ETL process execution time was reduced from hours to minutes, the cost was significantly lowered, and the new process is simpler and much more reliable. The new process using Athena for ETL is also future-proof due to extensibility. In case we need to import a dataset from another purpose-built data store that does not have a ready data source connector, we can simply use the data source connector SDK to write our own connector and deploy it in Production—a one-time effort that will cost us barely one day.

Additionally, Athena federated queries have empowered Jornaya to run queries that connect data from not just different data sources, but from different data paradigms! We can run a single query that seamlessly links data from a NoSQL datastore, an RDS RDBMS, and an Amazon S3 data lake.

 


About the Authors

Manny Wald is the technical co-founder at Jornaya. He holds multiple patents and is passionate about the power of the cloud, big data and AI to accelerate the rate at which companies can bring products to market to solve real-world problems. He has a background in BI, application development, data warehousing, web services, and building tools to manage transactional and analytical information. Additionally, Manny created the internet’s first weekly hip hop turntablism mix show, is admitted to practice law at the state and federal levels, and plays basketball whenever he gets the chance.

Janak Agarwal is a product manager for Athena at AWS.

Access and manage data from multiple accounts from a central AWS Lake Formation account

Post Syndicated from Shilpa Mehta original https://aws.amazon.com/blogs/big-data/access-and-manage-data-from-multiple-accounts-from-a-central-aws-lake-formation-account/

This post shows how to access and manage data in multiple accounts from a central AWS Lake Formation account. The walkthrough demonstrates a centralized catalog residing in the master Lake Formation account, with data residing in the different accounts.  The post shows how to grant access permissions from the Lake Formation service to read, write and update the catalog and access data in different accounts.

The post uses two datasets to determine whether there is a correlation between the news generated around the world (gdelt) and the number of reviews that Amazon’s products received (amazonreviews).

Prerequisites

This walkthrough requires three AWS accounts, each with S3 buckets. Have the account number of each account available.

Setting up the Environment

The three accounts are as follows:

  • Account Products (AP) – This is the account in which the Amazon product reviews are stored. This post deploys the configuration using AWS CloudFormation.
  • Account External (AE) – This account monitors broadcast, print, and web news from around the world in over 100 languages. It identifies the people, locations, organizations, counts, themes, sources, emotions, quotes, images, and events driving global society every second of every day. This post deploys the configuration using AWS CloudFormation.
  • Main Account (MA) – This is the main account, which gathers data from the other two accounts. This post configures Lake Formation in this account. This account has access to the product data and world news account.

The following diagram shows the account architecture.

Account Products

Deploy the following AWS CloudFormation template in the AP.

This creates an S3 bucket called productsaccountcf-bucketname-1pcfoxar1pxp (templateName-bucket-name-random_string) and a cross account bucket policy in the AP. This policy gives the main account root ID access to this bucket.

  • It uses a Lambda function to download the amazonreviews dataset into the new bucket created.

Make sure you insert the account number of your main account in the datalake-AccountId field. The following screenshot shows that for this post, the main account number is 111111111111.
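
The template's bucket policy is not reproduced in this post. As an illustration of what a cross-account policy of this kind can look like, the sketch below builds one in Python. The statement ID, the set of actions, and the helper name are assumptions. The bucket name is the one used in this walkthrough, and the account number is a placeholder.

```python
import json


def cross_account_bucket_policy(bucket, main_account_id):
    """Build a bucket policy that lets another account's root read and write."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowMainAccount",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::{}:root".format(main_account_id)
                },
                # Assumed action set; the actual template may grant more or less.
                "Action": ["s3:GetObject", "s3:PutObject",
                           "s3:DeleteObject", "s3:ListBucket"],
                "Resource": [
                    "arn:aws:s3:::{}".format(bucket),
                    "arn:aws:s3:::{}/*".format(bucket),
                ],
            }
        ],
    }


policy_json = json.dumps(
    cross_account_bucket_policy(
        "productsaccountcf-bucketname-1pcfoxar1pxp", "111111111111"),
    indent=2,
)
```

The policy lists both the bucket ARN and the `/*` object ARN because `s3:ListBucket` applies to the bucket itself while the object actions apply to the keys inside it.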

Account External

Deploy the following AWS CloudFormation template in the AE.

This creates an S3 bucket called externalaccountcf-resultbucket-12ecq638afqiq (templateName-bucket-name-random_string) and a cross-account bucket policy in the AE. This policy gives the main account access to this bucket. Due to the nature of the dataset, this template also creates tables in the data catalog. Instead of using AWS Glue crawlers, it defines the structure of the tables with Athena queries.

The template executes queries in Amazon Athena to download the gdelt dataset, and to create the metadata of the tables that Lake Formation uses.

Make sure you insert the account number of your main account in the datalakeAccountid field. For this post, the main account number is 111111111111. Though the CloudFormation console shows CREATE_COMPLETE, a query is still executing, which you can observe in the Athena console. You can access the Athena console from the AWS Management Console. The query that continues to run creates a new table in the AE with the data in Parquet format so that queries can perform better.

The following screenshot shows your query history and status.

You are now ready to go to Lake Formation in your main account and start configuring.

Registering data stores in Lake Formation

Log in to the Lake Formation console in your main account. If this is the first time you are accessing Lake Formation, you need to add administrators to the account. The administrator is the user you are logged in as.

To add your data lakes, complete the following steps:

  1. On the Lake Formation console, under Register and ingest, choose Data lake locations. The page displays a list of S3 buckets that are marked as data lake storage resources for Lake Formation. Here, a single S3 bucket may act as the repository for many datasets, or you could use separate buckets for separate data sources. This post registers the S3 buckets in the other accounts and creates a master catalog in Lake Formation.
  2. Choose Register location. The following screenshot shows the Data lake locations pane. You can now register both buckets you created in your AP and AE.
  3. In Amazon S3 location, for Amazon S3 path, enter s3://productsaccountcf-bucketname-1pcfoxar1pxp.
  4. For IAM role, select the default role. You need an IAM role that gives Lake Formation the necessary permissions (GetObject, PutObject, DeleteObject, and ListBucket) to properly use your S3 bucket as a data lake, and the default role has these permissions. Alternatively, select a pre-existing IAM role that has the required permissions and is configured with lakeformation.amazonaws.com as a trusted entity.
  5. Choose Register location. The following screenshot shows the Amazon S3 location pane. You now have a storage resource and are ready to register the second bucket.
  6. Repeat the previous steps, but in Step 3, register the bucket externalaccountcf-resultbucket-12ecq638afqiq from your AE (888888888888) as s3://externalaccountcf-resultbucket-12ecq638afqiq.
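
The same registration can be sketched programmatically. The helper below assumes a client that behaves like boto3's Lake Formation client and its `register_resource` call. The helper name is hypothetical, and whether the default service-linked role is sufficient for a bucket in another account should be verified for your setup.

```python
def register_bucket(client, bucket, role_arn=None):
    """Register an S3 bucket as a Lake Formation data lake location.

    `client` is expected to behave like boto3.client("lakeformation").
    Without `role_arn`, the service-linked (default) role is requested.
    """
    request = {"ResourceArn": "arn:aws:s3:::{}".format(bucket)}
    if role_arn is None:
        request["UseServiceLinkedRole"] = True
    else:
        request["RoleArn"] = role_arn
    client.register_resource(**request)
    return request
```

Calling the helper once per bucket mirrors steps 2 through 6 above: first for productsaccountcf-bucketname-1pcfoxar1pxp, then for externalaccountcf-resultbucket-12ecq638afqiq.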

Setting up your IAM role

You need an IAM role that allows Lake Formation to create catalog tables of the datasets in the storage locations. Complete the following steps:

  1. On the AWS console, access IAM and create an IAM role
  2. Attach AWS Glue and AWS Lambda policies as described here.
  3. Edit the trust relationship for the role with the following policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "Service": ["glue.amazonaws.com", "lambda.amazonaws.com"]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
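
If you prefer to script this step, the following sketch creates the role with the same trust policy. It assumes a client that behaves like boto3's IAM client (`create_role`). The helper name and any role name you pass in are hypothetical, and the permission policies from step 2 still need to be attached separately.

```python
import json

# Same trust relationship as the JSON document above.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": ["glue.amazonaws.com", "lambda.amazonaws.com"]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}


def create_catalog_role(iam_client, role_name):
    """Create the role with the trust policy and return its ARN.

    `iam_client` is expected to behave like boto3.client("iam").
    """
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(TRUST_POLICY),
    )
    return response["Role"]["Arn"]
```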

Add an inline policy that gives the role permissions to access S3, execute Athena queries, use AWS Glue, and publish Amazon CloudWatch logs, as shown in the following screenshot.

Creating a database

Lake Formation maintains a Hive-compatible data catalog within your data lake. Before you can catalog data within your S3 storage backend or use Lake Formation data importers to push data to S3 (which this post discusses later), you must first create a database within your Lake Formation catalog.

A Lake Formation database is a logical construct to which you can later add tables. Each table contains a mapping to one or more objects in S3 that, collectively, represent that table. Tables also contain basic column metadata such as file format, S3 location, and column definitions. Optionally, you can also define arbitrary key-value pairs for tables and columns to better describe the data and act as queryable attributes for data discovery.

You can create one or more databases and populate their tables either manually in the console, programmatically via the AWS SDKs or AWS CLI, or automatically by defining AWS Glue crawlers.

This post defines two logical databases, amazonreviews and gdelt.

  1. On the Lake Formation console, under Data catalog, choose Databases.
  2. Choose Create database. The following screenshot shows the Databases pane.
  3. For Name, enter amazonreviews.
  4. For Location, enter s3://productsaccountcf-bucketname-1pcfoxar1pxp/amazonreviews.
  5. For Description, enter a brief, meaningful description.
  6. Clear Grant All to Everyone for new tables in this database.
  7. Choose Create database. The following screenshot shows the database details.
    1. Create the gdelt database. Set Name to gdelt.
    2. Set Location to s3://externalaccountcf-resultbucket-12ecq638afqiq/gdelt.
    3. Set Description to a brief, meaningful description like the one shown below.
    4. Clear Grant All to Everyone for new tables in this database.
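
Lake Formation databases live in the AWS Glue Data Catalog, so the two databases can also be created through the Glue API. The sketch below assumes that an empty `CreateTableDefaultPermissions` list is the API counterpart of clearing the Grant All to Everyone check box. The helper names are illustrative.

```python
# Database names and locations used in this walkthrough.
DATABASES = {
    "amazonreviews": "s3://productsaccountcf-bucketname-1pcfoxar1pxp/amazonreviews",
    "gdelt": "s3://externalaccountcf-resultbucket-12ecq638afqiq/gdelt",
}


def database_input(name: str, location: str, description: str = "") -> dict:
    """Build the DatabaseInput structure for glue.create_database."""
    payload = {
        "Name": name,
        "LocationUri": location,
        # Assumption: an empty list means new tables are not opened to everyone.
        "CreateTableDefaultPermissions": [],
    }
    if description:
        payload["Description"] = description
    return payload


def create_databases():
    """Create both databases. Requires AWS credentials and Glue permissions."""
    import boto3  # imported lazily so database_input() works without boto3

    glue = boto3.client("glue")
    for name, location in DATABASES.items():
        glue.create_database(DatabaseInput=database_input(name, location))
```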

Granting permissions

You now have your databases and need to grant Lake Formation permissions to the IAM role you created. You also need to configure your IAM users and roles as administrators.

  1. On the Lake Formation console, under Permissions, choose Admins and database creators. The following screenshot shows the Admins and database creators pane.
  2. Under Permissions, choose Data permissions.
  3. From the Actions menu, choose Grant.
  4. Select your new IAM role.
  5. For Database permissions, choose Create table and Grant all.
  6. Choose Grant. The following screenshot shows the Grant permissions pane.
  7. Repeat the previous steps for the amazonreviews and gdelt databases. The next step is granting your role permissions to the data lakes you created.
  8. From Permissions, choose Data locations.
  9. Choose Grant.
  10. Select your new role. The following screenshot shows the Data locations pane.
  11. For IAM users and roles, select your role.
  12. For Storage locations, enter s3://productsaccountcf-bucketname-1pcfoxar1pxp.
  13. Choose Grant. The following screenshot shows the Grant permissions pane.
  14. Repeat the steps for the data lake s3://externalaccountcf-resultbucket-12ecq638afqiq/.
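
The grants above map onto the Lake Formation `grant_permissions` API. The following sketch builds the request for a database grant and a data location grant. The role ARN is a placeholder, the helper names are illustrative, and the sketch grants only `CREATE_TABLE` on the database; it does not attempt to reproduce the Grant all selection from the console.

```python
# Hypothetical role ARN, used only for illustration.
ROLE_ARN = "arn:aws:iam::111111111111:role/example-lake-formation-role"


def database_grant(role_arn: str, database: str) -> dict:
    """Request that lets the role create tables in one database."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"Database": {"Name": database}},
        "Permissions": ["CREATE_TABLE"],
    }


def location_grant(role_arn: str, s3_path: str) -> dict:
    """Request that lets the role use a registered S3 location."""
    arn = "arn:aws:s3:::" + s3_path[len("s3://"):].rstrip("/")
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"DataLocation": {"ResourceArn": arn}},
        "Permissions": ["DATA_LOCATION_ACCESS"],
    }


def apply_grants(requests):
    """Send each request. Requires Lake Formation administrator credentials."""
    import boto3  # imported lazily so the builders above work without boto3

    client = boto3.client("lakeformation")
    for request in requests:
        client.grant_permissions(**request)


REQUESTS = [
    database_grant(ROLE_ARN, "amazonreviews"),
    database_grant(ROLE_ARN, "gdelt"),
    location_grant(ROLE_ARN, "s3://productsaccountcf-bucketname-1pcfoxar1pxp"),
    location_grant(ROLE_ARN, "s3://externalaccountcf-resultbucket-12ecq638afqiq/"),
]
```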

Setting up your main account

Deploy the following CloudFormation stack in the MA.

This creates tables in the amazonreviews and gdelt databases.

From the Actions menu choose Grant. Select the role or users to grant access and select the two check boxes, and choose Grant.

Querying the data

Now that you have the data in the catalog, you can use Athena from the main account to query across the datasets in different accounts.

Grant table permissions by completing the following steps:

  1. On the Lake Formation console, under Data catalog, choose Tables.
  2. Choose the table to query.
  3. From the Actions menu, choose Grant, and then enter your role or user name. The following screenshot shows the Tables pane.
  4. For Table permissions, select Select.
  5. For Grantable permissions, select Alter, Insert, Drop, Delete, Select, and Grant all. The following screenshot shows the Grant permissions pane.
  6. Repeat the previous steps for the events table in the gdelt database. You are now ready to query the data.
  7. In Tables, select the events table.
  8. From the Actions menu, choose View data. The following screenshot shows the Tables pane.
  9. You are taken to the Amazon Athena console. The following screenshot shows the Athena console.
  10. Use the Query Editor tab and enter SQL queries for the reviews and events tables.

To query the information by date, standardize the date columns and do aggregations by creating views. In sequential order, run the following queries:

CREATE VIEW amznrevw.aggreviews AS
SELECT count(*) as reviewcount, star_rating, verified_purchase, from_iso8601_date(review_date) as reviewdate FROM "amznrevw"."reviews2" group by star_rating, verified_purchase, review_date;

CREATE VIEW gdelt.eventsformatted AS
SELECT from_iso8601_date(substr(cast(day as varchar),1,4) || '-' || substr(cast(day as varchar),5,2)||'-' || substr(cast(day as varchar),7,2)) as eventdate, actor1code,actor1name,actor1countrycode, actor2code,actor2name,actor2countrycode FROM "gdelt"."events" ;

CREATE VIEW gdelt.eventsagregated AS
select count(*) as numevetns, eventdate, (count(distinct actor1code) + count(distinct actor2code)) as numactors from gdelt.eventsformatted group by eventdate;
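
The gdelt.eventsformatted view turns the integer day column (for example, 20150103) into a date by slicing it into year, month, and day pieces and joining them with hyphens. The following Python sketch mirrors that string manipulation so you can check the logic locally. The function name `gdelt_day_to_date` is illustrative.

```python
from datetime import date


def gdelt_day_to_date(day: int) -> date:
    """Convert a YYYYMMDD integer into a date, like the eventsformatted view."""
    text = str(day)
    if len(text) != 8:
        raise ValueError(f"expected an 8-digit YYYYMMDD value, got {day!r}")
    # SQL substr() is 1-indexed: substr(x, 1, 4), substr(x, 5, 2), substr(x, 7, 2).
    iso_text = f"{text[0:4]}-{text[4:6]}-{text[6:8]}"
    return date.fromisoformat(iso_text)
```

Because both views end up with a real date column, the join on event.eventdate = review.reviewdate in the following queries compares like with like.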

You are now ready to query. You can determine how many gdelt events occurred on the five days with the most reviews by performing the following query:

select eventdate, sum(reviewcount) as totalreviews, sum(numevetns) as totalnumevetns from gdelt.eventsagregated as event, amznrevw.aggreviews as review where event.eventdate = review.reviewdate group by eventdate order by totalreviews desc, totalnumevetns desc limit 5;

The following screenshot shows the query results.

January 3, 2015, had the most reviews but not the most gdelt events (833,890).

You can also discover how many reviews were written on the five days with the most gdelt events by performing the following query:

select eventdate, sum(reviewcount) as totalreviews, sum(numevetns) as totalnumevetns from gdelt.eventsagregated as event, amznrevw.aggreviews as review where event.eventdate = review.reviewdate group by eventdate order by totalnumevetns desc, totalreviews desc limit 5;

The following screenshot shows the query results.

January 25, 2012, had 2 million events but only 378 reviews.

Finally, you can check the correlation between the two with the following query:

SELECT corr(reviewcount, numevetns) AS review_event_correlation
FROM gdelt.eventsagregated AS event, amznrevw.aggreviews AS review
WHERE event.eventdate = review.reviewdate;

The following screenshot shows the query results.

You can likely identify that there is no correlation between the two columns.
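
To make the correlation check concrete, the following sketch computes a correlation coefficient by hand. It assumes the standard Pearson definition, which is what a `corr` aggregate conventionally returns. The function name `pearson` and the sample numbers are illustrative, not results from the datasets.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of the same length, at least 2 items")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    variance_x = sum((x - mean_x) ** 2 for x in xs)
    variance_y = sum((y - mean_y) ** 2 for y in ys)
    if variance_x == 0 or variance_y == 0:
        raise ValueError("correlation is undefined when one sequence is constant")
    return covariance / math.sqrt(variance_x * variance_y)
```

A value near 1 or -1 indicates a strong linear relationship, and a value near 0 indicates none.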

Conclusion

This post demonstrated how to set up cross-account access of datastores through a central Lake Formation catalog. The solution walked through creating two S3 buckets in external accounts, downloading some datasets to these buckets, and giving Lake Formation permission to access the data. You also learned how to govern the data in the data lakes from Lake Formation, and how to query the data in the two data lakes using Athena and Glue crawlers.

About the Authors


Shilpa Mehta is a Data Lab solutions architect at AWS.
Shilpa helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab.

Laura Caicedo Camacho is a solutions architect at AWS. She works with customers to help them embrace and adopt the cloud.

Luis Caro Perez is a solutions architect at AWS. He works with our customers to provide guidance and technical assistance on their applications, helping them improve the value of their solutions when using AWS.

How ironSource built a multi-purpose data lake with Upsolver, Amazon S3, and Amazon Athena

Post Syndicated from Seva Feldman original https://aws.amazon.com/blogs/big-data/how-ironsource-built-a-multi-purpose-data-lake-with-upsolver-amazon-s3-and-amazon-athena/

ironSource, in their own words, is the leading in-app monetization and video advertising platform, making free-to-play and free-to-use possible for over 1.5B people around the world. ironSource helps app developers take their apps to the next level, including the industry’s largest in-app video network. Over 80,000 apps use ironSource technologies to grow their businesses.

The massive scale at which ironSource operates across its various monetization platforms, including apps, video, and mediation, leads to millions of end devices generating massive amounts of streaming data. ironSource needs to collect, store, and prepare data to support multiple use cases while minimizing infrastructure and engineering overhead.

This post discusses the following:

  • Why ironSource opted for a data lake architecture based on Amazon S3.
  • How ironSource built the data lake using Upsolver.
  • How to create outputs to analytic services such as Amazon Athena, Amazon ES, and Tableau.
  • The benefits of this solution.

Advantages of a data lake architecture

After ironSource had worked for several years with a database-focused approach, the rapid growth in its data made the previous system unviable from a cost and maintenance perspective. Instead, the company adopted a data lake architecture, storing raw event data on object storage and creating customized output streams that power multiple applications and analytic flows.

Why ironSource chose an AWS data lake

A data lake was the right solution for ironSource for the following reasons:

  • Scale – ironSource processes 500K events per second and over 20 billion events daily. The ability to store near-infinite amounts of data in S3 without preprocessing the data is crucial.
  • Flexibility – ironSource uses data to support multiple business processes. Because they need to feed the same data into multiple services to provide for different use cases, the company needed to bypass the rigidity and schema limitations entailed by a database approach. Instead, they store all the original data on S3 and create ad-hoc outputs and transformations as needed.
  • Resilience – Because all historical data is on S3, recovery from failure is easier, and errors further down the pipeline are less likely to affect production environments.
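
The two scale figures are easier to compare after converting them to the same unit. The arithmetic below is only a worked illustration. It assumes that 500K events per second is a peak rate, which is consistent with the "up to 500K" wording later in this post.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def average_events_per_second(events_per_day: int) -> float:
    """Average rate implied by a daily total."""
    return events_per_day / SECONDS_PER_DAY


def events_per_day_at(rate_per_second: int) -> int:
    """Daily total implied by a sustained per-second rate."""
    return rate_per_second * SECONDS_PER_DAY


# 20 billion events a day averages out to roughly 231,000 events per second.
average_rate = average_events_per_second(20_000_000_000)

# Sustaining 500K events per second all day would yield 43.2 billion events.
sustained_peak_total = events_per_day_at(500_000)
```

In other words, the daily total corresponds to an average of a little under half the quoted per-second rate, which is why the ability to absorb peaks matters.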

Why ironSource chose Upsolver

Upsolver’s streaming data platform automates the coding-intensive processes associated with building and managing a cloud data lake. Upsolver enables ironSource to support a broad range of data consumers and minimize the time DevOps engineers spend on data plumbing by providing a GUI-based, self-service tool for ingesting data, preparing it for analysis, and outputting structured tables to various query services.

Key benefits include the following:

  • Self-sufficiency for data consumers – As a self-service platform, Upsolver allows BI developers, Ops, and software teams to transform data streams into tabular data without writing code.
  • Improved performance – Because Upsolver stores files in optimized Parquet storage on S3, ironSource benefits from high query performance without manual performance tuning.
  • Elastic scaling – ironSource is in hyper-growth, so it needs elastic scaling to handle increases in inbound data volume and peaks throughout the week, reprocessing of events from S3, and isolation between different groups that use the data.
  • Data privacy – Because Upsolver is deployed in ironSource’s VPC with no access from outside, sensitive data stays inside ironSource’s environment.

This post shows how ironSource uses Upsolver to build, manage, and orchestrate its data lake with minimal coding and maintenance.

Solution Architecture

The following diagram shows the architecture ironSource uses:

Architecture showing Apache Kafka with an arrow pointing left to Upsolver. Upsolver contains stream ingestion, schemaless data management, and stateful data processing. It has two arrows coming out the bottom, each going to S3, one for raw data, the other for Parquet files. The Upsolver box has an arrow pointing right to a Query Engines box, which contains Athena, Redshift, and Elastic. This box has an arrow pointing right to Use cases, which contains product analytics, campaign performance, and customer dashboards.

Streaming data from Kafka to Upsolver and storing on S3

Apache Kafka streams data from ironSource’s mobile SDK at a rate of up to 500K events per second. Upsolver pulls data from Kafka and stores it in S3 within a data lake architecture. It also keeps a copy of the raw event data while making sure to write each event exactly one time, and stores the same data as Parquet files that are optimized for consumption.

Building the input stream in Upsolver:

Using the Upsolver GUI, ironSource connects directly to the relevant Kafka topics and writes them to S3 precisely one time. See the following screenshot.

Image of the Upsolver UI showing the "Data Sources" tab is open to the "Create a Kafka Data Source" page with "Mobile SDK Cluster" highlighted under the "Compute Cluster" section.

After the data is stored in S3, ironSource can proceed to operationalize the data using a wide variety of databases and analytic tools. The next steps cover the most prominent tools.

Output to Athena

To understand production issues, developers and product teams need access to data. These teams can work with the data directly and answer their own questions by using Upsolver and Athena.

Upsolver simplifies and automates the process of preparing data for consumption in Athena, including compaction, compression, partitioning, and creating and managing tables in the AWS Glue Data Catalog. ironSource’s DevOps teams save hundreds of hours on pipeline engineering. Upsolver’s GUI creates each table one time, and from that point onwards, data consumers are entirely self-sufficient. To ensure queries in Athena run fast and at minimal cost, Upsolver also enforces performance-tuning best practices as data is ingested and stored on S3. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

Athena’s serverless architecture further complements this independence: there’s no infrastructure to manage, and analysts don’t need DevOps to use Amazon Redshift or query clusters for each new question. Instead, analysts can indicate the data they need and get answers.

Sending tables to Athena in Upsolver

In Upsolver, you can declare tables with associated schema using SQL or the built-in GUI. You can expose these tables to Athena through the AWS Glue Data Catalog. Upsolver stores Parquet files in S3 and creates the appropriate table and partition information in the AWS Glue Data Catalog by using Create and Alter DDL statements. You can also edit these tables with Upsolver Output to add, remove, or change columns. Upsolver automates the process of recreating table data on S3 and altering the metadata in the AWS Glue Data Catalog.
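
For readers unfamiliar with the DDL involved, the following sketch generates the kind of Create and Alter statements that Athena accepts for a partitioned Parquet table. It is not Upsolver's implementation. The database, table, column, and bucket names are hypothetical.

```python
def create_table_ddl(database, table, columns, partition_columns, location):
    """Build a CREATE EXTERNAL TABLE statement for Parquet data on S3."""
    column_sql = ",\n  ".join(f"`{name}` {kind}" for name, kind in columns)
    partition_sql = ", ".join(f"`{name}` {kind}" for name, kind in partition_columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS `{database}`.`{table}` (\n"
        f"  {column_sql}\n"
        f")\n"
        f"PARTITIONED BY ({partition_sql})\n"
        f"STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


def add_partition_ddl(database, table, partition, location):
    """Build an ALTER TABLE statement that registers one partition."""
    spec = ", ".join(f"{key} = '{value}'" for key, value in partition.items())
    return (
        f"ALTER TABLE `{database}`.`{table}` "
        f"ADD IF NOT EXISTS PARTITION ({spec}) LOCATION '{location}'"
    )


# Hypothetical example values.
TABLE_DDL = create_table_ddl(
    "mobile_sdk",
    "events",
    [("event_id", "string"), ("event_type", "string")],
    [("event_date", "string")],
    "s3://example-bucket/mobile_sdk/events/",
)
PARTITION_DDL = add_partition_ddl(
    "mobile_sdk",
    "events",
    {"event_date": "2019-11-01"},
    "s3://example-bucket/mobile_sdk/events/event_date=2019-11-01/",
)
```

Each new partition written to S3 needs a matching Alter statement before Athena can see it, which is the bookkeeping the platform automates.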

Creating the table

Image of the Upsolver UI showing the "Outputs" tab is open to the "Mobile SDK Data" page.

Sending the table to Amazon Athena

Image of the Upsolver UI showing the "Run Parameters" dialog box is open, having arrived there from the "Mobile SDK Data" page noted in the previous image.

Editing the table option for Outputs

Image on the "Mobile SDK Data" page showing the drop down menu from the 3 dots in the upper left with "Edit" highlighted.

Modifying an existing table in the Upsolver Output

Image showing "Alter Existing Table" with a radio button selected, along with a blurb that states "The changes will affect the existing table from the time specified. Any data already written after that time will be deleted. The previous output will stop once it finishes processing all the data up to the specified time." Below that is a box showing an example date and time. The other option, with a radio button not selected, is "Create New Table" with the blurb "A new table will be created. The existing table and output will not be affected in any way by this operation." The buttons at the bottom are "Next" and "Cancel," with "Next" selected.

Output to BI platforms

ironSource’s BI analysts use Tableau to query and visualize data using SQL. However, performing this type of analysis on streaming data may require extensive ETL and data preparation, which can limit the scope of analysis and create reporting bottlenecks.

ironSource’s cloud data lake architecture enables BI teams to work with big data in Tableau. They use Upsolver to enrich and filter data and write it to Redshift to build reporting dashboards, or send tables to Athena for ad-hoc analytic queries. Tableau connects natively to both Redshift and Athena, so analysts can query the data using regular SQL and familiar tools, rather than relying on manual ETL processes.

Creating a reduced stream for Amazon ES

Engineering teams at ironSource use Amazon ES to monitor and analyze application logs. However, as with any database, storing raw data in Amazon ES is expensive and can lead to production issues.

Because a large part of these logs are duplicates, Upsolver deduplicates the data. This reduces Amazon ES costs and improves performance. Upsolver cuts down the size of the data stored in Amazon ES by 70% by aggregating identical records. This makes it viable and cost-effective despite generating a high volume of logs.

To do this, Upsolver adds a calculated field to the event stream, which indicates whether a particular log is a duplicate. If so, it filters the log out of the stream that it sends to Amazon ES.
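
The idea of a calculated duplicate flag followed by a filter can be sketched in a few lines of Python. This is an illustration of the general technique under simplifying assumptions, not Upsolver's implementation: two logs count as duplicates only if every field matches, and the set of seen fingerprints grows without bound, whereas a real stream would need a time window. All names are illustrative.

```python
import hashlib
import json


def fingerprint(record: dict) -> str:
    """Stable hash of a record, independent of key order."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def mark_duplicates(records):
    """Yield each record with an added is_duplicate calculated field."""
    seen = set()
    for record in records:
        key = fingerprint(record)
        yield {**record, "is_duplicate": key in seen}
        seen.add(key)


def reduced_stream(records):
    """Keep only first occurrences and drop the helper field again."""
    return [
        {name: value for name, value in flagged.items() if name != "is_duplicate"}
        for flagged in mark_duplicates(records)
        if not flagged["is_duplicate"]
    ]


SAMPLE_LOGS = [
    {"level": "ERROR", "msg": "timeout"},
    {"msg": "timeout", "level": "ERROR"},  # same content, different key order
    {"level": "INFO", "msg": "ok"},
]
```

Running `reduced_stream(SAMPLE_LOGS)` keeps the first timeout log and the info log, and drops the repeated timeout.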

Creating the calculated field

Image showing the Upsolver UI with the "Outputs" tab selected, showing the "Create Calculated Field" page.

Filtering using the calculated field

Upsolver UI showing the "Outputs" tab selected, on the "Create Filter" page.

Conclusion

Self-sufficiency is a big part of ironSource’s development ethos. In revamping its data infrastructure, the company sought to create a self-service environment for dev and BI teams to work with data, without becoming overly reliant on DevOps and data engineering. Data engineers can now focus on features rather than building and maintaining code-driven ETL flows.

ironSource successfully built an agile and versatile architecture with Upsolver and AWS data lake tools. This solution enables data consumers to work independently with data, while significantly improving data freshness, which helps power both the company’s internal decision-making and external reporting.

Some of the results in numbers include:

  • Thousands of engineering hours saved – ironSource’s DevOps and data engineers save thousands of hours that they would otherwise spend on infrastructure by replacing manual, coding-intensive processes with self-service tools and managed infrastructure.
  • Cost reduction – Factoring in infrastructure, workforce, and licensing costs, Upsolver significantly reduces ironSource’s total infrastructure costs.
  • 15-minute latency from Kafka to end-user – Data consumers can respond and take action with near real-time data.
  • 9X increase in scale – Currently at 0.5M incoming events/sec and 3.5M outgoing events/sec.

“It’s important for every engineering project to generate tangible value for the business,” says Seva Feldman, Vice President of Research and Development at ironSource Mobile. “We want to minimize the time our engineering teams, including DevOps, spend on infrastructure and maximize the time spent developing features. Upsolver has saved thousands of engineering hours and significantly reduced total cost of ownership, which enables us to invest these resources in continuing our hypergrowth rather than data pipelines.”

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors


Seva Feldman is Vice President of R&D at ironSource Mobile.
With over two decades of experience in senior architecture, DevOps, and engineering roles, Seva is an expert in turning operational challenges into opportunities for improvement.

 

Eran Levy is the Director of Marketing at Upsolver.

Roy Hasson is the Global Business Development Lead of Analytics and Data Lakes at AWS. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.

Analyze Google Analytics data using Upsolver, Amazon Athena, and Amazon QuickSight

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyze-google-analytics-data-using-upsolver-amazon-athena-and-amazon-quicksight/

In this post, we present a solution for analyzing Google Analytics data using Amazon Athena. We’re including a reference architecture built on moving hit-level data from Google Analytics to Amazon S3, performing joins and enrichments, and visualizing the data using Amazon Athena and Amazon QuickSight. Upsolver is used for data lake automation and orchestration, enabling customers to get started quickly.

Google Analytics is a popular solution for organizations that want to understand the performance of their web properties and applications. Google Analytics data is collected and aggregated to help users extract insights quickly. This works great for simple analytics. It’s less than ideal, however, when you need to enrich Google Analytics data with other datasets to produce a comprehensive view of the customer journey.

Why analyze Google Analytics data on AWS?

Google Analytics has become the de facto standard web analytics tool. It is offered for free at lower data volumes and provides tracking, analytics, and reporting. It enables non-technical users to understand website performance by answering questions such as: Where are users coming from? Which pages have the highest conversion rates? Where are users experiencing friction and abandoning their shopping cart?

While these questions are answered within the Google Analytics UI, there are some limitations, such as:

  • Data sampling: Google Analytics standard edition displays sampled data when running ad hoc queries on time periods that contain more than 500,000 sessions. Large websites can easily exceed this number on a weekly or even daily basis. This can create reliability issues between different reports, as each query can be fed by a different sample of the data.
  • Difficulty integrating with existing AWS stack: Many customers have built or are in the process of building their data and analytics platform on AWS. Customers want to use the AWS analytics and machine learning capabilities with their Google Analytics data to enable new and innovative use cases.
  • Joining with external data sources: Seeing the full picture of a business’ online activity might require combining web traffic data with other sources. Google Analytics does not offer a simple way to move raw data either into or out of the system. Custom dimensions in Google Analytics can be used, but they are limited to 20 for the standard edition and are difficult to use.
  • Multi-dimensional analysis: Google Analytics custom reports and APIs are limited to seven dimensions per query. This limits the depth of analysis and requires various workarounds for more granular slicing and dicing.
  • Lack of alternatives: Google Analytics 360, which allows users to export raw data to Google BigQuery, carries a hefty annual fee. This can be prohibitive for organizations. And even with this upgrade, the native integration is only with BigQuery, which means users still can’t use their existing AWS stack.

Building or buying a new web analytics solution (including cookie-based tracking) is also cost-prohibitive, and can interrupt existing workflows that rely on Google Analytics data.

Customers are looking for a solution to enable their analysts and business users to incorporate Google Analytics data into their existing workflows using familiar AWS tools.

Moving Google Analytics data to AWS: Defining the requirements

To provide an analytics solution with the same or better level of reporting as Google Analytics, we designed our solution around the following tenets:

  1. Analytics with a low technical barrier to entry: Google Analytics is built for business users, and our solution is designed to provide a similar experience. This means that beyond ingesting the data, we want to automate the data engineering work that goes into making the data ready for analysis.  This includes data retention, partitioning, and compression. All of this work must be done under the hood and remain invisible to the user querying the data.
  2. Hit-level data: Google Analytics tracks clickstream activity based on Hits – the lowest level of interaction between a user and a webpage. These hits are then grouped into Sessions – hits within a given time period, and Users – groups of sessions (more details here). The standard Google Analytics API is limited to session and user-based queries, and does not offer any simple way of extracting hit-level data. Our solution, however, does provide access to this granular data.
  3. Unsampled data: By extracting the data from Google Analytics and storing it on Amazon S3, we are able to bypass the 500K sessions limitation. We also have access to unsampled data for any query at any scale.
  4. Data privacy: If sensitive data is stored in Google Analytics, relying on third-party ETL tools can create risks around data privacy, especially in the era of GDPR. Therefore, our solution encrypts data in transit and relies exclusively on processing within the customer’s VPC.
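
To illustrate how hits roll up into sessions and users, the following sketch groups hit timestamps per user and starts a new session after a period of inactivity. It assumes a 30-minute inactivity window and ignores other session boundaries, so treat it as a simplified model rather than the exact Google Analytics rules. The names are illustrative.

```python
from collections import defaultdict

# Assumption: a session ends after 30 minutes without a hit.
SESSION_TIMEOUT_SECONDS = 30 * 60


def sessionize(hits):
    """Group (user_id, epoch_seconds) hits into per-user lists of sessions."""
    timestamps_by_user = defaultdict(list)
    for user_id, timestamp in hits:
        timestamps_by_user[user_id].append(timestamp)

    sessions_by_user = {}
    for user_id, timestamps in timestamps_by_user.items():
        timestamps.sort()
        sessions = [[timestamps[0]]]
        for timestamp in timestamps[1:]:
            last_hit = sessions[-1][-1]
            if timestamp - last_hit > SESSION_TIMEOUT_SECONDS:
                sessions.append([timestamp])  # gap too long: new session
            else:
                sessions[-1].append(timestamp)
        sessions_by_user[user_id] = sessions
    return sessions_by_user


SAMPLE_HITS = [("u1", 0), ("u1", 600), ("u2", 100), ("u1", 4000)]
```

With hit-level data available, session and user aggregates like these can be recomputed with any definition you choose, which is not possible when only session-level data is exported.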

Solution overview

The solution is built on extracting hit-level data and storing it in a data lake architecture on Amazon S3. We then use Amazon Athena and Amazon QuickSight for analytics and reporting. Upsolver, an AWS premier solution provider, is used to automate ingestion, ETL, and data management on S3. Upsolver also orchestrates the entire solution with a simple-to-use graphical user interface. The following diagram shows the high-level architecture of our solution.

Reference architecture showing the flow of data across Google Analytics, Amazon Athena, and Amazon QuickSight

Using Upsolver’s GA connector, we extract unsampled, hit-level data from Google Analytics. This data is then automatically ingested according to accepted data lake best practices and stored in an optimized form on Amazon S3. The following best practices are applied to the data:

  • Store data in Apache Parquet columnar file format to improve read performance and reduce the amount of data scanned per query.
  • Partition data by event (hit) time rather than by API query time.
  • Perform periodic compaction, in which small files are merged into larger ones, to improve performance and optimize compression.
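
The second practice, partitioning by hit time rather than API query time, is easy to get wrong, so here is a small sketch of the idea. The `event_date=` prefix layout and the function name are assumptions made for illustration; the post does not show the actual layout Upsolver writes.

```python
from datetime import datetime, timezone


def partition_prefix(hit_epoch_seconds: int, table_root: str = "ga_hits") -> str:
    """Return the S3 prefix for a hit, derived from when the hit happened."""
    hit_time = datetime.fromtimestamp(hit_epoch_seconds, tz=timezone.utc)
    return f"{table_root}/event_date={hit_time:%Y-%m-%d}/"


# A hit that happened at 2018-12-31 23:59:59 UTC ...
LATE_HIT = 1546300799
# ... but was fetched from the API one hour later, on 2019-01-01.
FETCHED_AT = 1546304400
```

Here `partition_prefix(LATE_HIT)` lands in the 2018-12-31 partition even though the data arrived on the following day, so a query filtered on the event date still finds it.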

Once data is stored on S3, we use Upsolver’s GUI to create structured fact tables from the Google Analytics data. Users can query them using Amazon Athena and Amazon Redshift. Upsolver provides simple-to-use templates to help users quickly create tables from their Google Analytics data. Finally, we use Amazon QuickSight to create interactive dashboards to visualize the data.

The result is a complete view of our Google Analytics data. This view provides the level of self-service analytics that users have grown accustomed to, at any scale, and without the limitations outlined earlier.

Building the solution: Step by step guide

In this section, we walk through the steps to set up the environment, configure Upsolver’s Google Analytics plugin, extract the data, and begin exploring.

Step 1: Installation and permissions

  1. Sign up for Upsolver (can also be done via the AWS Marketplace).
  2. Allow Upsolver access to read data from Google Analytics and add new custom dimensions. Custom dimensions enable Upsolver to read non-sampled hit-level data directly from Google Analytics instead of creating parallel tracking mechanisms that aren’t as trustworthy.
  3. To populate the custom dimensions that were added to Google Analytics, allow Upsolver to run a small JavaScript snippet on your website. If you’re using GA360, this is not required.

Step 2: Review and clean the raw data

For supported data sources, Upsolver automatically discovers the schema and collects key statistics for every field in the table. Doing so gives users a glimpse into their data.

In the following screenshot, you can see schema-on-read information on the left side, stats per field and value distribution on the right side.

Screen shot of the Upsolver UI showing schema-on-read information on the left side, stats per field and value distribution on the right side

Step 3: Publishing to Amazon Athena

Upsolver comes with four templates for creating tables in your AWS-based data lake according to the Google Analytics entity being analyzed:

  • Pageviews – used to analyze user flow and behavior on specific sections of the web property using metrics such as time on page and exit rate.
  • Events – user-defined interactions such as scroll depth and link clicks.
  • Sessions – monitor a specific journey in the web property (all pageviews and events).
  • Users – understand a user’s interaction with the web property or app over time.

All tables are partitioned by event time, which helps improve query performance.

Upsolver users can choose to run the templates as-is, modify them first, or create new tables unique to their needs.

The following screenshot shows the schema produced by the Pageviews template:

Screenshot of the Upsolver UI showing the schema produced by the Pageviews template

The following screenshot shows the Pageviews and Events tables as well as the Amazon Athena views for Sessions and Users generated by the Upsolver templates.

Screenshot showing the Pageviews and Events tables as well as the Athena views for Sessions and Users generated from the Upsolver templates.

The following are a couple of example queries you may want to run to extract specific insights:

-- Popular page titles
SELECT page_title,
       Count(*) AS num_hits
FROM   ga_hits_pageviews
GROUP  BY page_title
ORDER  BY 2 DESC;

-- User aggregations from hit data
SELECT user_id,
       Count(*)                   AS num_hits,
       Count(DISTINCT session_id) AS num_of_sessions,
       Sum(session_duration)      AS total_sessions_time
FROM   ga_hits_pageviews
GROUP  BY user_id;
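
To try the two queries without an Athena table, you can run them against a few made-up rows in an in-memory SQLite database. The sample rows and the column types below are assumptions for illustration. Only the table and column names come from the queries above, and the second query has an ORDER BY added so the output order is predictable.

```python
import sqlite3

# Made-up rows: (user_id, session_id, page_title, session_duration)
SAMPLE_ROWS = [
    ("u1", "s1", "Home", 120),
    ("u1", "s1", "Pricing", 120),
    ("u1", "s2", "Home", 30),
    ("u2", "s3", "Home", 45),
]


def run_examples():
    """Run both example queries and return their result rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ga_hits_pageviews ("
        "user_id TEXT, session_id TEXT, page_title TEXT, session_duration INTEGER)"
    )
    conn.executemany("INSERT INTO ga_hits_pageviews VALUES (?, ?, ?, ?)", SAMPLE_ROWS)

    popular = conn.execute(
        "SELECT page_title, COUNT(*) AS num_hits "
        "FROM ga_hits_pageviews GROUP BY page_title ORDER BY 2 DESC"
    ).fetchall()

    users = conn.execute(
        "SELECT user_id, COUNT(*) AS num_hits, "
        "COUNT(DISTINCT session_id) AS num_of_sessions, "
        "SUM(session_duration) AS total_sessions_time "
        "FROM ga_hits_pageviews GROUP BY user_id ORDER BY user_id"
    ).fetchall()

    conn.close()
    return popular, users
```

In this sample, session_duration is repeated on every hit of a session, so the sum for u1 is 270 rather than 150. Whether the real table behaves this way depends on how the template populates that column, which this post does not show, so check it before relying on total_sessions_time.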

Step 4: Visualization in Amazon QuickSight

Now that the data has been ingested, cleansed, and written to S3 in a structured manner, we are ready to visualize it with Amazon QuickSight. Start by creating a dashboard to mimic the one provided by Google Analytics. But we don’t need to stop there. We can use QuickSight ML Insights to extract deeper insights from our data. We can also embed Amazon QuickSight visualizations into existing web portals and applications, making insights available to everyone.

Screenshot of QuickSight visualization showing several sections, one with a graph, several others with various statistics

Screenshot of QuickSight showing a global map with usage concentrations marked by bubbles, alongside a pie graph.

Screenshot of QuickSight showing a bar graph, alongside a table with various data values.

Conclusion

With minimal setup, we were able to extract raw hit-level Google Analytics data, prepare it, and store it in a data lake on Amazon S3. Using Upsolver, combined with Amazon Athena and Amazon QuickSight, we built a feature-complete solution for analyzing web traffic collected by Google Analytics on AWS.

Key technical benefits:

  • Schema-on-read means data consumers don’t need to model the data into a table structure, and can instantly understand what their top dimensions are. For example, 85% of my users navigate my website using the Google Chrome browser.
  • Graphical user interface that enables self-service consumption of Google Analytics data.
  • Fast implementation using pre-defined templates that map raw data from Google Analytics to tables in the data lake.
  • Ability to replay historical Google Analytics data stored on Amazon S3.
  • Ability to partition the data on Amazon S3 by hit time, which reduces the complexity of handling late-arriving events.
  • Automatic optimization of data on Amazon S3 for improved query performance.
  • Automatic management of tables and partitions in the AWS Glue Data Catalog.
  • Fully integrated with a suite of AWS native services – Amazon S3, Amazon Athena, Amazon Redshift and Amazon QuickSight.
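
To make the hit-time partitioning benefit concrete, here is a minimal Java sketch. It is not Upsolver's implementation: the `HitTimePartitioner` class and the `year=/month=/day=` prefix layout are assumptions chosen for illustration. The point it shows is that the partition is derived from when the hit occurred, not from when it arrived, so a late-arriving event still lands in the partition of the day it happened.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives an S3 partition prefix from the hit (event) time.
class HitTimePartitioner {

    // Hive-style layout in UTC; the key names are an assumption for this sketch.
    private static final DateTimeFormatter LAYOUT =
        DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

    // Only the hit time decides the partition; the arrival time is never consulted.
    static String partitionFor(Instant hitTime) {
        return LAYOUT.format(hitTime);
    }

    public static void main(String[] args) {
        // A hit recorded just before midnight stays in that day's partition,
        // even if it is delivered hours later.
        Instant hitTime = Instant.parse("2019-07-04T23:59:30Z");
        System.out.println(partitionFor(hitTime)); // prints year=2019/month=07/day=04
    }
}
```

Because the prefix depends only on the event itself, replaying historical data produces the same partitions as the original load.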

Now that we have feature parity, we can begin to explore integrating other data sources, such as CRM, sales, and customer profile data, to build a true 360-degree view of the customer. Furthermore, you can now begin using AWS Machine Learning services to optimize traffic to your websites, forecast demand, and personalize the user experience.

We’d love to hear what you think. Please feel free to leave a comment with any feedback or questions you may have.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors


Roy Hasson is the global business development lead of analytics and data lakes at AWS. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.

Eran Levy is the director of marketing at Upsolver.

Extract Oracle OLTP data in real time with GoldenGate and query from Amazon Athena

Post Syndicated from Sreekanth Krishnavajjala original https://aws.amazon.com/blogs/big-data/extract-oracle-oltp-data-in-real-time-with-goldengate-and-query-from-amazon-athena/

This post describes how you can improve performance and reduce costs by offloading reporting workloads from an online transaction processing (OLTP) database to Amazon Athena and Amazon S3. The architecture lets you implement a reporting system and understand the data you receive by querying it as soon as it arrives. In this solution:

  • Oracle GoldenGate generates a new row on the target for every change on the source to create Slowly Changing Dimension Type 2 (SCD Type 2) data.
  • Athena allows you to run ad hoc queries on the SCD Type 2 data.
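
As an illustration of what SCD Type 2 data looks like to a consumer, the following Java sketch collapses an append-only change history into the current state per key. It is a simplified model, not GoldenGate or Athena code: the `Scd2Snapshot` and `Change` names are hypothetical, and the `I`/`U`/`D` operation codes are assumed for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: collapse append-only change rows (SCD Type 2 history) into the current state.
class Scd2Snapshot {

    // One change row. Field names and operation codes are assumptions for this example.
    static final class Change {
        final String key;      // primary key of the source row
        final String opType;   // "I" = insert, "U" = update, "D" = delete
        final String rowImage; // full row contents after the change

        Change(String key, String opType, String rowImage) {
            this.key = key;
            this.opType = opType;
            this.rowImage = rowImage;
        }
    }

    // Expects changes in commit order, so a later change overrides an earlier one.
    static Map<String, String> currentState(List<Change> history) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Change change : history) {
            if ("D".equals(change.opType)) {
                state.remove(change.key);
            } else {
                state.put(change.key, change.rowImage);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Change> history = Arrays.asList(
            new Change("6", "I", "units=29"),
            new Change("6", "U", "units=999"),
            new Change("7", "I", "units=30"),
            new Change("7", "D", "units=30"));
        // All four history rows stay on the target; only trade 6 is current.
        System.out.println(currentState(history)); // prints {6=units=999}
    }
}
```

The full history remains available for auditing and point-in-time questions, while a reduction like this one answers "what does the table look like now?"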

Principles of a modern reporting solution

Advanced database solutions use a set of principles to help them build cost-effective reporting solutions. Some of these principles are:

  • Separate the reporting activity from the OLTP. This approach provides resource isolation and enables databases to scale for their respective workloads.
  • Use query engines running on top of distributed file systems like Hadoop Distributed File System (HDFS) and cloud object stores, such as Amazon S3. The advent of query engines that can run on top of open-source HDFS and cloud object stores further reduces the cost of implementing dedicated reporting systems.

Furthermore, you can use these principles when building reporting solutions:

  • To reduce licensing costs of the commercial databases, move the reporting activity to an open-source database.
  • Use a log-based, real-time, change data capture (CDC), data-integration solution, which can replicate OLTP data from source systems, preferably in real-time mode, and provide a current view of the data. You can enable the data replication between the source and the target reporting systems using database CDC solutions. The transaction log-based CDC solutions capture database changes noninvasively from the source database and replicate them to the target datastore or file systems.

Prerequisites

If you use GoldenGate with Kafka and are considering cloud migration, you can benefit from this post. This post also assumes prior knowledge of GoldenGate and does not detail steps to install and configure GoldenGate. Knowledge of Java and Maven is also assumed. Ensure that a VPC with three subnets is available for manual deployment.

Understanding the architecture of this solution

The following workflow diagram (Figure 1) illustrates the solution that this post describes:

  1. Amazon RDS for Oracle acts as the source.
  2. A GoldenGate CDC solution produces data for Amazon Managed Streaming for Apache Kafka (Amazon MSK). GoldenGate streams the database CDC data to the consumer. Kafka topics in an MSK cluster receive the data from GoldenGate.
  3. The Apache Flink application running on Amazon EMR consumes the data and sinks it into an S3 bucket.
  4. Athena analyzes the data through queries. You can optionally run queries from Amazon Redshift Spectrum.

Data Pipeline

Figure 1

Amazon MSK is a fully managed service for Apache Kafka that makes it easy to provision Kafka clusters with a few clicks, without the need to provision servers and storage or to configure Apache Zookeeper manually. Kafka is an open-source platform for building real-time streaming data pipelines and applications.

Amazon RDS for Oracle is a fully managed database that frees up your time to focus on application development. It manages time-consuming database administration tasks, including provisioning, backups, software patching, monitoring, and hardware scaling.

GoldenGate is a real-time, log-based, heterogeneous database CDC solution. GoldenGate supports data replication from any supported database to various target databases or big data platforms like Kafka. GoldenGate’s ability to write the transactional data captured from the source in different formats, including delimited text, JSON, and Avro, enables seamless integration with a variety of BI tools. Each row has additional metadata columns including database operation type (Insert/Update/Delete).

Flink is an open-source, stream-processing framework with a distributed streaming dataflow engine for stateful computations over unbounded and bounded data streams. EMR supports Flink, letting you create managed clusters from the AWS Management Console. Flink also supports exactly-once semantics with the checkpointing feature, which is vital to ensure data accuracy when processing database CDC data. You can also use Flink to transform the streaming data row by row or in batches using windowing capabilities.
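
The windowing idea can be illustrated without the Flink API. The following plain-Java sketch assigns each event timestamp to a fixed-size, non-overlapping (tumbling) window and counts the events in each. The `TumblingWindowSketch` class is hypothetical, and the arithmetic assumes non-negative epoch-millisecond timestamps; it mirrors the concept only, not Flink's window operators.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of tumbling (fixed-size, non-overlapping) windows; not the Flink API.
class TumblingWindowSketch {

    // Start of the window that contains the timestamp (non-negative epoch milliseconds assumed).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Counts events per window, keyed by window start time.
    static Map<Long, Integer> countPerWindow(long[] timestampsMs, long windowSizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, windowSizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] changeTimes = {1_000L, 4_999L, 5_000L, 12_000L};
        // Five-second windows: [0, 5000), [5000, 10000), [10000, 15000)
        System.out.println(countPerWindow(changeTimes, 5_000L)); // prints {0=2, 5000=1, 10000=1}
    }
}
```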

S3 is an object storage service with high scalability, data availability, security, and performance. You can run big data analytics across your S3 objects with AWS query-in-place services like Athena.

Athena is a serverless query service that makes it easy to query and analyze data in S3. With Athena and S3 as a data source, you define the schema and start querying using standard SQL. There’s no need for complex ETL jobs to prepare your data for analysis, which makes it easy for anyone with SQL skills to analyze large-scale datasets quickly.

The following diagram shows a more detailed view of the data pipeline:

  1. RDS for Oracle runs in a Single-AZ deployment.
  2. GoldenGate runs on an Amazon EC2 instance.
  3. The MSK cluster spans across three Availability Zones.
  4. Kafka topic is set up in MSK.
  5. Flink runs on an EMR Cluster.
  6. Producer Security Group for Oracle DB and GoldenGate instance.
  7. Consumer Security Group for EMR with Flink.
  8. Gateway endpoint for S3 private access.
  9. NAT Gateway to download software components on GoldenGate instance.
  10. S3 bucket and Athena.

For simplicity, this setup uses a single VPC with multiple subnets to deploy resources.

Figure 2

Configuring single-click deployment using AWS CloudFormation

The AWS CloudFormation template included in this post automates the deployment of the end-to-end solution that this blog post describes. The template provisions all required resources including RDS for Oracle, MSK, EMR, S3 bucket, and also adds an EMR step with a JAR file to consume messages from Kafka topic on MSK. Here’s the list of steps to launch the template and test the solution:

  1. Launch the AWS CloudFormation template in the us-east-1 Region.
  2. After the stack is created successfully, obtain the GoldenGate hub server public IP address from the Outputs tab of the AWS CloudFormation console.
  3. Log in to the GoldenGate hub server as ec2-user using the IP address from step 2, and then switch to the oracle user:
    sudo su - oracle
  4. Connect to the source RDS for Oracle database using the sqlplus client, and provide the password (source):
    [[email protected] ~]$ sqlplus [email protected]
  5. Generate database transactions using the SQL scripts available in the oracle user’s home directory:
    SQL> @s
    SQL> @s1
    SQL> @s2

  6. Query the STOCK_TRADES table from the Amazon Athena console. After you commit transactions on the source database, it takes a few seconds for the changes to become available for querying in Athena.

Manually deploying components

The following steps describe the configurations required to stream Oracle change data to MSK and sink it to an S3 bucket using Flink running on EMR. You can then query the S3 bucket using Athena. If you deployed the solution using AWS CloudFormation as described in the previous section, skip to the Testing the solution section.

 

  1. Prepare an RDS source database for CDC using GoldenGate. The RDS source database version is Enterprise Edition 12.1.0.2.14. For instructions on configuring the RDS database, see Using Oracle GoldenGate with Amazon RDS. This post does not cover capturing data definition language (DDL) changes.
  2. Configure an EC2 instance for the GoldenGate hub server. Configure the GoldenGate hub server using the Oracle Linux server 7.6 (ami-b9c38ad3) image in the us-east-1 Region. The GoldenGate hub server runs the GoldenGate extract process, which extracts changes in real time from the database transaction log files. The server also runs a replicat process that publishes database changes to MSK. The GoldenGate hub server requires the following software components:
  • Java JDK 1.8.0 (required for GoldenGate big data adapter).
  • GoldenGate for Oracle (12.3.0.1.4) and GoldenGate for big data adapter (12.3.0.1).
  • Kafka 1.1.1 binaries (required for GoldenGate big data adapter classpath).
  • An IAM role attached to the GoldenGate hub server to allow access to the MSK cluster for GoldenGate processes running on the hub server.

    Use the GoldenGate (12.3.0) documentation to install and configure GoldenGate for the Oracle database. The GoldenGate Integrated Extract parameter file is eora2msk.prm:

    EXTRACT eora2msk
    SETENV (NLSLANG=AL32UTF8)
    
    USERID [email protected], password ggadmin
    TRANLOGOPTIONS INTEGRATEDPARAMS (max_sga_size 256)
    EXTTRAIL /u01/app/oracle/product/ogg/dirdat/or
    LOGALLSUPCOLS
    
    TABLE SOURCE.STOCK_TRADES;

    The logallsupcols extract parameter ensures that a full database table row is generated for every DML operation on the source, including updates and deletes.

  3. Create a Kafka cluster using MSK and configure a Kafka topic. You can create the MSK cluster from the AWS Management Console, using the AWS CLI, or through an AWS CloudFormation template.
  • After creating the cluster, use the list-clusters command to obtain the ClusterArn and the Zookeeper connection string. You need this information to configure the GoldenGate big data adapter and the Flink consumer. The following code shows the command and its output:
    $ aws kafka list-clusters --region us-east-1
    {
        "ClusterInfoList": [
            {
                "EncryptionInfo": {
                    "EncryptionAtRest": {
                        "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:xxxxxxxxxxxx:key/717d53d8-9d08-4bbb-832e-de97fadcaf00"
                    }
                }, 
                "BrokerNodeGroupInfo": {
                    "BrokerAZDistribution": "DEFAULT", 
                    "ClientSubnets": [
                        "subnet-098210ac85a046999", 
                        "subnet-0c4b5ee5ff5ef70f2", 
                        "subnet-076c99d28d4ee87b4"
                    ], 
                    "StorageInfo": {
                        "EbsStorageInfo": {
                            "VolumeSize": 1000
                        }
                    }, 
                    "InstanceType": "kafka.m5.large"
                }, 
                "ClusterName": "mskcluster", 
                "CurrentBrokerSoftwareInfo": {
                    "KafkaVersion": "1.1.1"
                }, 
                "CreationTime": "2019-01-24T04:41:56.493Z", 
                "NumberOfBrokerNodes": 3, 
                "ZookeeperConnectString": "10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181", 
                "State": "ACTIVE", 
                "CurrentVersion": "K13V1IB3VIYZZH", 
                "ClusterArn": "arn:aws:kafka:us-east-1:xxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3", 
                "EnhancedMonitoring": "DEFAULT"
            }
        ]
    }

  • Use the ClusterArn to obtain the bootstrap broker string, which lists the IP addresses and ports of the Kafka broker nodes.
    $ aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3
    {
        "BootstrapBrokerString": "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092"
    }

  • Create a Kafka topic. The solution in this post uses the table name as the Kafka topic name.
    ./kafka-topics.sh --create --zookeeper 10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181 --replication-factor 3 --partitions 1 --topic STOCK_TRADES

  4. Provision an EMR cluster with Flink. Create an EMR 5.25 cluster with Flink 1.8.0 (available under the advanced options when you create the cluster), and enable SSH access to the master node. Create and attach a role to the EMR master node so that Flink consumers can access the Kafka topic in the MSK cluster.
  5. Configure the Oracle GoldenGate big data adapter for Kafka on the GoldenGate hub server. Download and install the Oracle GoldenGate big data adapter (12.3.0.1.0) using the Oracle GoldenGate download link. For more information, see the Oracle GoldenGate 12c (12.3.0.1) installation documentation.

    The following is the GoldenGate producer property file for Kafka (custom_kafka_producer.properties):

    #Bootstrap broker string obtained from Step 3
    bootstrap.servers= 10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092
    #bootstrap.servers=localhost:9092
    acks=1
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    # Batch size in bytes per partition (16384 bytes = 16 KB)
    batch.size=16384
    linger.ms=0

    The following is the GoldenGate properties file for Kafka (kafka.props):

    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    #The following resolves the topic name using the short table name
    #gg.handler.kafkahandler.topicName=SOURCE
    gg.handler.kafkahandler.topicMappingTemplate=${tableName}
    #The following selects the message key using the concatenated primary keys
    gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
    gg.handler.kafkahandler.format=json_row
    #gg.handler.kafkahandler.format=delimitedtext
    #gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
    #gg.handler.kafkahandler.SchemaTopicName=oratopic
    gg.handler.kafkahandler.BlockingSend =false
    gg.handler.kafkahandler.includeTokens=false
    gg.handler.kafkahandler.mode=op
    goldengate.userexit.writers=javawriter
    javawriter.stats.display=TRUE
    javawriter.stats.full=TRUE
    
    gg.log=log4j
    #gg.log.level=INFO
    gg.log.level=DEBUG
    gg.report.time=30sec
    gg.classpath=dirprm/:/home/oracle/kafka/kafka_2.11-1.1.1/libs/*
    
    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

    The following is the GoldenGate replicat parameter file (rkafka.prm):

    REPLICAT rkafka
    -- Trail file for this example is located in "AdapterExamples/trail" directory
    -- Command to add REPLICAT
    -- add replicat rkafka, exttrail AdapterExamples/trail/tr
    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
    REPORTCOUNT EVERY 1 MINUTES, RATE
    GROUPTRANSOPS 10000
    MAP SOURCE.STOCK_TRADES, TARGET SOURCE.STOCK_TRADES;

  6. Create an S3 bucket that contains a directory named after the table. Flink stores (sinks) the Oracle CDC data in this directory.
  7. Configure a Flink consumer that reads from the Kafka topic and writes the CDC data to the S3 bucket. For instructions on setting up a Flink project using the Maven archetype, see Flink Project Build Setup.

    The following code example is the pom.xml file used with the Maven project. For more information, see Getting Started with Maven.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-quickstart-java</artifactId>
      <version>1.8.0</version>
      <packaging>jar</packaging>
    
      <name>flink-quickstart-java</name>
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <slf4j.version>@[email protected]</slf4j.version>
        <log4j.version>@[email protected]</log4j.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
      </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-filesystem_2.11</artifactId>
         <version>1.8.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-presto</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
       <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-actor_2.11</artifactId>
          <version>2.4.20</version>
        </dependency>
        <dependency>
           <groupId>com.typesafe.akka</groupId>
           <artifactId>akka-protobuf_2.11</artifactId>
           <version>2.4.20</version>
        </dependency>
    </dependencies>
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <artifactSet>
                  <excludes>
                    <!-- Excludes here -->
                  </excludes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>org.apache.flink:*</artifact>
                  </filter>
                </filters>
                <transformers>
                  <!-- add Main-Class to manifest file -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                    <mainClass>flinkconsumer.flinkconsumer</mainClass>
                  </transformer>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                    <resource>reference.conf</resource>
                  </transformer>
                </transformers>
                <relocations>
                  <relocation>
                    <pattern>org.codehaus.plexus.util</pattern>
                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                    <excludes>
                      <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                      <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                    </excludes>
                  </relocation>
                </relocations>
                <createDependencyReducedPom>false</createDependencyReducedPom>
              </configuration>
            </execution>
          </executions>
        </plugin>
        <!-- Add the main class as a manifest entry -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-jar-plugin</artifactId>
          <version>2.5</version>
          <configuration>
            <archive>
              <manifestEntries>
                <Main-Class>flinkconsumer.flinkconsumer</Main-Class>
              </manifestEntries>
            </archive>
          </configuration>
        </plugin>

        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>1.7</source>
            <target>1.7</target>
          </configuration>
        </plugin>
      </plugins>
    </build>
    <profiles>
      <profile>
        <id>build-jar</id>
        <activation>
          <activeByDefault>false</activeByDefault>
        </activation>
      </profile>
    </profiles>

    </project>

    Compile the following Java program using mvn clean install and generate the JAR file:

    package flinkconsumer;

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.RollingSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class flinkconsumer {

        public static void main(String[] args) throws Exception {
            // Create the streaming execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setBufferTimeout(1000);

            // Checkpoint every 6,000 ms and keep checkpoint state on the cluster's HDFS.
            env.enableCheckpointing(6000);
            env.setStateBackend(new FsStateBackend("hdfs://ip-10-0-3-12.ec2.internal:8020/flink/checkpoints"));

            // Kafka consumer settings; bootstrap.servers is the broker string obtained from MSK.
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092");
            properties.setProperty("group.id", "flink");
            properties.setProperty("client.id", "demo1");

            // Read each CDC message from the STOCK_TRADES topic as a string.
            DataStream<String> message = env.addSource(
                new FlinkKafkaConsumer<>("STOCK_TRADES", new SimpleStringSchema(), properties));

            // Write the messages to S3. The Athena table LOCATION must point to this path.
            RollingSink<String> sink = new RollingSink<String>("s3://flink-stream-demo/STOCK_TRADES");
            // sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HHmm"));
            // The bucket part file size in bytes.
            sink.setBatchSize(400);

            // Pass each message through unchanged; replace this map to transform rows.
            message.map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                @Override
                public String map(String value) throws Exception {
                    return value;
                }
            }).addSink(sink).setParallelism(1);

            env.execute();
        }
    }

    Log in to the EMR master node as the hadoop user, start Flink, and execute the JAR file:

    $ /usr/bin/flink run ./flink-quickstart-java-1.8.0.jar

  8. Create the stock_trades table from the Athena console. The JSON SerDe expects each JSON document to be on its own line.
    CREATE EXTERNAL TABLE `stock_trades`(
      `trade_id` string COMMENT 'from deserializer', 
      `ticker_symbol` string COMMENT 'from deserializer', 
      `units` int COMMENT 'from deserializer', 
      `unit_price` float COMMENT 'from deserializer', 
      `trade_date` timestamp COMMENT 'from deserializer', 
      `op_type` string COMMENT 'from deserializer')
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://flink-stream-demo/STOCK_TRADES'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'transient_lastDdlTime'='1561051196')

    For more information, see Hive JSON SerDe.
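
Because each JSON document must sit on its own line for the table to read it correctly, a quick structural check on a sample of the sink output can save debugging time. The following Java sketch is one way to do that, under stated assumptions: the `JsonLinesCheck` class is hypothetical, the sample field names are illustrative rather than GoldenGate's exact output, and the check only tracks braces, brackets, and string literals. It does not validate full JSON syntax.

```java
// Sketch: check that every line of a part file holds exactly one top-level JSON object.
class JsonLinesCheck {

    // Structural check only: tracks nesting depth and string literals, not full JSON grammar.
    static boolean isSingleJsonObject(String line) {
        String s = line.trim();
        if (s.isEmpty() || s.charAt(0) != '{') {
            return false;
        }
        int depth = 0;
        boolean inString = false;
        for (int i = 0; i < s.length(); i++) {
            char ch = s.charAt(i);
            if (inString) {
                if (ch == '\\') {
                    i++; // skip the escaped character
                } else if (ch == '"') {
                    inString = false;
                }
            } else if (ch == '"') {
                inString = true;
            } else if (ch == '{' || ch == '[') {
                depth++;
            } else if (ch == '}' || ch == ']') {
                depth--;
                if (depth < 0) {
                    return false;
                }
                if (depth == 0) {
                    // The top-level object just closed; nothing may follow it on this line.
                    return i == s.length() - 1;
                }
            }
        }
        return false; // the object never closed
    }

    public static void main(String[] args) {
        // Illustrative content: the second line wrongly holds two documents.
        String partFile = "{\"TRADE_ID\":\"6\",\"OP_TYPE\":\"I\"}\n"
            + "{\"TRADE_ID\":\"7\"}{\"TRADE_ID\":\"8\"}\n";
        int lineNumber = 0;
        for (String line : partFile.split("\n")) {
            lineNumber++;
            System.out.println("line " + lineNumber + " ok: " + isSingleJsonObject(line));
        }
    }
}
```

A line that fails this check usually means two documents were written without a separating newline, or a document was split across lines.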

Testing the solution

To test that the solution works, complete the following steps:

  1. Log in to the source RDS instance from the GoldenGate hub server and perform insert, update, and delete operations on the stock_trades table:
    $sqlplus [email protected]
    SQL> insert into stock_trades values(6,'NEW',29,75,sysdate);
    SQL> update stock_trades set units=999 where trade_id=6;
    SQL> insert into stock_trades values(7,'TEST',30,80,SYSDATE);
    SQL> insert into stock_trades values (8,'XYZC', 20, 1800,sysdate);
    SQL> commit;

  2. Monitor the GoldenGate capture from the source database using the following stats command:
    [[email protected] 12.3.0]$ pwd
    /u02/app/oracle/product/ogg/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate Command Interpreter for Oracle
    Version 12.3.0.1.4 OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO
    Linux, x64, 64bit (optimized), Oracle 12c on Apr 16 2018 00:53:30
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats eora2msk

  3. Monitor the GoldenGate replicat to a Kafka topic with the following:
    [[email protected] 12.3.0]$ pwd
    /u03/app/oracle/product/ogg/bdata/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate for Big Data
    Version 12.3.2.1.1 (Build 005)
    
    Oracle GoldenGate Command Interpreter
    Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
    Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats rkafka

  4. Query the stock_trades table using the Athena console.

Summary

This post illustrates how you can offload reporting activity to Athena with S3 to reduce reporting costs and improve OLTP performance on the source database. This post serves as a guide for setting up the solution in a staging environment.

Deploying this solution in a production environment may require additional considerations, for example, high availability of GoldenGate hub servers, different file encoding formats for optimal query performance, and security considerations. Additionally, you can achieve similar outcomes using technologies like AWS Database Migration Service instead of GoldenGate for database CDC and Kafka Connect for the S3 sink.

 


About the Authors

Sreekanth Krishnavajjala is a solutions architect at Amazon Web Services.

Vinod Kataria is a senior partner solutions architect at Amazon Web Services.

Perform biomedical informatics without a database using MIMIC-III data and Amazon Athena

Post Syndicated from James Wiggins original https://aws.amazon.com/blogs/big-data/perform-biomedical-informatics-without-a-database-using-mimic-iii-data-and-amazon-athena/

Biomedical researchers require access to accurate, detailed data. The MIT MIMIC-III dataset is a popular resource. Using Amazon Athena, you can execute standard SQL queries against MIMIC-III without first loading the data into a database. Your analyses always reference the most recent version of the MIMIC-III dataset.

This post describes how to make the MIMIC-III dataset available in Athena and provide automated access to an analysis environment for MIMIC-III on AWS. We also compare a MIMIC-III reference bioinformatics study using a traditional database to that same study using Athena.

Overview

A dataset capturing a variety of measures longitudinally over time, across many patients, can drive analytics and machine learning toward research discovery and improved clinical decision-making. These features describe the MIT Laboratory of Computational Physiology (LCP) MIMIC-III dataset. In the words of LCP researchers:

“MIMIC-III is a large, publicly available database comprising de-identified health-related data associated with approximately 60K admissions of patients who stayed in critical care units of the Beth Israel Deaconess Medical Center between 2001 and 2012. …MIMIC supports a diverse range of analytic studies spanning epidemiology, clinical decision-rule improvement, and electronic tool development. It is notable for three factors: it is publicly and freely available, it encompasses a diverse and large population of ICU patients, and it contains high temporal resolution data including lab results, electronic documentation, and bedside monitor trends and waveforms.”

Recently, the Registry of Open Data on AWS (RODA) program made the MIMIC-III dataset available through the AWS Cloud. You can now use the MIMIC-III dataset without having to download, copy, or pay to store it. Instead, you can analyze the MIMIC-III dataset in the AWS Cloud using AWS services like Amazon EC2, Athena, AWS Lambda, or Amazon EMR. AWS Cloud availability enables quicker and cheaper research into the dataset.

Services like Athena also offer you new analytical approaches to the MIMIC-III dataset. Using Athena, you can execute standard SQL queries against MIMIC-III without first loading the data into a database. Because you can reference the MIMIC-III dataset hosted in the RODA program, your analyses always reference the most recent version of the MIMIC-III dataset. Live hosting reduces upfront time and effort, eliminates data synchronization issues, improves data analysis, and reduces overall study costs.

Transforming MIMIC-III data

Historically, the MIMIC team distributed the MIMIC-III dataset in compressed (gzipped) CSV format. The choice of CSV format reflects the loading of MIMIC-III data into a traditional relational database for analysis.

In contrast, the MIMIC team provides the MIMIC-III dataset to the RODA program in the following formats:

  • The traditional CSV format
  • An Apache Parquet format optimized for modern data processing technologies such as Athena

Apache Parquet stores data by column, so queries that fetch specific columns can run without reading the whole table. This optimization helps improve the performance and lower the cost of many query types common to biomedical informatics.

To convert the original MIMIC-III CSV dataset to Apache Parquet, we created a data transformation job using AWS Glue. The MIMIC team schedules the AWS Glue job to run as needed, updating the Parquet files in the RODA program with any changes to the CSV dataset. These scheduled updates keep the Parquet files current without interfering with the MIMIC team’s CSV dataset creation procedures. Find the code for this AWS Glue job in the mimic-code GitHub repo. Use the same code as a basis to develop other CSV-to-Parquet conversions.

The code that converts the MIMIC-III NOTEEVENTS table offers a valuable resource. This table stores long medical notes made up of multiple lines, with commas, double-quotes, and other characters that can be challenging to transform. For example, you can use the following Spark read statement to interpret that CSV file:

df = spark.read.csv('s3://'+mimiccsvinputbucket+'/NOTEEVENTS.csv.gz',\
	header=True,\
	schema=schema,\
	multiLine=True,\
	quote='"',\
	escape='"')
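To see why the multiLine, quote, and escape options matter, the following minimal sketch parses the same quoting convention with Python's standard csv module. The two-column sample is made up for illustration and is not taken from NOTEEVENTS; it only assumes that quotes inside a field are doubled, as the Spark options above imply.

```python
import csv
import io

# Made-up sample: a row ID and free text containing a comma,
# a line break, and a doubled double-quote.
raw = 'ROW_ID,TEXT\n1,"Pt stable, resting.\nSays ""no pain"" today."\n'

# csv.reader treats "" inside a quoted field as a literal quote and keeps
# embedded newlines, which is the behavior the Spark options request.
rows = list(csv.reader(io.StringIO(raw, newline="")))

print(len(rows))   # header plus one data record, not three physical lines
print(rows[1][1])
```

A parser that splits on every line break or comma would break this single note into several malformed records.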

Executing the indwelling arterial catheter study using MIMIC-III

The MIMIC team provides code for the MIMIC indwelling arterial catheter (IAC) aline study. The study uses the MIMIC-III dataset to reproduce findings from the published study, The Association Between Indwelling Arterial Catheters and Mortality in Hemodynamically Stable Patients With Respiratory Failure.

You can obtain the aline study code, as well as many other analytic examples, in the MIT Laboratory of Computational Physiology MIMIC Code Repository in GitHub. Learn more about the provided code in the JAMA paper, The MIMIC Code Repository: enabling reproducibility in critical care research.

The aline study code comprises about 1400 lines of SQL. It was developed for a PostgreSQL database, and is executed and further analyzed by Python and R code. We ran the aline study against the Parquet MIMIC-III dataset using Athena instead of PostgreSQL to answer the following questions:

  • What modifications would be required, if any, to run the study using Athena, instead of PostgreSQL?
  • How would performance differ?
  • How would cost differ?

Some SQL features used in the study work differently in Athena than they do in PostgreSQL. As a result, about 5% of the SQL statements needed modification. Specifically, the SQL statements require the following types of modification for use with Athena:

  • Instead of using materialized views, in Athena, create a new table. For instance, the CREATE MATERIALIZED VIEW statement can be written as CREATE TABLE.

  • Instead of using schema context statements like SET SEARCH_PATH, in Athena, explicitly declare the schema of referenced tables. For instance, instead of having a SET SEARCH_PATH statement before your query and referencing tables without a schema as follows:

SET SEARCH_PATH TO mimiciii;

left join chartevents ce

You declare the schema as a part of every table reference:

left join mimiciii.chartevents ce

  • Epoch time, as well as timestamp arithmetic, works differently in Athena than in PostgreSQL. So, instead of arithmetic on timestamps, use the date_diff() function and specify 'second' as the unit. For instance, consider the following expression:

extract(epoch from endtime-starttime)/24.0/60.0/60.0 

The preceding statement can be written as:

date_diff('second',starttime, endtime)/24.0/60.0/60.0

  • Athena uses the “double” data type instead of the “numeric” data type.

ROUND( cast(f.height_first as numeric), 2) AS height_first,

The preceding statement can be written as:

ROUND( cast(f.height_first as double), 2) AS height_first,

  • If VARCHAR fields are empty, they are not detected as NULL. Instead, compare a VARCHAR to an empty string. For instance, consider the following filter:

where ce.value IS NOT NULL

If ce.value is a VARCHAR, it can be written as the following:

where ce.value <> ''
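Some of the rewrites listed above are mechanical enough to script. The following is a minimal sketch, not the tooling used for the study: the function name postgres_to_athena, the view name aline_demo, and the table name example are hypothetical, and the regular expressions cover only the simple patterns quoted in this post. Schema qualification of table names and the VARCHAR comparison still need a manual review.

```python
import re

def postgres_to_athena(sql: str) -> str:
    """Apply a few mechanical PostgreSQL-to-Athena rewrites (illustrative only)."""
    # Materialized views become plain tables.
    sql = re.sub(r"CREATE\s+MATERIALIZED\s+VIEW", "CREATE TABLE", sql, flags=re.I)
    # Schema context statements are dropped; tables must be schema-qualified instead.
    sql = re.sub(r"SET\s+SEARCH_PATH\s+TO\s+\w+\s*;\s*", "", sql, flags=re.I)
    # Casts to numeric become casts to double.
    sql = re.sub(r"\bas\s+numeric\b", "as double", sql, flags=re.I)
    # extract(epoch from end-start) becomes date_diff('second', start, end).
    sql = re.sub(
        r"extract\s*\(\s*epoch\s+from\s+([\w.]+)\s*-\s*([\w.]+)\s*\)",
        r"date_diff('second', \2, \1)",
        sql,
        flags=re.I,
    )
    return sql

pg_sql = """SET SEARCH_PATH TO mimiciii;
CREATE MATERIALIZED VIEW aline_demo AS
select ROUND( cast(f.height_first as numeric), 2) AS height_first,
       extract(epoch from endtime-starttime)/24.0/60.0/60.0 AS duration_days
from mimiciii.example f"""

athena_sql = postgres_to_athena(pg_sql)
print(athena_sql)
```

Note that the operand order flips: PostgreSQL subtracts starttime from endtime, while date_diff takes the earlier timestamp first.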

After making these minor edits to the aline study SQL statements, the study executes successfully using Athena instead of PostgreSQL. The output produced from both methods is identical.

Next, consider the speed of Athena compared to PostgreSQL in running the aline study. As shown in the following graph, the aline study queries run 10 times faster in Athena against the Parquet-formatted MIMIC-III dataset than against the same data in an Amazon RDS PostgreSQL database.

Compare the costs of running the aline study in Athena to running it in PostgreSQL. RDS PostgreSQL databases bill in one-second increments for the time the database runs, while Athena queries bill per gigabyte of data each query scans. Because of this fundamental pricing difference, you must make some assumptions to compare costs.

You could assume that running the aline study in RDS PostgreSQL involves the following steps, taking up an eight-hour working day:

  1. Deploy a new RDS PostgreSQL database.
  2. Load the MIMIC-III dataset.
  3. Connect a Jupyter notebook.
  4. Run the aline study.
  5. Terminate the RDS database.

You’d then compare the cost of running an RDS PostgreSQL database for eight hours (at $0.37 per hour, db.m5.xlarge with 100-GB EBS storage) to the cost of running the aline study against a dataset using Athena (9.93 GB of data scanned at $0.005/GB). As the following graph shows, this results in an almost 60x cost savings.
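The arithmetic behind that comparison can be written out as a short sketch. It uses only the figures quoted above and assumes, as the paragraph does, that the hourly rate covers the whole eight-hour day; actual prices vary by Region and over time.

```python
# Figures quoted in this post.
rds_hourly_rate = 0.37      # USD per hour, db.m5.xlarge with 100-GB EBS storage
rds_hours = 8               # one working day
athena_gb_scanned = 9.93    # data scanned by the aline study
athena_rate_per_gb = 0.005  # USD per GB scanned

rds_cost = rds_hourly_rate * rds_hours
athena_cost = athena_gb_scanned * athena_rate_per_gb
savings_factor = rds_cost / athena_cost

print(f"RDS PostgreSQL: ${rds_cost:.2f}")
print(f"Athena:         ${athena_cost:.2f}")
print(f"Ratio:          {savings_factor:.1f}x")
```

The result is roughly $2.96 for RDS PostgreSQL against about $0.05 for Athena, a ratio just under 60.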

Working with MIMIC-III in AWS

MIMIC-III is a publicly available dataset containing detailed information about the clinical care of patients. For this reason, the MIMIC team requires that you complete a training course and submit a formal request before gaining access. For more information, see Requesting access.

After you obtain access to the MIMIC-III dataset, log in to the PhysioNet website and, under your profile settings, provide your AWS account ID. Then click the request access link, under Files, on the MIMIC-III Clinical Database page.  You then have access to the MIMIC-III dataset in AWS.

Choose Launch Stack below to begin working with the MIMIC-III dataset in your AWS account. This launches an AWS CloudFormation template, which deploys an index of the MIMIC-III Parquet-formatted dataset into your AWS Glue Data Catalog. It also deploys an Amazon SageMaker notebook instance pre-loaded with the aline study code and many other MIMIC-provided analytic examples.

After the AWS CloudFormation template deploys, choose Outputs, and follow the link to access Jupyter Notebooks. From there, you can open and execute the aline study code by browsing the filesystem to the path ./mimic-code/notebooks/aline-aws/aline-awsathena.ipynb.

After you’ve completed your analysis, you can stop your Jupyter Notebook instance to suspend charges for compute.  Any time that you want to continue working, just start the instance and continue where you left off.

Conclusion

The RODA program provides quick and inexpensive access to the global biomedical research resources of the MIMIC-III dataset. Cloud-native analytic tools like AWS Glue and Athena accelerate research, and groups like the MIT Laboratory of Computational Physiology are pioneering data availability in modern data formats like Apache Parquet.

The analytic approaches and code demonstrated in this post can be applied generally to help increase agility and decrease cost. Consider using them as you enhance existing or perform new biomedical informatics research.

 


About the Author


James Wiggins is a senior healthcare solutions architect at AWS. He is passionate about using technology to help organizations positively impact world health. He also loves spending time with his wife and three children.

Alistair Johnson is a research scientist at the Massachusetts Institute of Technology. Alistair has extensive experience and expertise in working with ICU data, having published the MIMIC-III and eICU-CRD datasets. His current research focuses on clinical epidemiology and machine learning for decision support.

AWS Lake Formation – Now Generally Available

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/aws-lake-formation-now-generally-available/

As soon as companies started to have data in digital format, it was possible for them to build a data warehouse, collecting data from their operational systems, such as Customer relationship management (CRM) and Enterprise resource planning (ERP) systems, and use this information to support their business decisions.

The reduction in costs of storage, together with an even greater reduction in complexity for managing large quantities of data, made possible by services such as Amazon S3, has allowed companies to retain more information, including raw data that is not structured, such as logs, images, video, and scanned documents.

This is the idea of a data lake: to store all your data in one, centralized repository, at any scale. We are seeing this approach with customers like Netflix, Zillow, NASDAQ, Yelp, iRobot, FINRA, and Lyft. They can run their analytics on this larger dataset, from simple aggregations to complex machine learning algorithms, to better discover patterns in their data and understand their business.

Last year at re:Invent we introduced in preview AWS Lake Formation, a service that makes it easy to ingest, clean, catalog, transform, and secure your data and make it available for analytics and machine learning. I am happy to share that Lake Formation is generally available today!

With Lake Formation you have a central console to manage your data lake, for example to configure the jobs that move data from multiple sources, such as databases and logs, to your data lake. Having such a large and diversified amount of data also makes configuring the right access permissions critical. You can secure access to metadata in the Glue Data Catalog and data stored in S3 using a single set of granular data access policies defined in Lake Formation. These policies allow you to define table and column-level data access.

One thing I like most about Lake Formation is that it works with your data already in S3! You can easily register your existing data with Lake Formation, and you don’t need to change existing processes loading your data to S3. Since data remains in your account, you have full control.

You can also use Glue ML Transforms to easily deduplicate your data. Deduplication is important to reduce the amount of storage you need, but also to make analyzing your data more efficient because you have neither the overhead nor the possible confusion of looking at the same data twice. This problem is trivial if duplicate records can be identified by a unique key, but becomes very challenging when you have to do a “fuzzy match”. A similar approach can be used for record linkage, that is, when you are looking for similar items in different tables, for example to do a “fuzzy join” of two databases that do not share a unique key.

In this way, implementing a data lake from scratch is much faster, and managing a data lake is much easier, making these technologies available to more customers.

Creating a Data Lake
Let’s build a data lake using the Lake Formation console. First I register the S3 buckets that are going to be part of my data lake. Then I create a database and grant permission to the IAM users and roles that I am going to use to manage my data lake. The database is registered in the Glue Data Catalog and holds the metadata required to analyze the raw data, such as the structure of the tables that are going to be automatically generated during data ingestion.

Managing permissions is one of the most complex tasks for a data lake. Consider for example the huge amount of data that can be part of it, the sensitive, mission-critical nature of some of the data, and the different structured, semi-structured, and unstructured formats in which data can reside. Lake Formation makes it easier with a central location where you can give IAM users, roles, groups, and Active Directory users (via federation) access to databases and tables, optionally allowing or denying access to specific columns within a table.

To simplify data ingestion, I can use blueprints that create the necessary workflows, crawlers and jobs on AWS Glue for common use cases. Workflows enable orchestration of your data loading workloads by building dependencies between Glue entities, such as triggers, crawlers and jobs, and allow you to track visually the status of the different nodes in the workflows on the console, making it easier to monitor progress and troubleshoot issues.

Database blueprints help load data from operational databases. For example, if you have an e-commerce website, you can ingest all your orders in your data lake. You can load a full snapshot from an existing database, or incrementally load new data. In case of an incremental load, you can select a table and one or more of its columns as bookmark keys (for example, a timestamp in your orders) to determine previously imported data.
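The idea of a bookmark key can be pictured with a small sketch. This is a conceptual illustration only, not how the blueprint or AWS Glue implements it; the function incremental_load and the order fields are hypothetical.

```python
from datetime import datetime

def incremental_load(rows, bookmark_key, last_bookmark):
    """Return rows newer than the stored bookmark, plus the updated bookmark."""
    new_rows = [
        r for r in rows
        if last_bookmark is None or r[bookmark_key] > last_bookmark
    ]
    # The highest value seen becomes the bookmark for the next run.
    new_bookmark = max((r[bookmark_key] for r in new_rows), default=last_bookmark)
    return new_rows, new_bookmark

orders = [
    {"order_id": 1, "created_at": datetime(2019, 8, 1, 9, 0)},
    {"order_id": 2, "created_at": datetime(2019, 8, 1, 10, 0)},
    {"order_id": 3, "created_at": datetime(2019, 8, 2, 8, 30)},
]

# First run sees two orders; the second run sees all three but imports only the new one.
first_batch, bookmark = incremental_load(orders[:2], "created_at", None)
second_batch, bookmark = incremental_load(orders, "created_at", bookmark)
print([r["order_id"] for r in second_batch])
```

A timestamp works well as a bookmark key because it only grows, so everything at or below the stored value can be treated as previously imported.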

Log file blueprints simplify ingesting logging formats used by Application Load Balancers, Elastic Load Balancers, and AWS CloudTrail. Let’s see how that works more in depth.

Security is always a top priority, and I want to be able to have a forensic log of all management operations across my account, so I choose the CloudTrail blueprint. As source, I select a trail collecting my CloudTrail logs from all regions into an S3 bucket. In this way, I’ll be able to query account activity across all my AWS infrastructure. This works similarly for a larger organization having multiple AWS accounts: they just need, when configuring the trail in the CloudTrail console, to apply the trail to their whole organization.

I then select the target database, and the S3 location for the data lake. As data format I use Parquet, a columnar storage format that will make querying the data faster and cheaper. The import frequency can be hourly to monthly, with the option to choose the day of the week and the time. For now, I want to run the workflow on demand. I can do that from the console or programmatically, for example using any AWS SDK or the AWS Command Line Interface (CLI).

Finally, I give the workflow a name, the IAM role to use during execution, and a prefix for the tables that will be automatically created by this workflow.

I start the workflow from the Lake Formation console and select to view the workflow graph. This opens the AWS Glue console, where I can visually see the steps of the workflow and monitor the progress of this run.

When the workflow is completed a new table is available in my data lake database. The source data remain as logs in the S3 bucket output of CloudTrail, but now I have them consolidated, in Parquet format and partitioned by date, in my data lake S3 location. To optimize costs, I can set up an S3 lifecycle policy that automatically expires data in the source S3 bucket after a safe amount of time has passed.

Securing Access to the Data Lake
Lake Formation provides secure and granular access to data stores in the data lake, via a new grant/revoke permissions model that augments IAM policies. It is simple to set up these permissions, for example using the console:

I simply select the IAM user or role I want to grant access to. Then I select the database and optionally the tables and the columns I want to provide access to. It is also possible to select which type of access to provide. For this demo, simple select permissions are sufficient.

Accessing the Data Lake
Now I can query the data using tools like Amazon Athena or Amazon Redshift. For example, I open the query editor in the Athena console. First, I want to use my new data lake to look into which source IP addresses are most common in my AWS Account activity:

SELECT sourceipaddress, count(*)
FROM my_trail_cloudtrail
GROUP BY  sourceipaddress
ORDER BY  2 DESC;

Looking at the result of the query, you can see which are the AWS API endpoints that I use the most. Then, I’d like to check which user identity types are used. That information is stored in JSON format inside one of the columns. I can use some of the JSON functions available with Amazon Athena to get that information in my SQL statements:

SELECT json_extract_scalar(useridentity, '$.type'), count(*)
FROM "mylake"."my_trail_cloudtrail"
GROUP BY  json_extract_scalar(useridentity, '$.type')
ORDER BY  2 DESC;

Most of the time, AWS services are the ones creating activities in my trail. These queries are just an example, but they quickly give me deeper insight into what is happening in my AWS account.
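To make the JSON extraction concrete, the same aggregation can be sketched locally in Python. The three sample rows are made up for illustration; only the "type" attribute comes from the query above.

```python
import json
from collections import Counter

# Made-up rows: each holds the useridentity column as a JSON string.
rows = [
    '{"type": "AWSService", "invokedBy": "ec2.amazonaws.com"}',
    '{"type": "IAMUser", "userName": "example-user"}',
    '{"type": "AWSService", "invokedBy": "lambda.amazonaws.com"}',
]

# Rough equivalent of json_extract_scalar(useridentity, '$.type')
# followed by GROUP BY and ORDER BY 2 DESC.
type_counts = Counter(json.loads(r).get("type") for r in rows).most_common()
print(type_counts)
```

As in the SQL version, each row's JSON document is parsed, one scalar is pulled out, and the rows are counted per distinct value.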

Think of what a similar approach could do for your business! Using database and logs blueprints, you can quickly create workflows to ingest data from multiple sources within your organization, set column-level permissions for who can access the information collected, clean and prepare your data using machine learning transforms, and correlate and visualize the information using tools like Amazon Athena, Amazon Redshift, and Amazon QuickSight.

Customizing Data Access with Column-Level Permissions
To follow data privacy guidelines and compliance requirements, the mission-critical data stored in a data lake requires custom views for different stakeholders inside the company. Let’s compare the visibility of two IAM users in my AWS account, one that has full permissions on a table, and one that has only select access to a subset of the columns of the same table.

I already have a user with full access to the table containing my CloudTrail data, called danilop. I create a new limitedview IAM user and I give it access to the Athena console. In the Lake Formation console, I only give this new user select permissions on three of the columns.

To verify the different access to the data in the table, I log in with one user at a time and go to the Athena console. On the left I can explore which tables and columns the logged-in user can see in the Glue Data Catalog. Here’s a comparison for the two users, side-by-side:

The limited user has access only to the three columns that I explicitly configured, and to the four columns used for partitioning the table, whose access is required to see any data. When I query the table in the Athena console with a select * SQL statement, logged in as the limitedview user, I only see data from those seven columns:
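The effect of a column-level grant can be pictured with a small sketch. The column names below are hypothetical, because the post does not list which three columns were granted or how the partition columns are named; the sketch only illustrates that the visible set is the granted columns plus the partition columns.

```python
# Hypothetical column names, chosen for illustration only.
granted_columns = ["eventsource", "eventname", "sourceipaddress"]
partition_columns = ["region", "year", "month", "day"]

def visible_row(row, granted, partitions):
    """Keep only the columns that a limited user is allowed to read."""
    allowed = set(granted) | set(partitions)
    return {name: value for name, value in row.items() if name in allowed}

full_row = {
    "eventsource": "s3.amazonaws.com",
    "eventname": "GetObject",
    "sourceipaddress": "198.51.100.7",
    "useridentity": '{"type": "IAMUser"}',
    "region": "us-east-1",
    "year": "2019",
    "month": "08",
    "day": "08",
}

limited_row = visible_row(full_row, granted_columns, partition_columns)
print(sorted(limited_row))
```

In Lake Formation this filtering happens in the service, so a select * from the limited user never returns the other columns.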

Available Now
There is no additional cost for using AWS Lake Formation; you pay for the use of the underlying services such as Amazon S3 and AWS Glue. One of the core benefits of Lake Formation is the security policies it introduces. Previously you had to use separate policies to secure data and metadata access, and these policies only allowed table-level access. Now you can give each user access, from a central location, only to the columns they need to use.

AWS Lake Formation is now available in US East (N. Virginia), US East (Ohio), US West (Oregon), Europe (Ireland), and Asia Pacific (Tokyo). Redshift integration with Lake Formation requires Redshift cluster version 1.0.8610 or higher; your clusters should have been automatically updated by the time you read this. Support for Apache Spark with Amazon EMR will follow over the next few months.

I only scratched the surface of what you can do with Lake Formation. Building and managing a data lake for your business is now much easier. Let me know how you are using these new capabilities!

Danilo

Analyzing AWS WAF logs with Amazon ES, Amazon Athena, and Amazon QuickSight

Post Syndicated from Aaron Franco original https://aws.amazon.com/blogs/big-data/analyzing-aws-waf-logs-with-amazon-es-amazon-athena-and-amazon-quicksight/

AWS WAF now includes the ability to log all web requests inspected by the service. AWS WAF can store these logs in an Amazon S3 bucket in the same Region, but most customers deploy AWS WAF across multiple Regions—wherever they also deploy applications. When analyzing web application security, organizations need the ability to gain a holistic view across all their deployed AWS WAF Regions.

This post presents a simple approach to aggregating AWS WAF logs into a central data lake repository, which lets teams better analyze and understand their organization’s security posture. I walk through the steps to aggregate regional AWS WAF logs into a dedicated S3 bucket. I follow that up by demonstrating how you can use Amazon ES to visualize the log data. I also present an option to offload and process historical data using AWS Glue ETL. With the data collected in one place, I finally show you how you can use Amazon Athena and Amazon QuickSight to query historical data and extract business insights.

Architecture overview

The case I highlight in this post is the forensic use of the AWS WAF access logs to identify distributed denial of service (DDoS) attacks by a client IP address. This solution provides your security teams with a view of all incoming requests hitting every AWS WAF in your infrastructure.

I investigate what the IP access patterns look like over time and assess which IP addresses access the site multiple times in a short period of time. This pattern suggests that the IP address could be an attacker. With this solution, you can identify DDoS attackers for a single application, and detect DDoS patterns across your entire global IT infrastructure.
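The pattern described above, many requests from one IP address within a short period, can be sketched with a sliding window. This is a minimal illustration, not the analysis used later in this post: the function name, window size, and threshold are assumptions, and the records are made up. The field names timestamp (epoch milliseconds) and httpRequest.clientIp follow the AWS WAF log format.

```python
from collections import defaultdict, deque

def flag_bursty_ips(records, window_ms=60_000, threshold=3):
    """Return IPs with at least `threshold` requests inside any `window_ms` span."""
    recent = defaultdict(deque)   # per-IP timestamps inside the current window
    flagged = set()
    for rec in sorted(records, key=lambda r: r["timestamp"]):
        ip = rec["httpRequest"]["clientIp"]
        ts = rec["timestamp"]
        window = recent[ip]
        window.append(ts)
        # Drop requests that have fallen out of the window.
        while ts - window[0] > window_ms:
            window.popleft()
        if len(window) >= threshold:
            flagged.add(ip)
    return flagged

records = [
    {"timestamp": 1_000, "httpRequest": {"clientIp": "198.51.100.7"}},
    {"timestamp": 2_000, "httpRequest": {"clientIp": "198.51.100.7"}},
    {"timestamp": 3_000, "httpRequest": {"clientIp": "198.51.100.7"}},
    {"timestamp": 1_500, "httpRequest": {"clientIp": "203.0.113.9"}},
    {"timestamp": 900_000, "httpRequest": {"clientIp": "203.0.113.9"}},
]

suspects = flag_bursty_ips(records)
print(suspects)
```

Only the first address is flagged: its three requests fall within one minute, while the second address made two requests far apart.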

Walkthrough

This solution requires separate tasks for architecture setup, which allows you to begin receiving log files in a centralized repository, and analytics, which processes your log data into useful results.

Prerequisites

To follow along, you must have the following resources:

  • Two AWS accounts. Following AWS multi-account best practices, create two accounts:
    • A logging account
    • A resource account that hosts the web applications using AWS WAF
    For more information about multi-account setup, see AWS Landing Zone. Using multiple accounts isolates your logs from your resource environments. This helps maintain the integrity of your log files and provides a central access point for auditing all application, network, and security logs.
  • The ability to launch new resources into your account. The resources might not be eligible for Free Tier usage and so might incur costs.
  • An application running with an Application Load Balancer, preferably in multiple Regions. If you do not already have one, you can launch any AWS web application reference architecture to test and implement this solution.

For this walkthrough, you can launch an Amazon ECS example from the ecs-refarch-cloudformation GitHub repo. This is a “one click to deploy” example that automatically sets up a web application with an Application Load Balancer. Launch this in two different Regions to simulate a global infrastructure. You ultimately set up a centralized bucket that both Regions log into, which your forensic analysis tools then draw from. Choose Launch Stack to launch the sample application in your Region of choice.

Setup

Architecture setup allows you to begin receiving log files in a centralized repository.

Step 1: Provide permissions

Begin this process by providing appropriate permissions for one account to access resources in another. Your resource account needs cross-account permission to access the bucket in the logging account.

  1. Create your central logging S3 bucket in the logging account and attach the following bucket policy to it under the Permissions tab. Make a note of the bucket’s ARN. You need this information for future steps.
  2. Change RESOURCE-ACCOUNT-ID and CENTRAL-LOGGING-BUCKET-ARN to the correct values based on the actual values in your accounts:
     // JSON Document
     {
       "Version": "2012-10-17",
       "Statement": [
          {
             "Sid": "Cross Account AWS WAF Account 1",
             "Effect": "Allow",
             "Principal": {
                "AWS": "arn:aws:iam::RESOURCE-ACCOUNT-ID:root"
             },
             "Action": [
                "s3:GetObject",
                "s3:PutObject"
             ],
             "Resource": [
                "CENTRAL-LOGGING-BUCKET-ARN/*"
             ]
          }
       ]
    }
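If you prefer to generate the policy rather than edit it by hand, the substitution can be sketched as follows. The function name build_bucket_policy is hypothetical, and the account ID and bucket ARN are placeholder values, not real resources.

```python
import json

def build_bucket_policy(resource_account_id, bucket_arn):
    """Fill the two placeholders of the cross-account bucket policy."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Cross Account AWS WAF Account 1",
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{resource_account_id}:root"},
                "Action": ["s3:GetObject", "s3:PutObject"],
                # Object-level actions apply to the objects, hence the /* suffix.
                "Resource": [f"{bucket_arn}/*"],
            }
        ],
    }

# Placeholder values for illustration.
policy = build_bucket_policy("111122223333", "arn:aws:s3:::waf-central-logs")
print(json.dumps(policy, indent=2))
```

The printed document can be pasted into the bucket policy editor in place of the template.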

Step 2: Manage Lambda permissions

Next, the Lambda function that you create in your resource account needs permissions to access the S3 bucket in your central logging account so it can write files to that location. You already provided basic cross-account access in the previous step, but Lambda still needs the granular permissions at the resources level. Remember to grant these permissions in both Regions where you launched the application that you intend to monitor with AWS WAF.

  1. Log in to your resource account.
  2. To create an IAM policy for the Lambda function’s role, open the IAM console and choose Policies, Create policy.
  3. Choose JSON, and enter the following policy document. Replace YOUR-SOURCE-BUCKET and YOUR-DESTINATION-BUCKET with the ARNs of the buckets that you are using for this walkthrough.
    // JSON document
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListSourceAndDestinationBuckets",
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:ListBucketVersions"
                ],
                "Resource": [
                    "YOUR-SOURCE-BUCKET",
                    "YOUR-DESTINATION-BUCKET"
                ]
            },
            {
                "Sid": "SourceBucketGetObjectAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:GetObjectVersion"
                ],
                "Resource": "YOUR-SOURCE-BUCKET/*"
            },
            {
                "Sid": "DestinationBucketPutObjectAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject"
                ],
                "Resource": "YOUR-DESTINATION-BUCKET/*"
            }
        ]
     }

  4. Choose Review policy, enter your policy name, and save it.
  5. With the policy created, create a new role for your Lambda function and attach the custom policy to that role. To do this, navigate back to the IAM dashboard.
  6. Select Create role and choose Lambda as the service that uses the role. Select the custom policy that you created earlier in this step and choose Next. You can add tags if required and then name and create this new role.
  7. You must also add S3 as a trusted entity in the Trust Relationship section of the role. Choose Edit trust relationship and add s3.amazonaws.com to the policy, as shown in the following example.

Lambda and S3 now appear as trusted entities under the Trust relationships tab, as shown in the following screenshot.
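For reference, a trust policy with both services as trusted entities would look roughly like the document printed by the following sketch. It assumes the standard service principals lambda.amazonaws.com and s3.amazonaws.com; compare it with the policy shown in your own console rather than treating it as the exact document from this walkthrough.

```python
import json

# Trust policy allowing both Lambda and S3 to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": ["lambda.amazonaws.com", "s3.amazonaws.com"]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}

print(json.dumps(trust_policy, indent=2))
```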

Step 3: Create a Lambda function and copy log files

Create a Lambda function in the same Region as your resource account’s S3 bucket. This function reads log files from the resource account bucket and then copies that content to the logging account’s bucket. Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF.

  1. Log in to your resource account.
  2. Navigate to Lambda in your console and choose Create Function.
  3. Choose Author from scratch and name the function. Choose the IAM role you created in the previous step and attach it to the Lambda function.
  4. Choose Create function.
  5. This Lambda function receives a document from S3 that contains nested JSON string data. To handle this data, you must extract the JSON from this string to retrieve key names of both the document and the bucket. Your function then uses this information to copy the data to your central logging account bucket in the next step. To create this function, copy and paste the following code into the Lambda function that you created. Replace the bucket names with the names of the buckets that you created earlier. You can modify this script later, after you decide on a partitioning strategy.
    // Load the AWS SDK
    const aws = require('aws-sdk');
    
    // Construct the AWS S3 object
    const s3 = new aws.S3();
    
    // Central logging bucket that receives the copies (replace with your bucket name)
    const destBucket = "YOUR-DESTINATION-BUCKET";
    
    // Main function
    exports.handler = (event, context, callback) => {
        console.log("Got WAF Item Event");
        // Bucket and key of the log file that triggered this invocation
        const _srcBucket = event.Records[0].s3.bucket.name;
        const _key = event.Records[0].s3.object.key;
        const _keySplit = _key.split("/");
        const _objName = _keySplit[_keySplit.length - 1];
        // Destination key: first two segments of the source key, a placeholder segment, and the object name
        const _destPath = _keySplit[0] + "/" + _keySplit[1] + "/YOUR-DESTINATION-BUCKET/" + _objName;
        const _sourcePath = _srcBucket + "/" + _key;
        console.log(_destPath);
        const params = { Bucket: destBucket, ACL: "bucket-owner-full-control", CopySource: _sourcePath, Key: _destPath };
        // Report the result only after the copy has finished
        s3.copyObject(params, function(err, data) {
            if (err) {
                console.log(err, err.stack);
                callback(err);
            } else {
                console.log("SUCCESS!");
                callback(null, 'All done!');
            }
        });
    };

Step 4: Set S3 to Lambda event triggers

This step sets up event triggers in your resource account’s S3 buckets. These triggers send the file name and location logged by AWS WAF logs to the Lambda function. The triggers also notify the Lambda function that it must move the newly arrived file into your central logging bucket. Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF.

  1. Go to the S3 dashboard and choose your S3 bucket, then choose the Properties tab. Under Advanced settings, choose Events.
  2. Give your event a name and select PUT from the Events check boxes.
  3. Choose Lambda from the Send To option and select your Lambda function as the destination for the event.
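To make the trigger payload concrete, the following sketch shows the part of the S3 event that the Lambda function reads. The sample event is trimmed to the two fields the handler uses, and the bucket and key values are made up for illustration. S3 URL-encodes object keys in event notifications, so the sketch decodes the key before using it.

```javascript
// Illustrative only: extract the bucket name and object key from an
// S3 event notification, as delivered to a Lambda function.
function extractObjectInfo(event) {
    const record = event.Records[0];
    const bucket = record.s3.bucket.name;
    // S3 URL-encodes keys in notifications and encodes spaces as "+".
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));
    return { bucket, key };
}

// Trimmed sample event; the bucket and key are hypothetical values.
const sampleEvent = {
    Records: [{
        s3: {
            bucket: { name: "my-resource-account-waf-logs" },
            object: { key: "2018/12/01/aws-waf-logs-demo-1-2018-12-01-00-00-00-abc%3D1" }
        }
    }]
};

console.log(extractObjectInfo(sampleEvent));
```

A handler could call a helper like this first and then build the copy parameters from the decoded key.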

Step 5: Add AWS WAF to the Application Load Balancer

Add an AWS WAF to the Application Load Balancer so that you can start logging events. You can optionally delete the original log file after Lambda copies it. This reduces costs, but your business and security needs might err on the side of retaining that data.

Create a separate prefix for each Region in your central logging account bucket waf-central-logs so that AWS Glue can properly partition them. For best practices of partitioning with AWS Glue, see Working with partitioned data in AWS Glue. AWS Glue ingests your data and stores it in a columnar format optimized for querying in Amazon Athena. This helps you visualize the data and investigate the potential attacks.

Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF. The procedure assumes that you already have an AWS WAF enabled that you can use for this exercise. To move forward with the next step, you need AWS WAF enabled and connected to Amazon Kinesis Data Firehose for log delivery.

Setting up and configuring AWS WAF

If you don’t already have a web ACL in place, set up and configure AWS WAF at this point. This solution handles logging data from multiple AWS WAF logs in multiple Regions from more than one account.

To do this efficiently, you should consider your partitioning strategy for the data. You can grant your security teams a comprehensive view of the network. Create each partition based on the Kinesis Data Firehose delivery stream for the specific AWS WAF associated with the Application Load Balancer. This partitioning strategy also allows the security team to view the logs by Region and by account. As a result, your S3 bucket name and prefix look similar to the following example:

s3://central-waf-logs/<account_id>/<region_name>/<kinesis_firehose_name>/...filename...
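As a rough sketch of this layout, the helper below assembles a destination key from the account ID, Region, and delivery stream name. The function name and all sample values are placeholders chosen for illustration; they are not taken from the GitHub examples.

```javascript
// Build the destination key for a log object in the central bucket, following
// <account_id>/<region_name>/<kinesis_firehose_name>/<filename>.
function buildDestinationKey(accountId, regionName, firehoseName, sourceKey) {
    // Keep only the file name from the source key.
    const fileName = sourceKey.split("/").pop();
    return [accountId, regionName, firehoseName, fileName].join("/");
}

// All values below are placeholders.
const destKey = buildDestinationKey(
    "111122223333",
    "us-east-1",
    "aws-waf-logs-demo",
    "2018/12/01/00/waf-log-file.gz"
);
console.log("s3://central-waf-logs/" + destKey);
```

Keeping the account, Region, and delivery stream in fixed positions of the key is what lets AWS Glue treat each of them as a partition.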

Step 6: Copying logs with Lambda code

This step updates the Lambda function to start copying log files. Keep your partitioning strategy in mind as you update the Lambda function. Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF.

To accommodate the partitioning, modify your Lambda code to match the examples in the GitHub repo.

Replace <kinesis_firehose_name> in the example code with the name of the Kinesis Data Firehose delivery stream attached to the AWS WAF. Replace <central logging bucket name> with the S3 bucket name from your central logging account.

Kinesis Data Firehose should now begin writing files to your central S3 logging bucket with the correct partitioning. To generate logs, access your web application.

Analytics

Now that Kinesis Data Firehose can write collected files into your logging account’s S3 bucket, create an Elasticsearch cluster in your logging account in the same Region as the central logging bucket. You also must create a Lambda function to handle S3 events as the central logging bucket receives new log files. This creates a connection between your central log files and your search engine. Amazon ES gives you the ability to query your logs quickly to look for potential security threats. The Lambda function loads the data into your Amazon ES cluster. Amazon ES also includes a tool named Kibana, which helps with managing data and creating visualizations.

Step 7: Create an Elasticsearch cluster

  1. In your central Logging Account, navigate to the Elasticsearch Service in the AWS Console.
  2. Select Create Cluster, enter a domain name for your cluster, and choose version 3 from the Elasticsearch version dropdown. Choose Next. In this example, don’t implement any security policies for your cluster and only use one instance. For any real-world production tasks, keep your Elasticsearch cluster inside your VPC.
  3. For network configuration, choose Public access and choose Next.
  4. For the access policy in this tutorial, only allow access to the domain from a specified account ID or ARN. In this case, use your account ID to gain access.
  5. Choose Next, and on the final screen, choose Confirm. You generally want to create strict access policies for your domain and not allow public access. This example only uses these settings to quickly demonstrate the capabilities of AWS services. I would never recommend this in a production environment.

AWS takes a few minutes to finish and activate your Amazon ES. Once it goes live, you can see two endpoints. The Endpoint URL is the URL you use to send data to the cluster.

Step 8: Create a Lambda function to copy log files

Add an event trigger to your central logs bucket. This trigger tells your Lambda function to write the data from the log file to Amazon ES. Before you create the S3 trigger, create a Lambda function in your logging account to handle the events.

For this Lambda function, we use code from the aws-samples GitHub repository that streams data from an S3 file line by line into Amazon ES. This example uses code taken from amazon-elasticsearch-lambda-samples. Name your new Lambda function myS3toES.

  1. Copy and paste the following code into a text file named index.js:
    // Load the AWS SDK and construct the S3 client
    const aws = require('aws-sdk');
    const s3 = new aws.S3();
    // Central logging bucket (placeholder; replace with your bucket name)
    const destBucket = "<central logging bucket name>";

    exports.handler = (event, context, callback) => {
        // get the source bucket name
        const _srcBucket = event.Records[0].s3.bucket.name;
        // get the object key of the file that landed on S3
        const _key = event.Records[0].s3.object.key;
        // split the key by "/"
        const _keySplit = _key.split("/");
        // get the object name
        const _objName = _keySplit[_keySplit.length - 1];
        // reset the destination path
        const _destPath = _keySplit[0] + "/" + _keySplit[1] + "/<kinesis_firehose_name>/" + _objName;
        // set up the source path
        const _sourcePath = _srcBucket + "/" + _key;
        // build the params for the copyObject request to S3
        const params = { Bucket: destBucket, ACL: "bucket-owner-full-control", CopySource: _sourcePath, Key: _destPath };
        // execute the copyObject request
        s3.copyObject(params, function(err, data) {
            if (err) {
                console.log(err, err.stack);
                return callback(err);
            }
            console.log("SUCCESS!");
            // report completion only after the copy has finished
            callback(null, 'All done!');
        });
    };

  2. Copy and paste this code into a text file and name it package.json:
    {
      "name": "s3toesfunction",
      "version": "1.0.0",
      "description": "",
      "main": "index.js",
      "scripts": {},
      "author": "",
      "dependencies": {
        "byline": "^5.0.0",
        "clf-parser": "0.0.2",
        "path": "^0.12.7",
        "stream": "0.0.2"
      }
    }

  3. Execute the following command in the folder containing these files: npm install
  4. After the installation completes, create a .zip file that includes the index.js file and the node_modules folder.
  5. Log in to your logging account.
  6. Upload your .zip file to the Lambda function. For Code entry type, choose Upload a .zip file.
  7. This Lambda function needs an appropriate service role with a trust relationship to S3. Choose Edit trust relationships and add s3.amazonaws.com and lambda.amazonaws.com as trusted entities.
  8. Set up your IAM role with the following permissions: S3 Read Only permissions and Lambda Basic Execution. To grant the role the appropriate access, assign it to the Lambda function from the Lambda Execution Role section in the console.
  9. Set Environment variables for your Lambda function so it knows where to send the data. Add an endpoint and use the endpoint URL you created in Step 7. Add an index and enter your index name. Add a value for region and detail the Region where you deployed your application.
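As a minimal sketch of how a function might use these settings, the code below reads the endpoint, index, and region environment variables and turns newline-delimited JSON log text into a body for the Elasticsearch bulk API. The helper name, fallback values, and sample log lines are assumptions for illustration and are not taken from the aws-samples code. Depending on the Elasticsearch version, each action line may also need a _type field, and sending the signed HTTP request is not shown.

```javascript
// Read the settings from environment variables. The fallbacks are
// placeholders so that the sketch also runs outside Lambda.
const config = {
    endpoint: process.env.endpoint || "https://search-example.us-east-1.es.amazonaws.com",
    index: process.env.index || "waf-logs",
    region: process.env.region || "us-east-1"
};

// Convert newline-delimited JSON log text into a bulk API body:
// one action line followed by one document line per record.
function toBulkBody(logText, indexName) {
    const lines = logText.split("\n").filter((line) => line.trim() !== "");
    if (lines.length === 0) {
        return "";
    }
    const action = JSON.stringify({ index: { _index: indexName } });
    // The bulk API expects the body to end with a newline.
    return lines.map((line) => action + "\n" + line).join("\n") + "\n";
}

// Hypothetical log content with two records.
const sampleLog = '{"action":"ALLOW"}\n{"action":"BLOCK"}\n';
console.log(toBulkBody(sampleLog, config.index));
```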

Step 9: Create an S3 trigger

After creating the Lambda function, create the event triggers on your S3 bucket to execute that function. This completes your log delivery pipeline to Amazon ES. This is a common pipeline architecture for streaming data from S3 into Amazon ES.

  1. Log in to your central logging account.
  2. Navigate to the S3 console, select your bucket, then open the Properties pane and scroll down to Events.
  3. Choose Add notification and name your new event s3toLambdaToEs.
  4. Under Events, select the check box for PUT. Leave Prefix and Suffix blank.
  5. Under Send to, select Lambda Function, and enter the name of the Lambda function that you created in the previous step—in this example, myS3toES.
  6. Choose Save.

With this complete, Lambda should start sending data to your Elasticsearch index whenever you access your web application.

Step 10: Configure Amazon ES

Your pipeline now automatically adds data to your Elasticsearch cluster. Next, use Kibana to visualize the AWS WAF logs in the central logging account’s S3 bucket. This is the final step in assembling your forensic investigation architecture.

Kibana provides tools to create visualizations and dashboards that help your security teams view log data. Using the log data, you can filter by IP address to see how many times an IP address has hit your firewall each month. This helps you track usage anomalies and isolate potentially malicious IP addresses. You can use this information to add web ACL rules to your firewall that add extra protection against those IP addresses.
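The per-IP, per-month count described above can be sketched in a few lines of JavaScript. This is not how Kibana computes it; it only illustrates the aggregation. The sketch assumes AWS WAF log records with a timestamp in epoch milliseconds and an httpRequest.clientIp field, and the sample records use documentation IP addresses.

```javascript
// Count requests per client IP per calendar month (UTC).
function countHitsByIpAndMonth(records) {
    const counts = {};
    for (const record of records) {
        // "YYYY-MM" taken from the ISO 8601 form of the timestamp
        const month = new Date(record.timestamp).toISOString().slice(0, 7);
        const key = month + " " + record.httpRequest.clientIp;
        counts[key] = (counts[key] || 0) + 1;
    }
    return counts;
}

// Hypothetical, trimmed WAF log records.
const sampleRecords = [
    { timestamp: 1543622400000, httpRequest: { clientIp: "192.0.2.10", country: "US" } },
    { timestamp: 1543708800000, httpRequest: { clientIp: "192.0.2.10", country: "US" } },
    { timestamp: 1543708800000, httpRequest: { clientIp: "198.51.100.7", country: "DE" } }
];

console.log(countHitsByIpAndMonth(sampleRecords));
```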

Kibana produces visualizations like the following screenshot.

In addition to the Number of IPs over Time visualization, you can also correlate the IP address to its country of origin. Correlation provides even more precise filtering for potential web ACL rules to protect against attackers. The visualization for that data looks like the following image.

Elasticsearch setup

To set up and visualize your AWS WAF data, follow this How to analyze AWS WAF logs using Amazon Elasticsearch Service post. With this solution, you can investigate your global dataset instead of isolated Regions.

An alternative to Amazon ES

Amazon ES is an excellent tool for forensic work because it provides high-performance search capability for large datasets. However, Amazon ES requires cluster management and complex capacity planning for future growth. To get top-notch performance from Amazon ES, you must adequately scale it. With the more straightforward data of these investigations, you could instead work with more traditional SQL queries.

Forensic data grows quickly, so using a relational database means you might quickly outgrow your capacity. Instead, take advantage of AWS serverless technologies like AWS Glue, Athena, and Amazon QuickSight. These technologies enable forensic analysis without the operational overhead you would experience with Elasticsearch or a relational database. To learn more about this option, consult posts like How to extract, transform, and load data for analytic processing using AWS Glue and Work with partitioned data in AWS Glue.

Athena query

With your forensic tools now in place, you can use Athena to query your data and analyze the results. This lets you refine the data for your Kibana visualizations, or directly load it into Amazon QuickSight for additional visualization. Use the Athena console to experiment until you have the best query for your visual needs. Having the database in your AWS Glue Catalog means you can make ad hoc queries in Athena to inspect your data.

In the Athena console, create a new Query tab and enter the following query:

-- SQL query
SELECT date_format(from_unixtime("timestamp"/1000), '%Y-%m-%d %H:%i:%s') AS event_date,
       client_ip, country, account_id, waf_name, region
FROM "paritionedlogdata"."waf_logs_transformed"
WHERE year='2018' AND month='12';

Replace the database name ("paritionedlogdata") and the table name ("waf_logs_transformed") with the appropriate values for your environment. This query divides the epoch timestamp, which is in milliseconds, by 1000 and converts it to a readable date string using the date functions described in the Presto 0.176 documentation. It should return the following results.
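To see what the timestamp conversion does, here is a small JavaScript equivalent. It is a sketch that assumes the timestamp is in epoch milliseconds and formats the result in UTC with a 24-hour clock; the function name is made up for illustration.

```javascript
// Convert an epoch timestamp in milliseconds to "YYYY-MM-DD HH:MM:SS" (UTC).
function formatEventDate(timestampMs) {
    // toISOString() returns e.g. "2018-12-01T00:00:00.000Z"
    return new Date(timestampMs).toISOString().slice(0, 19).replace("T", " ");
}

console.log(formatEventDate(1543622400000)); // 2018-12-01 00:00:00
```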

You can see which IP addresses hit your environment the most over any period of time. In a production environment, you would run an ETL job to re-partition this data and transform it into a columnar format optimized for queries. If you would like more information about doing that, see the Best Practices When Using Athena with AWS Glue post.

Amazon QuickSight visualization

Now that you can query your data in Athena, you can visualize the results using Amazon QuickSight. First, grant Amazon QuickSight access to the S3 bucket where your Athena query results live.

  1. Log in to the Amazon QuickSight console.
  2. Choose Admin/username, Manage QuickSight.
  3. Choose Account settings, Security & permissions.
  4. Under QuickSight access to AWS services, choose Add or remove.
  5. Choose Amazon S3, then choose Select S3 buckets.
  6. Choose the output bucket for your central AWS WAF logs. Also, choose your Athena query results bucket. The query results bucket begins with aws-athena-query-results-*.

Amazon QuickSight can now access the data sources. To set up your visualizations, follow these steps:

  1. In the QuickSight console, choose Manage data, New data set.
  2. For Source, choose Athena.
  3. Give your new dataset a name and choose Validate connection.
  4. After you validate the connection, choose Create data source.
  5. Select Use custom SQL and give your SQL query a name.
  6. Input the same query that you used earlier in Athena, and choose Confirm query.
  7. Choose Import to SPICE for quicker analytics, then choose Visualize.

Allow Amazon QuickSight several minutes. It alerts you after completing the import.

Now that you have imported your data into your analysis, you can apply a visualization:

  1. In Amazon QuickSight, choose New analysis.
  2. Select the dataset that you created earlier and choose Create analysis.
  3. At the bottom left of the screen, choose Line Chart.
  4. Drag and drop event_date to the X-Axis.
  5. Drag and drop client_ip to the Value. This should create a visualization similar to the following image.
  6. Choose the right arrow at the top left of the visualization and choose Hide “other” categories. This should modify your visualization to look like the following image.

You can also map the countries from which the requests originate, allowing you to track global access anomalies. You can do this in QuickSight by selecting the “Points on map” visualization type and choosing the country as the data point to visualize.

You can also add a count of IP addresses to see if you have any unusual access patterns originating from specific IP addresses.

Conclusion

Although Amazon ES and Amazon QuickSight offer similar final results, there are trade-offs to the technical approaches that I highlighted. If your use case requires the analysis of data in real time, then Amazon ES is more suitable for your needs. If you prefer a serverless approach that doesn’t require capacity planning or cluster management, then the solution with AWS Glue, Athena, and Amazon QuickSight is more suitable.

In this post, I described an easy way to build operational dashboards that track key metrics over time. Doing this with AWS Glue, Athena, and Amazon QuickSight relieves the heavy lifting of managing servers and infrastructure. To monitor metrics in real time instead, the Amazon ES solution provides a way to do this with little operational overhead. The key here is the adaptability of the solution: putting different services together can provide different solutions to your problems to fit your exact needs.

Hopefully, you have found this post informative and the proposed solutions intriguing. As always, AWS welcomes all feedback and comments.

About the Authors

Aaron Franco is a solutions architect at Amazon Web Services.

Query your data created on-premises using Amazon Athena and AWS Storage Gateway

Post Syndicated from James Forrester original https://aws.amazon.com/blogs/big-data/query-your-data-created-on-premises-using-amazon-athena-and-aws-storage-gateway/

Enterprise customers have to maintain, protect, and provide access to the petabytes of data they produce in their data centers every day. Traditionally, this involves a set of complex, interrelated systems to store the raw data on Network Attached Storage (NAS), Storage Area Networks (SAN), or Direct Attached Storage (DAS), and to transform it and load it into relational databases to support querying and analysis activities. This process is commonly known as Extract, Transform, and Load (ETL).

Each of these systems must be separately maintained, often by separate teams: DBAs for the databases, systems engineers for the underlying physical infrastructure, and others. At AWS, we’re constantly looking at ways to invent and simplify on behalf of our customers. This post looks at using a combination of AWS technology that can be deployed in customers’ data centers (AWS Storage Gateway) and serverless, cloud-native technology (Amazon Athena) to simplify the process of querying critical data generated on-premises.

Customers using popular enterprise analysis tools, such as Tableau, to analyze their data rely on ODBC or JDBC to connect to and run queries against their data. Conversely, file systems use protocols like SMB or NFS to read and write files. Until now, it’s often been necessary to translate data from its raw format (often text files) into a relational database in order to allow analysis on it. Enter: AWS Storage Gateway and Amazon Athena.

In this blog post, I use this architecture to demonstrate the combined capabilities of Storage Gateway and Athena. AWS Storage Gateway is a hybrid storage service that enables your on-premises applications to seamlessly use AWS cloud storage. The File Gateway configuration of the AWS Storage Gateway offers you a seamless way to connect to the cloud in order to store application data files and backup images as durable objects on Amazon S3 cloud storage. File gateway offers SMB or NFS-based access to data in Amazon S3 with local caching, and files are stored and billed as S3 objects. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Let’s walk through an example with ACME Corp. ACME is a fictitious, but representative enterprise that wants to store, protect, and analyze the data it receives from millions of IoT sensors around the world.

The figure below gives a high-level view of how data flows between each step in ACME’s workflow in the proposed solution. Once this solution is configured, the flow of data into the hands of ACME’s analysts is fully automated with no manual intervention required.

Today, ACME receives a daily file from each sensor via FTP in text (comma-separated) format. These files share a common set of columns, and the files are stored on an enterprise NAS device behind the FTP server. The NAS device is replicated to a secondary facility for disaster recovery purposes on a daily basis.

At the end of each day, an ETL process runs, reads each text file, and loads it to a relational database table with a similar column structure. ACME analysts receive an email in their in-boxes when the load process is complete, allowing them to begin their analysis of the previous day’s activities provided there were no issues with the load. In the event of a load issue, operations staff are paged, which can delay the start of the analysts’ day while the problem is resolved.

In the event of a NAS failure, the prior day’s data must be replayed into the FTP server, a costly and time-consuming process. ACME’s hypothetical Recovery Time Objective for the analysis activities in the event of a database failure is four hours; their Recovery Point Objective for the data is up to one day. Operations personnel must maintain the FTP servers, the NAS environment, and the database servers.

Without making changes to ACME’s FTP process, which they wish to maintain in its current state, our first step is to deploy a File Gateway on their VMware infrastructure to replace ACME’s existing NAS. Let me quickly demonstrate how you can set up File Gateway for testing purposes in your own Amazon EC2 environment.

Step 1: From the AWS Management Console, select “Storage Gateway,” then select “Create Gateway:”

Step 2: Select the “File Gateway” gateway type and hit “Next:”

Step 3: Under “Select host platform,” choose “Amazon EC2” and follow the on-screen instructions to launch a Gateway instance:

After configuring and testing the gateway, it is mounted to the FTP server in place of the existing NAS. Here’s ACME’s S3 bucket, where ACME can see the data from the IoT sensors is now appearing in Amazon S3:

Here we can see the contents of the configured S3 bucket with the object keys presented as files to the Windows machine, and hence accessible in Windows Explorer:

Here’s what the File Gateway configuration looks like in ACME’s account. We can see that the gateway we created, AthenaGateway, is up and running, up to date, and mapped to the file share storage resource:

More information on configuring a File Gateway is available here: Creating a File Gateway.

The next step is to configure Amazon Athena. Using the AWS Console, we create a new Athena database and table pointing to ACME’s S3 bucket to which File Gateway is writing, with a table definition representing the columns in the data.

ACME’s policies call for the data to be encrypted at rest, and File Gateway supports encryption via KMS when writing data to the S3 bucket. Athena supports a range of Amazon S3 encryption options, both for encrypted datasets in Amazon S3 and for encrypted query results.

These options encrypt data at rest in Amazon S3. Regardless of whether you use these options, transport layer security (TLS) encrypts objects in-transit between Athena resources and between Athena and Amazon S3. Query results stream to JDBC clients as plain text and are encrypted using TLS.

We then run a test query in the Athena console to verify that data is being returned correctly. As new data is received by the File Gateway, it is automatically added to S3, and automatically included in Athena’s query scope.

Now, we are going to create an Athena database using AWS Glue; this is to make ACME’s IoT device data in S3 via the File Gateway accessible for querying via Athena.

Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. AWS Glue consists of a central metadata repository known as the AWS Glue Data Catalog, an ETL engine that automatically generates Python or Scala code, and a flexible scheduler that handles dependency resolution, job monitoring, and retries. AWS Glue is serverless, so there’s no infrastructure to set up or manage.

First, we open the Glue home page in the AWS Management Console, then select “Add tables using a crawler,” and follow the steps described, referencing your S3 bucket and prefix when asked. Documentation on configuring a Glue crawler is here:

Once the crawler is configured, run it. It will crawl your data in S3 and flag once completed:

Next, open the Athena home page in the AWS Management Console:

In the Athena home page, you’ll now see the database and tables created by Glue. Here is Athena, configured to point to the sensor data in S3 and running a test query against it. The test query we will use is as follows:

SELECT col1, count(col1) AS data_points
FROM acmesensordata
WHERE (col3 > 50
	AND col3 < 60)
GROUP BY col1
ORDER BY data_points DESC
LIMIT 100

This sample query scans all of ACME’s data, counts the data points with values between 50 and 60 (exclusive) for each city, and returns the 100 cities with the most such data points.
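For readers less familiar with SQL, the same filter, group, and limit logic can be sketched in JavaScript over in-memory rows. The sketch assumes, following the description above, that col1 holds the city and col3 the sensor value; the sample rows are invented. It sorts by count in descending order so that the limit keeps the most frequent values.

```javascript
// Count rows per col1 where low < col3 < high, then keep the top `limit` groups.
function countInRange(rows, low, high, limit) {
    const counts = new Map();
    for (const row of rows) {
        if (row.col3 > low && row.col3 < high) {
            counts.set(row.col1, (counts.get(row.col1) || 0) + 1);
        }
    }
    return [...counts.entries()]
        .sort((a, b) => b[1] - a[1])
        .slice(0, limit)
        .map(([col1, count]) => ({ col1, count }));
}

// Hypothetical sensor rows.
const sampleRows = [
    { col1: "Paris", col3: 55 },
    { col1: "Paris", col3: 51 },
    { col1: "Oslo", col3: 59 },
    { col1: "Oslo", col3: 60 },
    { col1: "Rome", col3: 10 }
];

console.log(countInRange(sampleRows, 50, 60, 100));
```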

The final step is to redirect ACME’s Tableau environment to point to Athena’s ODBC endpoint. Tableau’s ODBC configuration is managed centrally by ACME, and the necessary details are swapped to point to Athena in place of the existing on-premises relational database.

When you start Tableau, under “Connect,” you can see the file and database types that are supported by Tableau Desktop. Select “More” to see the complete list. Tableau considers ODBC (Open Database Connectivity) as a standard way to connect to a database. You can connect Tableau to your data using the ODBC driver for Amazon Athena and the Tableau Other Databases (ODBC) connector. Tableau’s complete documentation for connecting to ODBC is available here.

Let’s recap what we’ve changed, and the technology and end-user impact.

  • We have replaced ACME’s on-premises NAS with AWS Storage Gateway backed by an S3 bucket, and configured their FTP server to use the File Gateway’s file share in place of their existing one.
  • We have configured Storage Gateway as a File Gateway to provide access to the customer S3 bucket as a NAS. Their data is now in S3.
  • We have configured a serverless Amazon Athena database to mimic the previous relational database, and exposed an ODBC endpoint to this database.
  • We have re-configured ACME’s Tableau environment to point to this ODBC endpoint. Since the relational database in this scenario was only used to service ad-hoc SQL queries, it is no longer needed.

If there are no other dependencies, ACME can now decommission the on-premises ETL, relational database, and NAS infrastructure that were dedicated to supporting this scenario. Aside from the FTP servers and the Storage Gateway Virtual Machine hosts, there are now no servers to manage that support this scenario either.

End-user analysts working with this data no longer need to wait until start of day to begin their analysis. New sensor data arrives in the Athena S3 folder shortly after FTP delivery from the sensors, and is available for query immediately. The removal of the ETL and relational database infrastructure reduces the potential points of failure in the architecture, and in the event of a disaster, an Athena endpoint in a second AWS Region (backed by S3 Cross Region Replication) makes the data available to Tableau as soon as replication completes. Because S3 has the ability to trigger events when new data arrives, analysts can now be notified when data from particular groups of sensors becomes available, allowing them to begin their work at the earliest possible moment.

Data remains cached on the local gateway, allowing for extremely rapid access by other on-premises high-performance computing, big data, or other applications. For high availability, ACME has the ability to rapidly launch a second storage gateway instance on their existing VMware infrastructure should the primary fail. A further refinement would be to use the NotifyWhenUploaded functionality in AWS Storage Gateway to provide CloudWatch Events when groups of data are uploaded to enable batch processing.

And that’s it!

Summary

For our many enterprise customers who deal with complex architectures for these types of hybrid cloud scenarios, the combination of AWS Storage Gateway and Amazon Athena can help simplify operations and lower costs while enabling on-premises, cloud-native, and hybrid scenarios across their application portfolios.

If you have any feedback or questions, please feel free to leave a comment.

About the Author

James Forrester is Head of Technology for AWS Global Accounts. He works with customers around the world to provide thought leadership on the transformative value, applicability and usage of the full breadth of AWS services.

Separating queries and managing costs using Amazon Athena workgroups

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/separating-queries-and-managing-costs-using-amazon-athena-workgroups/

Amazon Athena is a serverless query engine for data on Amazon S3. Many customers use Athena to query application and service logs, schedule automated reports, and integrate with their applications, enabling new analytics-based capabilities.

Different types of users rely on Athena, including business analysts, data scientists, security, and operations engineers. But how do you separate and manage these workloads so that users get the best experience while minimizing costs?

In this post, I show you how to use workgroups to do the following:

  • Separate workloads.
  • Control user access.
  • Manage query usage and costs.

Separate workloads

By default, all Athena queries execute in the primary workgroup.  As an administrator, you can create new workgroups to separate different types of workloads.  Administrators commonly turn to workgroups to separate analysts running ad hoc queries from automated reports.  Here’s how to build out that separation.

First create two workgroups, one for ad hoc users (ad-hoc-users) and another for automated reports (reporting).

Next, select a specific output location. All queries executed inside this workgroup save their results to this output location. Routing results to a single secure location helps make sure users only access data they are permitted to see. You can also enforce encryption of query results in S3 by selecting the appropriate encryption configuration.

Workgroups also help you simplify the onboarding of new users to Athena. By selecting override client-side settings, you enforce a predefined configuration on all queries within a workgroup. Users no longer have to configure a query results output location or S3 encryption keys. These settings default to the parameters defined for the workgroup where those queries execute. Additionally, each workgroup maintains a unique query history and saved query inventory, making queries easier for you to track down.

Finally, when creating a workgroup, you can add up to 50 key-value pair tags to help identify your workgroup resources. Tags are also useful when attempting to allocate Athena costs between groups of users. Create Name and Dept tags for the ad-hoc-users and reporting workgroups with their name and department association.
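The settings described in this section map onto the input of the Athena CreateWorkGroup API. The sketch below builds that input for the two workgroups; the helper name, department values, output locations, and the choice of SSE_S3 encryption are assumptions for illustration. Passing the result to the SDK's createWorkGroup call is left out so that the sketch runs without AWS credentials.

```javascript
// Build the input for Athena's CreateWorkGroup API for one workgroup.
function buildWorkGroupParams(name, dept, outputLocation) {
    return {
        Name: name,
        Description: "Workgroup for " + name,
        Configuration: {
            ResultConfiguration: {
                // All queries in the workgroup write their results here.
                OutputLocation: outputLocation,
                // Assumption: encrypt query results with S3-managed keys.
                EncryptionConfiguration: { EncryptionOption: "SSE_S3" }
            },
            // Equivalent to selecting "override client-side settings".
            EnforceWorkGroupConfiguration: true,
            PublishCloudWatchMetricsEnabled: true
        },
        Tags: [
            { Key: "Name", Value: name },
            { Key: "Dept", Value: dept }
        ]
    };
}

// Placeholder department names and S3 locations.
const workgroups = [
    buildWorkGroupParams("ad-hoc-users", "analytics", "s3://demo/workgroups/adhocusers/"),
    buildWorkGroupParams("reporting", "finance", "s3://demo/workgroups/reporting/")
];

console.log(JSON.stringify(workgroups, null, 2));
```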

Control user access to workgroups

Now that you have two workgroups defined, ad-hoc-users and reporting, you must control who can use and update them.  Remember that workgroups are IAM resources and therefore have an ARN. You can use this ARN in the IAM policy that you associate with your users.  In this example, create a single IAM user representing the team of ad hoc users and add the individual to an IAM group. The group contains a policy that enforces what actions these users can perform.

Start by reviewing IAM Policies for Accessing Workgroups and Workgroup Example Policies to familiarize yourself with policy options. Use the following IAM policy to set up permissions for your analyst user. Grant this user only the permissions required for working in the ad-hoc-users workgroup. Make sure that you tweak this policy to match your exact needs:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:GetQueryExecutions",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup",
                "athena:ListTagsForResource"
            ],
            "Resource": "arn:aws:athena:us-east-1:112233445566:workgroup/ad-hoc-users"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObjectAcl",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": "arn:aws:s3:::demo/workgroups/adhocusers/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:Get*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:112233445566:catalog",
                "arn:aws:glue:us-east-1:112233445566:database/amazon",
                "arn:aws:glue:us-east-1:112233445566:table/amazon/*"
            ]
        }
    ]
}

Now your analyst user can execute queries only in the ad-hoc-users workgroup. The analyst user can switch to other workgroups in the console, but any action they attempt there is denied. They are further restricted to listing and querying only those tables that belong to the amazon database. For more information about controlling access to AWS Glue resources such as databases and tables, see AWS Glue Resource Policies for Access Control.
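Because the workgroup-scoped statement differs between workgroups only in its ARN, you can generate it instead of copying it by hand. The following sketch assumes hypothetical helper names (workgroupArn, queryStatement) and uses a reduced subset of the actions from the policy above for brevity.

```javascript
// Hypothetical helpers for generating one policy statement per workgroup.
function workgroupArn(region, accountId, workgroup) {
  return `arn:aws:athena:${region}:${accountId}:workgroup/${workgroup}`;
}

function queryStatement(region, accountId, workgroup) {
  return {
    Effect: 'Allow',
    // Reduced action list for illustration; extend it to match your needs.
    Action: [
      'athena:StartQueryExecution',
      'athena:StopQueryExecution',
      'athena:GetQueryExecution',
      'athena:GetQueryResults',
      'athena:GetWorkGroup'
    ],
    Resource: workgroupArn(region, accountId, workgroup)
  };
}

const statement = queryStatement('us-east-1', '112233445566', 'reporting');
console.log(statement.Resource);
// arn:aws:athena:us-east-1:112233445566:workgroup/reporting
```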

The following screenshot shows what the analyst user sees in the Athena console:

I’ve created a simple Node.js tool that executes SQL queries stored as files in a given directory. You can find my Athena test runner code in the athena_test_runner GitHub repo. You can use this code to simulate a reporting tool, after configuring it to use a workgroup. To do that, create an IAM role with permissions like those previously defined for the analyst user. This time, restrict access to the reporting workgroup.

The following JavaScript code example shows how to select a workgroup programmatically when executing queries:

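// Assumes the AWS SDK for JavaScript v2 is installed (npm install aws-sdk).
const AWS = require('aws-sdk')
const athena = new AWS.Athena({region: 'us-east-1'})

// In this trimmed example, files is unused and a fixed query runs instead.
function executeQueries(files) {
    // Declared with const so that params doesn't leak into the global scope.
    const params = {
      "QueryString": "",
      "ResultConfiguration": {
        // Left empty here; set an S3 path unless the workgroup supplies one.
        "OutputLocation": ""
      },
      "QueryExecutionContext": {
        "Database": "default"
      },
      // Selects the workgroup in which the query runs.
      "WorkGroup": "reporting"
    }

    params.QueryString = "SELECT * FROM amazon.final_parquet LIMIT 10"
    return new Promise((resolve, reject) => {
        athena.startQueryExecution(params, (err, results) => {
            if (err) {
                reject(err.message)
            } else {
                resolve(results)
            }
        })
    })
}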
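
The function above resolves as soon as Athena accepts the query, not when the query finishes. A reporting tool usually also needs to wait for completion. The following sketch polls the GetQueryExecution API until the query reaches a terminal state. The function name waitForQuery, the delay, and the attempt limit are illustrative choices, not part of the athena_test_runner code.

```javascript
// Polls GetQueryExecution until the query reaches a terminal state.
// `client` is any object with a getQueryExecution(params).promise() method,
// such as an AWS.Athena instance from the AWS SDK for JavaScript v2.
async function waitForQuery(client, queryExecutionId, delayMs = 1000, maxAttempts = 60) {
  const terminal = ['SUCCEEDED', 'FAILED', 'CANCELLED'];
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const data = await client
      .getQueryExecution({ QueryExecutionId: queryExecutionId })
      .promise();
    const state = data.QueryExecution.Status.State;
    if (terminal.includes(state)) {
      return state;
    }
    // Wait before asking again so that the API isn't called in a tight loop.
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  throw new Error(`Query ${queryExecutionId} did not finish after ${maxAttempts} attempts`);
}

// Example usage with the function above:
// executeQueries(files)
//   .then((result) => waitForQuery(athena, result.QueryExecutionId))
//   .then((state) => console.log(`Query finished with state ${state}`));
```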

Run sample automated reports under the reporting workgroup, with the following command:

node index.js testsuite/

Query histories remain isolated between workgroups. A user logging into the Athena console as an analyst using the ad-hoc-users workgroup doesn’t see any automated reports that you ran under the reporting workgroup.

Managing query usage and cost

You have two workgroups configured: one for ad hoc users and another for automated reports. Now, you must safeguard against bad queries. In this use case, two potential situations for query usage should be monitored and controlled:

  • Make sure that users don’t run queries that scan more data than allowed by their budget.
  • Safeguard against automated script bugs that could cause indefinite query retries.

First, configure data usage controls for your ad-hoc-users workgroup. There are two types of data usage controls: per-query and per-workgroup.

Set the per-query control for analysts to be 1 GB. This control cancels any query run in the ad-hoc-users workgroup that tries to scan more than 1 GB.

To observe this limit in action, choose Update, return to the query editor, and run a query that would scan more than 1 GB. This query triggers the error message, “Query cancelled! : Bytes scanned limit was exceeded”. Remember that you incur charges for data the query scanned up to the point of cancellation. In this case, you incur charges for 1 GB of data.
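If you prefer to set the per-query control programmatically, the UpdateWorkGroup API accepts a BytesScannedCutoffPerQuery value in bytes. The following is a sketch under stated assumptions: the helper name perQueryLimitParams is hypothetical, and treating 1 GB as 1024^3 bytes is an assumption about how you want to express the limit.

```javascript
// Assumption: 1 GB is expressed as 1024^3 bytes.
const BYTES_PER_GB = 1024 ** 3;

// Hypothetical helper: builds an UpdateWorkGroup request that sets the
// per-query data usage control for a workgroup.
function perQueryLimitParams(workgroup, limitGb) {
  return {
    WorkGroup: workgroup,
    ConfigurationUpdates: {
      BytesScannedCutoffPerQuery: limitGb * BYTES_PER_GB
    }
  };
}

const limitParams = perQueryLimitParams('ad-hoc-users', 1);
console.log(limitParams.ConfigurationUpdates.BytesScannedCutoffPerQuery); // 1073741824

// To apply it (requires credentials and the aws-sdk v2 package):
// const AWS = require('aws-sdk');
// new AWS.Athena({ region: 'us-east-1' }).updateWorkGroup(limitParams).promise();
```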

Now, switch to your reporting workgroup. For this workload, you’re not worried about individual queries scanning too much data. However, you want to control the aggregate amount of data scanned by all queries in this workgroup.

Create a per-workgroup data usage control for the reporting workgroup. You can configure the maximum amount of data scanned by all queries in the workgroup during a specific period.

For the automated reporting workload, you probably have a good idea of how long the process should take and the total amount of data that queries scan during this time. You only have a few reports to run, so you can expect them to run in a few minutes, only scanning a few megabytes of data. Begin by setting up a low watermark alarm to notify you when your queries have scanned more data than you would expect in five minutes. The following example is for demo purposes only. In most cases, this period would be longer. I configured the alarm to send a notification to an Amazon SNS topic that I created.

To validate the alarm, I made a minor change to my test queries, causing them to scan more data. This change triggered the SNS alarm, shown in the following Amazon CloudWatch dashboard:

Next, create a high watermark alarm that is triggered when the queries in your reporting workgroup scan more than 1 GB of data over 15 minutes. In this case, the alarm triggers an AWS Lambda function that disables the workgroup, making sure that no additional queries execute in it. This alarm protects you from incurring costs caused by faulty automation code or runaway queries.

Before creating the data usage control, create a Node.js Lambda function to disable the workgroup. Paste in the following code:

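const AWS = require('aws-sdk')

exports.handler = async (event) => {
    const athena = new AWS.Athena({region: 'us-east-1'})

    // The CloudWatch alarm notification arrives as a JSON string in the SNS message.
    const msg = JSON.parse(event.Records[0].Sns.Message)
    const wgname = msg.Trigger.Dimensions.filter((i) => i.name === 'WorkGroup')[0].value

    // Await the request. Without .promise() or a callback, the SDK v2 request
    // object is created but never sent, so the workgroup would stay enabled.
    await athena.updateWorkGroup({WorkGroup: wgname, State: 'DISABLED'}).promise()

    const response = {
        statusCode: 200,
        body: JSON.stringify(`Workgroup ${wgname} has been disabled`),
    };
    return response;
}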

This code grabs the workgroup name from the SNS message body and calls the UpdateWorkGroup API action with the name and the state of DISABLED. The Athena API requires the most recent version of the AWS SDK. When you create the Lambda bundle, include the latest AWS SDK version in that bundle.
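To test the lookup locally before deploying, you can isolate it in a function and feed it a sample event. The following sketch mirrors the lookup in the Lambda function. The function name extractWorkGroup is hypothetical, and the sample event is a trimmed-down payload containing only the fields that the function reads.

```javascript
// Extracts the workgroup name from an SNS-delivered CloudWatch alarm event.
function extractWorkGroup(event) {
  const msg = JSON.parse(event.Records[0].Sns.Message);
  const dim = msg.Trigger.Dimensions.find((d) => d.name === 'WorkGroup');
  if (!dim) {
    throw new Error('Alarm message has no WorkGroup dimension');
  }
  return dim.value;
}

// Hypothetical, trimmed-down event; a real notification carries more fields.
const sampleEvent = {
  Records: [{
    Sns: {
      Message: JSON.stringify({
        Trigger: { Dimensions: [{ name: 'WorkGroup', value: 'reporting' }] }
      })
    }
  }]
};

console.log(extractWorkGroup(sampleEvent)); // reporting
```

Throwing an explicit error when the dimension is missing makes a misconfigured alarm easier to diagnose than an undefined property access.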

Next, create a new SNS topic and a subscription. For Protocol, select AWS Lambda. Then, select the Lambda function that you created in the previous step.

In the Athena console, create the second alarm, 1 GB for 15 min., and point it to the SNS topic that you created earlier. When triggered, this SNS topic calls the Lambda function that disables the reporting workgroup. No more queries can execute in this workgroup. You see this error message in the console when a workgroup is disabled:

Athena exposes other aggregated metrics per workgroup under the AWS/Athena namespace in CloudWatch, such as the query status and the query type (DDL or DML) per workgroup. To learn more, see Monitoring Athena Queries with CloudWatch Metrics.

Cost allocation tags

When you created your ad-hoc-users and reporting workgroups, you added Name and Dept tags. These tags can be used in your Billing and Cost Management console to determine the usage per workgroup.

Summary

In this post, you learned how to use workgroups in Athena to isolate different query workloads, manage access, and define data usage controls to protect yourself from runaway queries. Metrics exposed to CloudWatch help you monitor query performance and make sure that your users are getting the best experience possible. For more details, see Using Workgroups to Control Query Access.

About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.

Extracting Salesforce.com data using AWS Glue and analyzing with Amazon Athena

Post Syndicated from Behram Irani original https://aws.amazon.com/blogs/big-data/extracting-salesforce-com-data-using-aws-glue-and-analyzing-with-amazon-athena/

Salesforce is a popular and widely used customer relationship management (CRM) platform. It lets you store and manage prospect and customer information—like contact info, accounts, leads, and sales opportunities—in one central location. You can derive a lot of useful information by combining the prospect information stored in Salesforce with other structured and unstructured data in your data lake.

In this post, I show you how to use AWS Glue to extract data from a Salesforce.com account object and save it to Amazon S3. You then use Amazon Athena to generate a report by joining the account object data from Salesforce.com with the orders data from a separate order management system.

Preparing your data

I signed up for a free Salesforce.com account, which comes with a handful of sample records in many of the Salesforce.com objects. You can use your organization’s development Salesforce.com account and pull data from multiple objects at the same time by modifying the SOQL query in your AWS Glue code. To keep the query simple, this demonstration extracts data from the Account object only.

To demonstrate joining Salesforce.com data with data from another system using Amazon Athena, you create a sample data file showing orders coming from an order management system.

Setting up an AWS Glue job

Use the open source springml library to connect Apache Spark with Salesforce.com. The library comes with plenty of handy features that allow you to read, write, and update Salesforce.com objects using the Apache Spark framework.

You can compile the JAR files from the springml GitHub repo or download them, along with their dependencies, from the Maven repo. Upload the following JAR files to your S3 bucket and make a note of the full path for each.

force-partner-api-40.0.0.jar
force-wsc-40.0.0.jar
salesforce-wave-api-1.0.9.jar
spark-salesforce_2.11-1.1.1.jar 

In the AWS Management Console, choose AWS Glue in the Region where you want to run the service. Choose Jobs, Add Job. Follow the wizard by filling in the necessary details.

Under the Security configuration, script libraries, and job parameters (optional) section, for Dependent jars path, list the paths for the four JAR files listed previously, separated by commas.

For this job, I allocated Maximum capacity as “2.” This field defines the number of AWS Glue data processing units (DPUs) that the system can allocate when this job runs. A DPU is a relative measure of processing power that consists of four vCPUs of compute capacity and 16 GB of memory. When you specify an Apache Spark ETL job, you can allocate 2–100 DPUs. The default is 10 DPUs.
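As a quick check of what a given Maximum capacity setting provides, the following sketch multiplies out the per-DPU figures stated above. The function name dpuResources is hypothetical.

```javascript
// Resources implied by a Maximum capacity setting, using the per-DPU
// figures stated above (4 vCPUs and 16 GB of memory per DPU).
function dpuResources(dpus) {
  return { vcpus: dpus * 4, memoryGb: dpus * 16 };
}

console.log(dpuResources(2)); // { vcpus: 8, memoryGb: 32 }
```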

Execute the AWS Glue job to extract data from the Salesforce.com object

The following Scala code extracts a few fields from the Account object in Salesforce.com and writes them as a table to S3 in Apache Parquet file format.

import com.amazonaws.services.glue.util.GlueArgParser  
import com.amazonaws.services.glue.util.Job  
import com.amazonaws.services.glue.util.JsonOptions  
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}  
import org.apache.spark.SparkContext  
import scala.collection.JavaConverters.mapAsJavaMapConverter  
  
object SfdcExtractData {  
  def main(sysArgs: Array[String]) {  
      
    val sparkContext: SparkContext = new SparkContext()  
    val glueContext: GlueContext = new GlueContext(sparkContext)  
    val sparkSession = glueContext.getSparkSession  
      
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)  
    Job.init(args("JOB_NAME"), glueContext, args.asJava)  
      
    val soql = "select name, accountnumber, industry, type, billingaddress, sic from account"  
    val df = sparkSession.read.format("com.springml.spark.salesforce").option("soql",soql).option("username", "username").option("password","password+securitytoken").load()
     
    val datasource0 = DynamicFrame(df, glueContext).withName("datasource0").withTransformationContext("datasource0")  
        
    val datasink1 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://replace-with-your-s3-bucket/sfdc-output", "partitionKeys" -> Seq("Industry"))), format = "parquet", transformationContext = "datasink1").writeDynamicFrame(datasource0)  
  
    Job.commit()  
  }  
}

This code relies on a few key components:

val df = sparkSession.read.format("com.springml.spark.salesforce").option("soql",soql).option("username", "username").option("password","password+securitytoken").load()

This code example establishes a Salesforce.com connection, submits a SOQL-compatible query for the Account object, and loads the returned records into a Spark DataFrame. Replace username with your Salesforce.com username, and replace password+securitytoken with your password followed immediately by the security token of your profile.

Best practices suggest storing and retrieving the password using AWS Secrets Manager instead of hardcoding it. For simplicity, I left it hardcoded in this example.

Keep in mind that this query is simple and returns only a handful of records. For large volumes of data, you might want to limit the results returned by your query or use other techniques like bulk query and chunking. Check the springml page to learn more about the functionality that Salesforce.com supports.

val datasink1 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://replace-with-your-s3-bucket/sfdc-output", "partitionKeys" -> Seq("Industry"))), format = "parquet", transformationContext = "datasink1").writeDynamicFrame(datasource0)  

This code does all the writing to your S3 bucket. In this example, you want to aggregate data by Industry segments. Because of that, you should partition the data by the Industry field.

Also, the code writes in Parquet format. Athena charges you by the amount of data scanned per query. You can save on costs and get better performance when you partition the data, compress data, or convert it to columnar formats like Parquet.
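Partitioned output is laid out under one prefix per partition value, in the Hive-style form key=value. The following sketch shows how those prefixes are composed. The function name partitionPrefix and the Industry values are hypothetical, and real output may escape special characters in partition values.

```javascript
// Builds the Hive-style prefix used for partitioned output:
// <base>/<partitionKey>=<value>/
function partitionPrefix(basePath, key, value) {
  // Strip trailing slashes so that the result never contains "//".
  return `${basePath.replace(/\/+$/, '')}/${key}=${value}/`;
}

// Hypothetical Industry values, for illustration only.
['Energy', 'Banking'].forEach((industry) => {
  console.log(
    partitionPrefix('s3://replace-with-your-s3-bucket/sfdc-output', 'Industry', industry)
  );
});
```

Because each Industry value has its own prefix, a query that filters on Industry reads only the matching prefixes, which reduces the amount of data scanned.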

After you run this code in AWS Glue, you can go to your S3 bucket where the sink points and find something like the following structure:

Query the data with Athena

After the code drops your Salesforce.com data into your S3 bucket with the correct partition and format, AWS Glue can crawl the dataset. It creates the appropriate schema in the AWS Glue Data Catalog. Wait for AWS Glue to create the table. Then, Athena can query the table and join with other tables in the catalog.

First, use the AWS Glue crawler to discover the Salesforce.com Account data that you previously stored in the S3 bucket. For details about how to use the crawler, see populating the AWS Glue Data Catalog.

In this example, point the crawler to the S3 output prefix where you stored your Salesforce.com Account data, and run it. The crawler creates a new catalog table before it finally stops.

The crawler automatically captures all the column names, types, and the partition column in the AWS Glue Data Catalog table, which points to the Parquet files in your S3 bucket. You can now query this table with Athena. A simple SELECT query on that table shows the results of scanning the data from the S3 bucket.

Now your Salesforce.com data is ready for Athena to query. For this example, join this data with the sample orders data from the order management system, which is also stored in S3. After your AWS Glue crawler finishes cataloging the sample orders data, Athena can query it.

Finally, use Athena to join both tables in an aggregation query.
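
The following sketch shows what such a join could look like when submitted through the Athena API. All names are assumptions for illustration: the database salesforce, the tables sfdc_output and orders, and the orders columns accountnumber and amount are hypothetical and depend on what your crawlers create and on the layout of your orders file.

```javascript
// Hypothetical names: tables "sfdc_output" and "orders", and an orders table
// with "accountnumber" and "amount" columns. Adjust to your own catalog.
function buildJoinQuery(database, workgroup, outputLocation) {
  const sql = [
    'SELECT a.industry, COUNT(*) AS order_count, SUM(o.amount) AS total_amount',
    'FROM sfdc_output a',
    'JOIN orders o ON a.accountnumber = o.accountnumber',
    'GROUP BY a.industry',
    'ORDER BY total_amount DESC'
  ].join('\n');

  return {
    QueryString: sql,
    QueryExecutionContext: { Database: database },
    ResultConfiguration: { OutputLocation: outputLocation },
    WorkGroup: workgroup
  };
}

const joinParams = buildJoinQuery(
  'salesforce',
  'primary',
  's3://replace-with-your-s3-bucket/athena-results/'
);
console.log(joinParams.QueryString);

// To run it (requires credentials and the aws-sdk v2 package):
// const AWS = require('aws-sdk');
// new AWS.Athena({ region: 'us-east-1' }).startQueryExecution(joinParams).promise();
```

Grouping by industry matches the partition column of the Salesforce.com table, so the aggregation lines up with how the data is stored.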

Conclusion

In this post, I showed a simple example of extracting Salesforce.com object data using AWS Glue and Apache Spark, and saving it to S3. You can then catalog your S3 data in the AWS Glue Data Catalog, allowing Athena to query it. With this mechanism in place, you can easily incorporate Salesforce data into your AWS-based data lake.

If you have comments or feedback, please leave them below.

About the Author

Behram Irani is a Data Architect at Amazon Web Services.