Tag Archives: Amazon Athena

Trigger cross-region replication of pre-existing objects using Amazon S3 inventory, Amazon EMR, and Amazon Athena

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/

In Amazon Simple Storage Service (Amazon S3), you can use cross-region replication (CRR) to copy objects automatically and asynchronously across buckets in different AWS Regions. CRR is a bucket-level configuration, and it can help you meet compliance requirements and minimize latency by keeping copies of your data in different Regions. CRR replicates all objects in the source bucket, or optionally a subset, controlled by prefix and tags.

Objects that exist before you enable CRR (pre-existing objects) are not replicated. Similarly, objects might fail to replicate (failed objects) if permissions aren’t in place, either on the IAM role used for replication or the bucket policy (if the buckets are in different AWS accounts).

In our work with customers, we have seen situations where large numbers of objects aren’t replicated for the previously mentioned reasons. In this post, we show you how to trigger cross-region replication for pre-existing and failed objects.

Methodology

At a high level, our strategy is to perform a copy-in-place operation on pre-existing and failed objects. This operation uses the Amazon S3 API to copy the objects over the top of themselves, preserving tags, access control lists (ACLs), metadata, and encryption keys. The operation also resets the Replication_Status flag on the objects. This triggers cross-region replication, which then copies the objects to the destination bucket.
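
To make the mechanics concrete, the following is a minimal sketch of a copy-in-place for a single object using boto3. The bucket and key names are placeholders from our demo setup, and the full Spark application shown later in this post also handles storage class, ACLs, and encryption.

import boto3

s3 = boto3.client('s3')

bucket = 'crr-preexisting-demo-source'  # placeholder: your source bucket
key = 'preexisting-1.txt'               # placeholder: an object that needs replication

# Fetch the current user metadata so it can be preserved on the copy.
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {})

# Amazon S3 rejects a copy onto itself unless something changes, so we add
# a marker key, just as the Spark application does with 'forcedreplication'.
metadata['forcedreplication'] = 'true'

# Copy the object over the top of itself. This resets the replication status,
# and CRR then replicates the object to the destination bucket.
# (Objects larger than 5 GB would need a multipart copy instead.)
s3.copy_object(
    Bucket=bucket,
    Key=key,
    CopySource={'Bucket': bucket, 'Key': key},
    Metadata=metadata,
    MetadataDirective='REPLACE',
    TaggingDirective='COPY'
)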

To accomplish this, we use the following:

  • Amazon S3 inventory to identify objects to copy in place. These objects don’t have a replication status, or they have a status of FAILED.
  • Amazon Athena and AWS Glue to expose the S3 inventory files as a table.
  • Amazon EMR to execute an Apache Spark job that queries the AWS Glue table and performs the copy-in-place operation.

Object filtering

To reduce the size of the problem (we’ve seen buckets with billions of objects!) and eliminate S3 List operations, we use Amazon S3 inventory. S3 inventory is enabled at the bucket level, and it provides a report of S3 objects. The inventory files contain the objects’ replication status: PENDING, COMPLETED, FAILED, or REPLICA. Pre-existing objects do not have a replication status in the inventory.

Interactive analysis

To simplify working with the files that are created by S3 inventory, we create a table in the AWS Glue Data Catalog. You can query this table using Amazon Athena and analyze the objects.  You can also use this table in the Spark job running on Amazon EMR to identify the objects to copy in place.

Copy-in-place execution

We use a Spark job running on Amazon EMR to perform concurrent copy-in-place operations on the S3 objects. This lets us scale up the number of simultaneous copy operations, which improves performance for a large number of objects compared to copying them sequentially with a single-threaded application.

Account setup

For the purpose of this example, we created three S3 buckets. The buckets are specific to our demonstration. If you’re following along, you need to create your own buckets (with different names).

We’re using a source bucket named crr-preexisting-demo-source and a destination bucket named crr-preexisting-demo-destination. The source bucket contains the pre-existing objects and the objects with the replication status of FAILED. We store the S3 inventory files in a third bucket named crr-preexisting-demo-inventory.

The following diagram illustrates the basic setup.

You can use any bucket to store the inventory, but the bucket policy must include the following statement (change Resource and aws:SourceAccount to match yours).

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}

In our example, we uploaded six objects to crr-preexisting-demo-source. We added three objects (preexisting-*.txt) before CRR was enabled. We also added three objects (failed-*.txt) after permissions were removed from the CRR IAM role, causing CRR to fail.

Enable S3 inventory

You need to enable S3 inventory on the source bucket. You can do this on the Amazon S3 console as follows:

On the Management tab for the source bucket, choose Inventory.

Choose Add new, and complete the settings as shown, choosing the CSV format and selecting the Replication status check box. For detailed instructions for creating an inventory, see How Do I Configure Amazon S3 Inventory? in the Amazon S3 Console User Guide.
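
If you prefer to configure the inventory programmatically, the following is a sketch using boto3. It assumes the demo bucket names used in this post, and the configuration ID matches the path used later in the CREATE EXTERNAL TABLE statement; the account ID is a placeholder.

import boto3

s3 = boto3.client('s3')

s3.put_bucket_inventory_configuration(
    Bucket='crr-preexisting-demo-source',
    Id='crr-preexisting-demo',
    InventoryConfiguration={
        'Id': 'crr-preexisting-demo',
        'IsEnabled': True,
        'IncludedObjectVersions': 'Current',
        'Destination': {
            'S3BucketDestination': {
                'AccountId': '111111111111',  # placeholder account ID
                'Bucket': 'arn:aws:s3:::crr-preexisting-demo-inventory',
                'Format': 'CSV'
            }
        },
        # ReplicationStatus is the field this solution depends on.
        'OptionalFields': ['ReplicationStatus'],
        'Schedule': {'Frequency': 'Daily'}
    }
)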

After enabling S3 inventory, you need to wait for the inventory files to be delivered. It can take up to 48 hours to deliver the first report. If you’re following the demo, ensure that the inventory report is delivered before proceeding.

Here’s what our example inventory file looks like:

You can also look on the S3 console on the objects’ Overview tab. The pre-existing objects do not have a replication status, but the failed objects show the following:

Register the table in the AWS Glue Data Catalog using Amazon Athena

To be able to query the inventory files using SQL, first you need to create an external table in the AWS Glue Data Catalog. Open the Amazon Athena console at https://console.aws.amazon.com/athena/home.

On the Query Editor tab, run the following SQL statement. This statement registers the external table in the AWS Glue Data Catalog.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

After creating the table, you need to make the AWS Glue Data Catalog aware of any existing data and partitions by adding partition metadata to the table. To do this, you use the Metastore Consistency Check utility to scan for and add partition metadata to the AWS Glue Data Catalog.

MSCK REPAIR TABLE crr_preexisting_demo;

To learn more about why this is required, see the documentation on MSCK REPAIR TABLE and data partitioning in the Amazon Athena User Guide.

Now that the table and partitions are registered in the Data Catalog, you can query the inventory files with Amazon Athena.

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

The results of the query are as follows.

The query returns all rows in the S3 inventory for a specific delivery date. You’re now ready to launch an EMR cluster to copy in place the pre-existing and failed objects.
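
Before launching the cluster, it can be useful to know how many objects actually need a copy in place. The following sketch runs that aggregation through boto3's Athena client; the database name and the query result location are assumptions you should adjust for your environment.

import time
import boto3

athena = boto3.client('athena')

query = """
SELECT replication_status, count(*) AS object_count
FROM crr_preexisting_demo
WHERE dt = '2019-02-24-04-00'
GROUP BY replication_status
"""

# Submit the query. Athena writes results to the output location you specify.
execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'default'},  # assumption: table registered in the default database
    ResultConfiguration={'OutputLocation': 's3://crr-preexisting-demo-inventory/athena-results/'}  # placeholder
)
query_id = execution['QueryExecutionId']

# Poll until the query reaches a terminal state, then print the rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if state == 'SUCCEEDED':
    for row in athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])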

Note: If your goal is to fix FAILED objects, make sure that you correct what caused the failure (IAM permissions or S3 bucket policies) before proceeding to the next step.

Create an EMR cluster to copy objects

To parallelize the copy-in-place operations, run a Spark job on Amazon EMR. To facilitate EMR cluster creation and EMR step submission, we wrote a bash script (available in this GitHub repository).

To run the script, clone the GitHub repo. Then launch the EMR cluster as follows:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ ./launch_emr.sh

Note: Running the bash script results in AWS charges. By default, it creates two Amazon EC2 instances, one m4.xlarge and one m4.2xlarge. Auto-termination is enabled so when the cluster is finished with the in-place copies, it terminates.

The script performs the following tasks:

  1. Creates the default EMR roles (EMR_EC2_DefaultRole and EMR_DefaultRole).
  2. Uploads the files used for bootstrap actions and steps to Amazon S3 (we use crr-preexisting-demo-inventory to store these files).
  3. Creates an EMR cluster with Apache Spark installed, using the aws emr create-cluster command.

After the cluster is provisioned:

  1. A bootstrap action installs boto3 and awscli.
  2. Two steps execute, copying the Spark application to the master node and then running the application.

The following are highlights from the Spark application. You can find the complete code for this example in the amazon-s3-crr-preexisting-objects repo on GitHub.

Here we select records from the table registered with the AWS Glue Data Catalog, filtering for objects whose replication_status is "FAILED" or empty. The inventory CSV stores the values with surrounding quotation marks, which is why the query below compares against '""' and '"FAILED"'.

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)

We call the copy_object function for each key returned by the previous query.

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata, 
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself, 
        # with the Storage Class, updated Metadata, 
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

Note: The Spark application adds a forcedreplication key to the objects’ metadata. It does this because Amazon S3 doesn’t allow you to copy in place without changing the object or its metadata.

Verify the success of the EMR job by running a query in Amazon Athena

The Spark application outputs its results to S3. You can create another external table with Amazon Athena and register it with the AWS Glue Data Catalog. You can then query the table with Athena to ensure that the copy-in-place operation was successful.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

The results appear as follows on the console.

Although this shows that the copy-in-place operation was successful, CRR still needs to replicate the objects. Subsequent inventory files show the objects’ replication status as COMPLETED. You can also verify on the console that preexisting-*.txt and failed-*.txt are COMPLETED.

It is worth noting that because CRR requires versioned buckets, the copy-in-place operation produces another version of the objects. You can use S3 lifecycle policies to manage noncurrent versions.
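
If you want to clean up those noncurrent versions automatically, the following is a sketch of a lifecycle rule set with boto3. The 30-day window is an arbitrary example, and note that this call replaces any existing lifecycle configuration on the bucket.

import boto3

s3 = boto3.client('s3')

# Expire the noncurrent versions left behind by the copy-in-place after 30 days (example value).
s3.put_bucket_lifecycle_configuration(
    Bucket='crr-preexisting-demo-source',
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'expire-noncurrent-versions',
                'Filter': {'Prefix': ''},  # apply to the whole bucket
                'Status': 'Enabled',
                'NoncurrentVersionExpiration': {'NoncurrentDays': 30}
            }
        ]
    }
)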

Conclusion

In this post, we showed how to use Amazon S3 inventory, Amazon Athena, the AWS Glue Data Catalog, and Amazon EMR to perform copy-in-place operations on pre-existing and failed objects at scale.

Note: Amazon S3 batch operations is an alternative for copying objects. The difference is that S3 batch operations doesn't check each object's existing properties or set object ACLs, storage class, and encryption on an object-by-object basis. For more information, see Introduction to Amazon S3 Batch Operations in the Amazon S3 Console User Guide.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

 

 

 

 

Chauncy McCaughey is a senior data architect at AWS. His current side project is using statistical analysis of driving habits and traffic patterns to understand how he always ends up in the slow lane.

 

 

 

Easily query AWS service logs using Amazon Athena

Post Syndicated from Damon Cortesi original https://aws.amazon.com/blogs/big-data/easily-query-aws-service-logs-using-amazon-athena/

Many organizations use Amazon Simple Storage Service (Amazon S3) as a primary storage destination for a wide variety of logs including AWS service logs. One of the benefits of storing log data in Amazon S3 is that you can access it in any number of ways. One popular option is to query it using Amazon Athena, a serverless query engine for data on S3. Common use cases for querying logs are service and application troubleshooting, performance analysis, and security audits. To get the best performance and reduce query costs in Athena, we recommend following common best practices, as outlined in Top 10 Performance Tuning Tips for Amazon Athena on the AWS Big Data Blog. These best practices include converting the data to a columnar format like Apache Parquet and partitioning the resulting data in S3.

In this post, we’re open-sourcing a Python library known as Athena Glue Service Logs (AGSlogger). This library has predefined templates for parsing and optimizing the most popular log formats. The library provides a mechanism for defining schemas, managing partitions, and transforming data within an extract, transform, load (ETL) job in AWS Glue. AWS Glue is a serverless data transformation and cataloging service. You can use this library in conjunction with AWS Glue ETL jobs to enable a common framework for processing log data.

Using Python libraries with AWS Glue ETL

One of the features of AWS Glue ETL is the ability to import Python libraries into a job (as described in the documentation). We take advantage of this feature in our approach: you provide AWS Glue with a link to a .zip file in Amazon S3 that contains the Python modules you want to use, and AWS Glue imports them at runtime.

We want our AWS Glue jobs to be as simple as possible while enabling the ability to easily roll out new versions of the library. To accomplish this, all of the setup, configuration, and transformation logic is contained in the library and AWS Glue simply executes the job. As new log formats are added or updated in the library, a new version of the .zip file can be deployed to S3. It’s then automatically imported by the relevant AWS Glue job. Here is an example ETL script:

from athena_glue_service_logs.job import JobRunner
 
job_run = JobRunner(service_name='s3_access')
job_run.convert_and_partition()

About the AGSlogger library

The library is available on GitHub in the athena-glue-service-logs repository. It’s designed to do an initial conversion of AWS Service logs and also perform ongoing conversion as new logs are delivered to S3. The following log types are supported:

  • Application Load Balancer
  • Classic Load Balancer
  • AWS CloudTrail
  • Amazon CloudFront
  • S3 Access
  • Amazon VPC Flow

To convert additional logs, update the service_name variable in the script, and also the different job parameters that point to your desired table names and Amazon S3 locations.

There are some limitations of the script:

  • The script has not been tested with large volumes of log data (greater than 100 GiB).
  • If you have a large number of log files, you might need to increase your Apache Spark executor settings. Edit the AWS Glue job and add the following job parameter:

key: --conf
value: spark.yarn.executor.memoryOverhead=1G

  • If you do not have any recent logs (less than 30 days old) for certain log types like S3 Access, the script may not be able to properly populate the optimized table.
  • Several CloudTrail fields such as requestParameters and responseElements are left as JSON strings – you can use Athena to extract data from this JSON at the time of query.

Before you begin

There are a few prerequisites before you get started:

  1. Create an IAM role to use with AWS Glue. For more information, see Create an IAM Role for AWS Glue in the AWS Glue documentation.
  2. Ensure that you have access to Athena from your account.
  3. We use Amazon S3 server access logs as our example for this script, so enable access logging on an Amazon S3 bucket. For more information, see How to Enable Server Access Logging in the S3 documentation.
  4. Download and store the Python library in an Amazon S3 bucket in the same AWS Region in which you run the AWS Glue ETL job. Download the latest release from https://github.com/awslabs/athena-glue-service-logs/releases. Then, copy the .zip file to your Amazon S3 bucket, as follows:

aws s3 cp athena_glue_converter_v5.3.0.zip s3://<bucket>/glue_scripts/

Now, you are ready to create the AWS Glue ETL job.

Create an AWS Glue ETL job using the library

For this post, we focus on Amazon S3 server access logs (described in the documentation). By default, these logs are delivered to a single location in Amazon S3. Converting these logs to Parquet and partitioning them can significantly improve query performance and decrease query costs.

If you’ve cloned the repository associated with this release, you can use a “make” command to automate the job creation. We also walk through the job creation process in the AWS Glue console. There are a few specific settings on the Job properties page we need to set.

To create the AWS Glue ETL job

  1. In the AWS Glue console, choose Jobs under ETL on the navigation pane, and then choose Add Job. Follow the job creation wizard. Ensure that “A new script to be authored by you” is selected. We provide the code for it later. Our ETL language is Python. Under advanced properties, enable the Job bookmark. Job metrics can also be useful when monitoring your job, but not required.
  2. Under Script libraries in the Python library path section, put the full path to the .zip file that you uploaded to your Amazon S3 bucket as shown previously:
    s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip

    You can adjust the DPUs if you think you need more or less processing power. For our purposes, you can leave it at 10.
  3. Specify a few different types of parameters, described in detail below:
  • The source of your Amazon S3 Server Access Logs.
  • The destination where to save the converted logs.

AWS service logs can be stored in a number of different locations, as discussed in Service Log Specifics. For storing Amazon S3 server access logs, specify the bucket and prefix matching those that you configured on the S3 bucket where you enabled access logging.

  • The names of the databases and tables that are created in the AWS Glue Data Catalog.

By default, the converted logs are partitioned by date. The script creates the necessary tables and keeps the partitions up-to-date on subsequent runs of the job. You don’t need to use AWS Glue crawlers, although they can provide similar functionality. Here are the different properties you need to configure:

Key                          Value
--raw_database_name          source_logs
--raw_table_name             s3_access
--converted_database_name    aws_service_logs
--converted_table_name       s3_access
--s3_converted_target        s3://<bucket>/converted/s3_access
--s3_source_location         s3://<bucket>/s3_access

  4. Continue with the rest of the wizard, finishing the job creation flow. The script editor opens. Replace all the code in the script editor, even the import lines, with these lines:
    from athena_glue_service_logs.job import JobRunner
     
    job_run = JobRunner(service_name='s3_access')
    job_run.convert_and_partition()

  5. Save the script and choose Run Job! When the job begins, you see log output from the job scrolling under the script.

The script you just created is saved to S3 in a standard bucket. You can also use the AWS Command Line Interface to create the AWS Glue ETL job. Copy the preceding script to S3 first and provide that location as the ScriptLocation parameter.

aws glue create-job --name S3AccessLogConvertor \
--description "Convert and partition S3 Access logs" \
--role AWSGlueServiceRoleDefault \
--command Name=glueetl,ScriptLocation=s3://<bucket>/glue_scripts/s3_access_job.py \
--default-arguments '{
  "--extra-py-files":"s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip",
  "--job-bookmark-option":"job-bookmark-enable",
  "--raw_database_name":"source_logs",
  "--raw_table_name":"s3_access",
  "--converted_database_name":"aws_service_logs",
  "--converted_table_name":"s3_access",
  "--TempDir":"s3://<bucket>/tmp",
  "--s3_converted_target":"s3://<bucket>/converted/s3_access",
  "--s3_source_location":"s3://<bucket>/s3_access/"
}'
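
After the job exists, you can start it and watch its progress from code as well. Here is a sketch using boto3 with the job name created above:

import time
import boto3

glue = boto3.client('glue')

# Start the job created above.
run = glue.start_job_run(JobName='S3AccessLogConvertor')
run_id = run['JobRunId']

# Poll until the run reaches a terminal state.
while True:
    state = glue.get_job_run(JobName='S3AccessLogConvertor', RunId=run_id)['JobRun']['JobRunState']
    print('Job run state: {}'.format(state))
    if state in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT'):
        break
    time.sleep(30)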

Scheduling future runs

By default, this job is configured to run manually. To run it on a regular basis, set up a new schedule trigger in AWS Glue to run the job at your desired frequency. We recommend scheduling it hourly to make it easier to locate recent logs for your optimized queries.
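
As a sketch of what that looks like outside the console, the following boto3 call creates an hourly trigger for the job. The trigger name and cron expression are examples.

import boto3

glue = boto3.client('glue')

# Run the conversion job at the top of every hour (example schedule).
glue.create_trigger(
    Name='S3AccessLogConvertor-hourly',    # example trigger name
    Type='SCHEDULED',
    Schedule='cron(0 * * * ? *)',          # AWS cron syntax: minute 0 of every hour
    Actions=[{'JobName': 'S3AccessLogConvertor'}],
    StartOnCreation=True
)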

On every run of the job, the script looks for new log data and converts it to Parquet format. The script then adds any new partitions that were created as a result of the conversion. The script uses AWS Glue job bookmarks to ensure that it processes only newly delivered data. For more information about bookmarks, see Tracking Processed Data Using Job Bookmarks in the AWS Glue documentation.

Querying your optimized data in Athena: examples

Now that you’ve converted your data from row-based log files to columnar-based Parquet, you can write queries against this data using Athena. After the first run of the script, the tables specified in the AWS Glue ETL job properties are created for you. Here are several sample queries to get you started.

Example 1: Most requested S3 keys

SELECT key, COUNT(*) AS count
FROM "aws_service_logs"."s3_access"
WHERE operation IN ('REST.GET.OBJECT', 'REST.COPY.OBJECT', 'REST.COPY.OBJECT_GET')
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

Example 2: Top IP addresses that accessed the bucket yesterday

SELECT remote_ip, COUNT(*) FROM "aws_service_logs"."s3_access"
WHERE year=date_format(current_date - interval '1' day, '%Y')
  AND month=date_format(current_date - interval '1' day, '%m')
  AND day=date_format(current_date - interval '1' day, '%d')
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

Note the use of positional references (numbers) instead of column names in the GROUP BY and ORDER BY clauses. This is one of the optimizations for Athena queries. For other optimizations, be sure to check out the Top 10 Performance Tuning Tips blog post.

In addition, we use the year, month, and day partition columns to limit the amount of data scanned and decrease the cost of the query.

Summary

This post introduces a new open-source library that you can use to efficiently process various types of AWS service logs using AWS Glue. The library automates the application of common best practices to allow high-performing and cost-effective querying of the data using Amazon Athena and Amazon Redshift. We hope this library comes in handy, and we’re open to pull requests. If you want to add a new log type, check out the code in the AWS Labs athena-glue-service-logs repository!

 


About the Author

Damon Cortesi is a big data architect with Amazon Web Services.

 

 

 

Enabling serverless security analytics using AWS WAF full logs, Amazon Athena, and Amazon QuickSight

Post Syndicated from Umesh Ramesh original https://aws.amazon.com/blogs/security/enabling-serverless-security-analytics-using-aws-waf-full-logs/

Traditionally, analyzing data logs required you to extract, transform, and load your data before using a number of data warehouse and business intelligence tools to derive business intelligence from that data—on top of maintaining the servers that ran behind these tools.

This blog post will show you how to analyze AWS Web Application Firewall (AWS WAF) logs and quickly build multiple dashboards, without booting up any servers. With the new AWS WAF full logs feature, you can now log all traffic inspected by AWS WAF into Amazon Simple Storage Service (Amazon S3) buckets by configuring Amazon Kinesis Data Firehose. In this walkthrough, you’ll create an Amazon Kinesis Data Firehose delivery stream to which AWS WAF full logs can be sent, and you’ll enable AWS WAF logging for a specific web ACL. Then you’ll set up an AWS Glue crawler job and an Amazon Athena table. Finally, you’ll set up Amazon QuickSight dashboards to help you visualize your web application security. You can use these same steps to build additional visualizations to draw insights from AWS WAF rules and the web traffic traversing the AWS WAF layer. Security and operations teams can monitor these dashboards directly, without needing to depend on other teams to analyze the logs.

The following architecture diagram highlights the AWS services used in the solution:

Figure 1: Architecture diagram

AWS WAF is a web application firewall that lets you monitor HTTP and HTTPS requests that are forwarded to an Amazon API Gateway API, to Amazon CloudFront or to an Application Load Balancer. AWS WAF also lets you control access to your content. Based on conditions that you specify—such as the IP addresses from which requests originate, or the values of query strings—API Gateway, CloudFront, or the Application Load Balancer responds to requests either with the requested content or with an HTTP 403 status code (Forbidden). You can also configure CloudFront to return a custom error page when a request is blocked.

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. With Kinesis Data Firehose, you don’t need to write applications or manage resources. You configure your data producers to send data to Kinesis Data Firehose, and it automatically delivers the data to the destination that you specified. You can also configure Kinesis Data Firehose to transform your data before delivering it.

AWS Glue can be used to run serverless queries against your Amazon S3 data lake. AWS Glue can catalog your S3 data, making it available for querying with Amazon Athena and Amazon Redshift Spectrum. With crawlers, your metadata stays in sync with the underlying data (more details about crawlers later in this post). Amazon Athena and Amazon Redshift Spectrum can directly query your Amazon S3 data lake by using the AWS Glue Data Catalog. With AWS Glue, you access and analyze data through one unified interface without loading it into multiple data silos.

Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon QuickSight is a business analytics service you can use to build visualizations, perform one-off analysis, and get business insights from your data. It can automatically discover AWS data sources and also works with your data sources. Amazon QuickSight enables organizations to scale to hundreds of thousands of users and delivers responsive performance by using a robust in-memory engine called SPICE.

SPICE stands for Super-fast, Parallel, In-memory Calculation Engine. SPICE supports rich calculations to help you derive insights from your analysis without worrying about provisioning or managing infrastructure. Data in SPICE is persisted until it is explicitly deleted by the user. SPICE also automatically replicates data for high availability and enables Amazon QuickSight to scale to hundreds of thousands of users who can all simultaneously perform fast interactive analysis across a wide variety of AWS data sources.

Step one: Set up a new Amazon Kinesis Data Firehose delivery stream

  1. In the AWS Management Console, open the Amazon Kinesis Data Firehose service and choose the button to create a new stream.
    1. In the Delivery stream name field, enter a name for your new stream that starts with aws-waf-logs- as shown in the screenshot below. AWS WAF filters all streams starting with the keyword aws-waf-logs when it displays the delivery streams. Note the name of your stream since you’ll need it again later in the walkthrough.
    2. For Source, choose Direct PUT, since AWS WAF logs will be the source in this walkthrough.

      Figure 2: Select the delivery stream name and source

  2. Next, you have the option to enable AWS Lambda if you need to transform your data before transferring it to your destination. (You can learn more about data transformation in the Amazon Kinesis Data Firehose documentation.) In this walkthrough, there are no transformations that need to be performed, so for Record transformation, choose Disabled.
    Figure 3: Select "Disabled" for record transformations

    1. You’ll have the option to convert the JSON object to Apache Parquet or Apache ORC format for better query performance. In this example, you’ll be reading the AWS WAF logs in JSON format, so for Record format conversion, choose Disabled.

      Figure 4: Choose "Disabled" to not convert the JSON object

  3. On the Select destination screen, for Destination, choose Amazon S3.
    Figure 5: Choose the destination

    1. For the S3 destination, you can either enter the name of an existing S3 bucket or create a new S3 bucket. Note the name of the S3 bucket since you’ll need the bucket name in a later step in this walkthrough.
    2. For Source record S3 backup, choose Disabled, because the destination in this walkthrough is an S3 bucket.

      Figure 6: Enter the S3 bucket name, and select "Disabled" for the Source record S3 backup

  4. On the next screen, leave the default conditions for Buffer size, Buffer interval, S3 compression and S3 encryption as they are. However, we recommend that you set Error logging to Enabled initially, for troubleshooting purposes.
    1. For IAM role, select Create new or choose. This opens up a new window that will prompt you to create firehose_delivery_role, as shown in the following screenshot. Choose Allow in this window to accept the role creation. This grants the Kinesis Data Firehose service access to the S3 bucket.

      Figure 7: Select "Allow" to create the IAM role "firehose_delivery_role"

  5. On the last step of configuration, review all the options you’ve chosen, and then select Create delivery stream. This will cause the delivery stream to display as “Creating” under Status. In a couple of minutes, the status will change to “Active,” as shown in the below screenshot.

    Figure 8: Review the options you selected
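
If you manage your infrastructure as code rather than through the console, the following is a sketch of the equivalent delivery stream creation with boto3. The role and bucket ARNs are placeholders, the buffering values mirror common defaults, and options from the walkthrough such as error logging are omitted for brevity.

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='aws-waf-logs-us-east-1',  # must start with aws-waf-logs-
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::111111111111:role/firehose_delivery_role',  # placeholder
        'BucketARN': 'arn:aws:s3:::your-waf-logs-bucket',                    # placeholder
        'Prefix': 'waf-logs/',
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
        'CompressionFormat': 'UNCOMPRESSED'
    }
)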

Step two: Enable AWS WAF logging for a specific Web ACL

  1. From the AWS Management Console, open the AWS WAF service and choose Web ACLs. Open your Web ACL resource, which can either be deployed on a CloudFront distribution or on an Application Load Balancer.
    1. Choose the Web ACL for which you want to enable logging. (In the below screenshot, we’ve selected a Web ACL in the US East Region.)
    2. On the Logging tab, choose Enable Logging.

      Figure 9: Choose "Enable Logging"

  2. The next page displays all the delivery streams that start with aws-waf-logs. Choose the Amazon Kinesis Data Firehose delivery stream that you created for AWS WAF logs at the start of this walkthrough. (In the screenshot below, our example stream name is "aws-waf-logs-us-east-1".)
    1. You can also choose to redact certain fields that you wish to exclude from being captured in the logs. In this walkthrough, you don’t need to choose any fields to redact.
    2. Select Create.

      Figure 10: Choose your delivery stream, and select "Create"

After a couple of minutes, you’ll be able to inspect the S3 bucket that you defined in the Kinesis Data Firehose delivery stream. The log files are created in directories by year, month, day, and hour.
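
You can also enable logging from code. The sketch below uses the global waf client, which applies to web ACLs associated with CloudFront; for a web ACL on an Application Load Balancer you would use the waf-regional client instead. The ARNs are placeholders.

import boto3

waf = boto3.client('waf')  # use 'waf-regional' for web ACLs attached to an Application Load Balancer

waf.put_logging_configuration(
    LoggingConfiguration={
        'ResourceArn': 'arn:aws:waf::111111111111:webacl/EXAMPLE-WEB-ACL-ID',  # placeholder web ACL ARN
        'LogDestinationConfigs': [
            # placeholder: the ARN of the aws-waf-logs- delivery stream created in step one
            'arn:aws:firehose:us-east-1:111111111111:deliverystream/aws-waf-logs-us-east-1'
        ]
    }
)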

Step three: Set up an AWS Glue crawler job and Amazon Athena table

The purpose of a crawler within your Data Catalog is to traverse your data stores (such as S3) and extract the metadata fields of the files. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. When the crawler runs, the first classifier in your list to successfully recognize your data store is used to create a schema for your table. AWS Glue provides built-in classifiers to infer schemas from common files with formats that include JSON, CSV, and Apache Avro.
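
If you prefer to script this step, the console wizard below has a boto3 equivalent. The following is a sketch; the crawler name, IAM role, database, table prefix, and S3 path are placeholders for the values you would otherwise enter in the wizard.

import boto3

glue = boto3.client('glue')

glue.create_crawler(
    Name='waf-logs-crawler',                 # placeholder crawler name
    Role='AWSGlueServiceRole-waf-logs',      # placeholder IAM role for AWS Glue
    DatabaseName='waf_logs',                 # database in which the table is created
    TablePrefix='waf_',                      # prefix to identify your table easily
    Targets={'S3Targets': [{'Path': 's3://your-waf-logs-bucket/waf-logs/'}]}  # placeholder path
)

# Run the crawler once on demand; you can attach a schedule later if needed.
glue.start_crawler(Name='waf-logs-crawler')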

  1. In the AWS Management Console, open the AWS Glue service and choose Crawlers to set up a crawler job.
  2. Choose Add crawler to launch a wizard that sets up the crawler job. For Crawler name, enter a relevant name. Then select Next.

    Figure 11: Enter "Crawler name," and select "Next"

  3. For Choose a data store, select S3 and include the path of the S3 bucket that stores your AWS WAF logs, which you made note of in step 1.3. Then choose Next.

    Figure 12: Choose a data store

  4. When you’re given the option to add another data store, choose No.
  5. Then, choose Create an IAM role and enter a name. The role grants access to the S3 bucket for the AWS Glue service to access the log files.

    Figure 13: Choose "Create an IAM role," and enter a name

  6. Next, set the frequency to Run on demand. You can also schedule the crawler to run periodically to make sure any changes in the file structure are reflected in your data catalog.

    Figure 14: Set the "Frequency" to "Run on demand"

  7. For output, choose the database in which the Athena table is to be created and add a prefix to identify your table name easily. Select Next.

    Figure 15: Choose the database, and enter a prefix

  8. Review all the options you’ve selected for the crawler job and complete the wizard by selecting the Finish button.
  9. Now that the crawler job parameters are set up, on the left panel of the console, choose Crawlers to select your job and then choose Run crawler. The job creates an Amazon Athena table. The duration depends on the size of the log files.

    Figure 16: Choose "Run crawler" to create an Amazon Athena table

  10. To see the Amazon Athena table created by the AWS Glue crawler job, from the AWS Management Console, open the Amazon Athena service. You can filter by your table name prefix.
      1. To view the data, choose Preview table. This displays the table data with certain fields showing data in JSON object structure.
    Figure 17: Choose "Preview table" to view the data

Step four: Create visualizations using Amazon QuickSight

  1. From the AWS Management Console, open Amazon QuickSight.
  2. In the Amazon QuickSight window, in the top left, choose New Analysis. Choose New Data set, and for the data source choose Athena. Enter an appropriate name for the data source and choose Create data source.

    Figure 18: Enter the "Data source name," and choose "Create data source"

  3. Next, choose Use custom SQL to extract all the fields in the JSON object using the following SQL query:
    
        ```
        with d as (select
        waf.timestamp,
            waf.formatversion,
            waf.webaclid,
            waf.terminatingruleid,
            waf.terminatingruletype,
            waf.action,
            waf.httpsourcename,
            waf.httpsourceid,
            waf.HTTPREQUEST.clientip as clientip,
            waf.HTTPREQUEST.country as country,
            waf.HTTPREQUEST.httpMethod as httpMethod,
            map_agg(f.name,f.value) as kv
        from sampledb.jsonwaflogs_useast1 waf,
        UNNEST(waf.httprequest.headers) as t(f)
        group by 1,2,3,4,5,6,7,8,9,10,11)
        select d.timestamp,
            d.formatversion,
            d.webaclid,
            d.terminatingruleid,
            d.terminatingruletype,
            d.action,
            d.httpsourcename,
            d.httpsourceid,
            d.clientip,
            d.country,
            d.httpMethod,
            d.kv['Host'] as host,
            d.kv['User-Agent'] as UA,
            d.kv['Accept'] as Acc,
            d.kv['Accept-Language'] as AccL,
            d.kv['Accept-Encoding'] as AccE,
            d.kv['Upgrade-Insecure-Requests'] as UIR,
            d.kv['Cookie'] as Cookie,
            d.kv['X-IMForwards'] as XIMF,
            d.kv['Referer'] as Referer
        from d;
        ```        
        

  4. To extract individual fields, copy the previous SQL query and paste it in the New custom SQL box, then choose Edit/Preview data.
    Figure 19: Paste the SQL query in "New custom SQL query"

    1. In the Edit/Preview data view, for Data source, choose SPICE, then choose Finish.

      Figure 20: Choose "Spice" and then "Finish"

  5. Back in the Amazon QuickSight console, under the Fields section, open the drop-down menu for the timestamp field and change its data type to Date.

    Figure 21: In the Amazon Quicksight console, change the data type to "Date"

  6. After you see the Date column appear, enter an appropriate name for the visualizations at the top of the page, then choose Save.

    Figure 22: Enter the name for the visualizations, and choose "Save"

  7. You can now create various visualization dashboards with multiple visual types by using the drag-and-drop feature. You can drag and drop combinations of fields such as Action, Client IP, Country, Httpmethod, and User Agents. You can also add filters on Date to view dashboards for a specific timeline. Here are some sample screenshots:
    Figure 23a: Visualization dashboard samples

    Figure 23b: Visualization dashboard samples

    Figure 23c: Visualization dashboard samples

    Figure 23d: Visualization dashboard samples

Conclusion

You can enable AWS WAF logs to Amazon S3 buckets and analyze the logs while they are being streamed by configuring Amazon Kinesis Data Firehose. You can further enhance this solution by automating the streaming of data and using AWS Lambda for any data transformations based on your specific requirements. Using Amazon Athena and Amazon QuickSight makes it easy to analyze logs and build visualizations and dashboards for executive leadership teams. Using these solutions, you can go serverless and let AWS do the heavy lifting for you.


Umesh Kumar Ramesh

Umesh is a Cloud Infrastructure Architect with Amazon Web Services. He delivers proof-of-concept projects, topical workshops, and lead implementation projects to various AWS customers. He holds a Bachelor’s degree in Computer Science & Engineering from National Institute of Technology, Jamshedpur (India). Outside of work, Umesh enjoys watching documentaries, biking, and practicing meditation.


Muralidhar Ramarao

Muralidhar is a Data Engineer with the Amazon Payment Products Machine Learning Team. He has a Bachelor’s degree in Industrial and Production Engineering from the National Institute of Engineering, Mysore, India. Outside of work, he loves to hike. You will find him with his camera or snapping pictures with his phone, and always looking for his next travel destination.

This Is My Architecture: Mobile Cryptocurrency Mining

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/this-is-my-architecture-mobile-cryptocurrency-mining/

In North America, approximately 95% of adults over the age of 25 have a bank account. In the developing world, that number is only about 52%. Cryptocurrencies can provide a platform for millions of unbanked people in the world to achieve financial freedom on a more level financial playing field.

Electroneum, a cryptocurrency company located in England, built its cryptocurrency mobile back end on AWS and is using the power of blockchain to unlock the global digital economy for millions of people in the developing world.

Electroneum’s cryptocurrency mobile app allows Electroneum customers in developing countries to transfer ETNs (exchange-traded notes) and pay for goods using their smartphones. Listen in to the discussion between AWS Solutions Architect Toby Knight and Electroneum CTO Barry Last as they explain how the company built its solution. Electroneum’s app is a web application that uses a feedback loop between its web servers and AWS WAF (a web application firewall) to automatically block malicious actors. The system then uses Athena, with a gamified approach, to provide an additional layer of blocking to prevent DDoS attacks. Finally, Electroneum built a serverless, instant payments system using AWS API Gateway, AWS Lambda, and Amazon DynamoDB to help its customers avoid the usual delays in confirming cryptocurrency transactions.

 

Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight

Post Syndicated from Joe Flasher original https://aws.amazon.com/blogs/big-data/visualize-over-200-years-of-global-climate-data-using-amazon-athena-and-amazon-quicksight/

Climate change continues to have a profound effect on our quality of life, and as a result, research into sustainability is growing. Researchers in both the public and private sectors are planning for the future by studying recorded climate history and using climate forecast models.

To help explain these concepts, this post introduces the Global Historical Climatology Network Daily (GHCN-D). This registry is used by the global climate change research community.

This post also provides a step-by-step demonstration of how Amazon Web Services (AWS) services improve access to this data for climate change research. Data scientists and engineers previously had to access hundreds of nodes on high-performance computers to query this data. Now they can get the same data by using a few steps on AWS.

Background

Global climate analysis is essential for researchers to assess the implications of climate change on the Earth’s natural capital and ecosystem resources. This activity requires high-quality climate datasets, which can be challenging to work with because of their scale and complexity. To have confidence in their findings, researchers must be confident about the provenance of the climate datasets that they work with. For example, researchers may be trying to answer questions like: has the climate of a particular food producing area changed in a way that impacts food security? They must be able to easily query authoritative and curated datasets.

The National Centers for Environmental Information (NCEI) in the U.S. maintains a dataset of climate data that is based on observations from weather stations around the globe. It's the Global Historical Climatology Network Daily (GHCN-D), a central repository for daily weather summaries from ground-based stations. It comprises millions of quality-assured observations that are updated daily.

The most common parameters recorded are daily temperatures, rainfall, and snowfall. These are useful parameters for assessing risks for drought, flooding, and extreme weather.

The challenge

The NCEI makes the GHCN_D data available in CSV format through an FTP server, organized by year. Organizing the data by year means that a complete copy of the archive requires over 255 files (the first year in the archive is 1763). Traditionally, if a researcher wants to work on this dataset they must download it and work on it locally. For a researcher to be sure of using the latest data for their analysis, they must repeat this download every day.

For researchers, deriving insight from this data can be a challenge: fully engaging with it requires technical skill, computing resources, and subject matter expertise.

A new efficient approach

Through AWS's collaboration with the NOAA Big Data Project, a daily snapshot of the GHCN_D dataset is now available on AWS. The data is publicly accessible through an Amazon S3 bucket. For more information, see the Registry of Open Data on AWS.

Having the data available in this way offers several advantages:

  • The data is globally available to a community of users. Users no longer must download data to work on it. Everyone can work with the same, authoritative copy.
  • Time to insight is reduced. By taking advantage of AWS services, researchers can immediately start to perform analysis.
  • The cost of research is reduced. Researchers can switch off resources as soon as their analysis is finished.

This blog post illustrates a workflow using Amazon S3, Amazon Athena, AWS Glue, and Amazon QuickSight that demonstrates how quickly one can derive insights from this dataset.

The workflow presented in this post follows these general steps:

  • Extract data files from the NOAA bucket and make the data available as tables.
  • Use SQL to query the data contained in the tables.
  • Show how to speed up analysis by creating tables from queries and storing those tables in a private Amazon S3 bucket.
  • Visualize the data to gain insight.

Overview of the GHCN_D dataset

The GHCN-D is a quality-assured dataset that contains daily weather summaries from weather stations across global land areas. It has the following properties:

  • Data is integrated from approximately 30 sources that provide weather observations from various national and international networks.
  • A comprehensive dataset for the US and good coverage for many parts of the world.
  • There are many types of daily weather observations in this dataset, but the majority are maximum temperature, minimum temperature, precipitation, snow fall, and snow depth. These observations include:
    • Over 35,000 temperature stations.
    • Over 100,000 precipitation stations.
    • Over 50,000 snowfall or snow depth stations.
  • The source of each datum, the term used for a single record, is contained in the dataset. Each datum has a quality control flag associated with it.
  • The dataset is updated daily. The historic sources are reprocessed weekly.

You can see in the graphic below how the data volume has grown in recent decades.

Figure 1: 1763 to 2018. For 1763 there are fewer than a thousand observations. For 2017 there are over 34 million observations.

Organization of the data on Amazon S3

As previously mentioned, the GHCN-D dataset is accessible through an Amazon S3 bucket. The details of the dataset are on the Registry of Open Data on AWS (RODA). The landing page for the dataset on RODA contains a link to a comprehensive readme file for the dataset. This readme contains all of the lookup tables and variable definitions.

This section shows the pertinent information required to start working with the dataset.

The data is in a text, or comma-separated values (CSV), format and is contained in the Amazon S3 bucket called noaa-ghcn-pds.

The noaa-ghcn-pds bucket contains virtual folders, and is structured like this:

  • noaa-ghcn-pds. This is the root of the bucket with two subdirectories and a number of useful files. For the purposes of this exercise, we use only the ghcnd-stations.txt file. This file contains information about the observation stations that produced the data for the GHCN_D dataset. You must download the ghcnd-stations.txt file.
  • noaa-ghcn-pds/csv/. This virtual folder contains all of the observations from 1763 to the present organized in .csv files, one file for every year. For this exercise, we’ll collate this data into a single table.

Also for the purpose of this exercise, the data from ‘ghcnd-stations.txt’ and the data contained in noaa-ghcn-pds/csv/ are extracted and added to two separate tables. These tables are the basis of the analysis.

The tables are labeled as:

  • tblallyears. This table contains all the records stored in the yearly .csv files from 1763 to present.
  • tblghcnd_stations. This table contains information for over 106,430 weather stations.

Point of interest: the .csv file from the year 1763 contains the data for one weather station. That station was located in the center of Milan, Italy.

The tools

To implement the general workflow in this exercise, we’re using the following tools:

  • Amazon Simple Storage Service (Amazon S3) to stage the data for analysis. The GHCN_D dataset is stored in a bucket on Amazon S3. We also use a private bucket to store new tables created from queries.
  • Amazon Athena to query data stored on Amazon S3 using standard SQL.
  • AWS Glue to extract and load data into Athena from the Amazon S3 buckets in which it is stored. AWS Glue is a fully managed extract, transform, and load (ETL) service.
  • AWS Glue Data Catalog to catalog the data that we query with Athena.
  • Amazon QuickSight to build visualizations, perform ad hoc analyses, and get insights from the dataset. Queries and tables from Athena can be read directly from Amazon QuickSight. Amazon QuickSight can also run queries against tables in Athena.

To implement the processes outlined in this post, you need an AWS Account. For more information about creating an AWS account, see Getting Started with AWS. You also must create a private Amazon S3 bucket located in the N. Virginia AWS Region. For more information, see Create a Bucket.

When you create the bucket, it must contain the following empty directories (one way to create them from code is sketched after this list):

  1. [your_bucket_name]/stations_raw/
  2. [your_bucket_name]/ghcnblog/
  3. [your_bucket_name]/ghcnblog/stations/
  4. [your_bucket_name]/ghcnblog/allyears/
  5. [your_bucket_name]/ghcnblog/1836usa/
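
Here is a sketch of creating the bucket and these placeholder directories with boto3; replace your-bucket-name with your own, globally unique bucket name. Because the bucket must be in the N. Virginia (us-east-1) Region, no location constraint is passed to create_bucket.

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

bucket = 'your-bucket-name'  # placeholder: use your own, globally unique name

# In us-east-1, create_bucket takes no CreateBucketConfiguration.
s3.create_bucket(Bucket=bucket)

# Zero-byte objects whose keys end in '/' appear as folders in the S3 console.
for prefix in ['stations_raw/', 'ghcnblog/', 'ghcnblog/stations/',
               'ghcnblog/allyears/', 'ghcnblog/1836usa/']:
    s3.put_object(Bucket=bucket, Key=prefix)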

The following is an overview of how the various AWS services interact in this workflow.

Note

The AWS services are in the same AWS Region. One of the Amazon S3 buckets is the existing one that stores the GHCN_D data. The other Amazon S3 bucket is the bucket that you use for storing tables.

Figure 2: How the AWS services work together to compose this workflow.

The workflow

Now that we have the tools and the data, we are ready to:

  1. Extract the yearly .csv files and add them to a table in Amazon Athena.
  2. Extract the stations text file and add it to a separate table in Amazon Athena.

Extract the yearly .csv files and add it to a table in Amazon Athena

The complete set of daily weather observations is organized by year in one folder of the Amazon S3 bucket in .csv format. The path to the data is s3://noaa-ghcn-pds/csv/.

Each file is named by year beginning with 1763.csv and progressing one year at a time up to the present.

From the AWS console, click on AWS Athena. This takes you to the main dashboard for Athena. From here, click on AWS Glue Data Catalog. This brings you to AWS Glue.

In AWS Glue, choose the Tables section on the left side. Then, in the Add table drop-down menu, choose Add table manually. A series of forms displays for you to add the following information:

  • Set up your table’s properties:
    • Give the new table a name, for example, tblallyears
    • Create a database and name it ghcnblog.

The database then appears in the Athena dashboard.

  • Add a data store:
    • Choose the Specified path in another account option, and enter the following path in the text box: s3://noaa-ghcn-pds/csv/
  • Choose a data format:
    • Select CSV, then select Comma as the delimiter.
  • Define a schema:
    • Add the following columns as string variables:
      • id
      • year_date
      • element
      • data_value
      • m_flag
      • q_flag
      • s_flag
      • obs_time

For a full description of the variables and data structures, see the readme file.

  • Choose OK, then Finish.

Now return to the Athena dashboard, and choose the database that you created. The table will appear in the list of tables on the left. You can now preview the data by choosing the ‘Preview table’ option to the right of the table.

Use CTAS to speed up queries

As a final step, create a table using the SQL statement called CREATE TABLE AS SELECT (CTAS). Store the table in a private Amazon S3 bucket.

This step dramatically speeds up queries. The reason is because in this process we extract the data once and store the extracted data in a columnar format (Parquet) in the private Amazon S3 bucket.

To illustrate the improvement in speed, here are two examples:

  • A query that counts all of the distinct IDs, meaning unique weather stations, takes around 55 seconds and scans around 88 GB of data.
  • The same query on the converted data takes around 13 seconds and scans about 5 GB of data.

To create the table for this final step:

  1. Open the Athena console.
  2. In the dashboard, select New query, then enter the query as shown in the following example. Make sure to enter the information that’s applicable to your particular situation, such as your bucket name.
    Figure 3: Query to create a table, converting the data into Parquet and storing it in your S3 bucket.
  3. Make sure that the data format is Parquet.
  4. Name your table tblallyears_qa.
  5. Add this path to this folder in the private Amazon S3 bucket: [your_bucket_name]/ghcnblog/allyearsqa/. Replace your_bucket_name with your specific bucket name.

The new table appears in your database, listed on the left side of the Athena dashboard. This is the table that we work with going forward.
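
Because the CTAS statement itself appears only as a screenshot above, here is a sketch of what it looks like, submitted through boto3. The database, table, and bucket paths follow the conventions used in this post; adjust them to match your own setup.

import boto3

athena = boto3.client('athena')

# CTAS: read the CSV-backed table once and store the result as Parquet
# in the private bucket, in the folder named in the steps above.
ctas = """
CREATE TABLE ghcnblog.tblallyears_qa
WITH (
    format = 'PARQUET',
    external_location = 's3://your_bucket_name/ghcnblog/allyearsqa/'
) AS
SELECT * FROM ghcnblog.tblallyears
"""

athena.start_query_execution(
    QueryString=ctas,
    QueryExecutionContext={'Database': 'ghcnblog'},
    ResultConfiguration={'OutputLocation': 's3://your_bucket_name/athena-results/'}  # placeholder
)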

Extract the stations text file and add it to a separate table in Amazon Athena

The stations text file contains information about the weather stations, such as location, nationality, and ID. This data is kept in a separate file from the yearly observations. We need to import this data to look at the geographical spread of weather observations. While dealing with this file is a bit more complicated, the steps to importing this data into Athena are similar to what we have already done.

To import this data into Athena:

  1. Download the ghcnd-stations text file.
  2. Open the file in a spreadsheet program and use the fixed-width data import function. The fixed widths of the columns are described in the readme file in the section called FORMAT OF "ghcnd-stations.txt" file.
  3. After you successfully import the data, save the spreadsheet as a .csv text file.
  4. Copy the new .csv file to [your_bucket_name]/stations_raw/. Replace your_bucket_name with your specific bucket name.
  5. Using this new .csv file, follow the Add table process steps in AWS Glue, as described earlier in this post.
    • Use the following field names:
      • id
      • latitude
      • longitude
      • elevation
      • state
      • name
      • gsn_flag
      • hcn_flag
      • wmo_id
    • Name the table tblghcnd_stations.
  6. After the table is created, follow the CREATE TABLE AS SELECT (CTAS) steps for this table as described earlier in this post.
    • Name the new table tblghcnd_stations_qa.
    • Store the new table in [your_bucket_name]/ghcnblog/stations/. Replace your_bucket_name with your specific bucket name.

The two most important datasets of GHCN_D are now in Athena.

In the next section, we run queries against these tables and visualize the results using Amazon QuickSight.

Exploratory data analysis and visualization

With our two tables created, we are now ready to query and visualize to gain insight.

Exploratory data analysis

In the Athena query window, run the following queries to get an idea of the size of the dataset.

Query #1: the total number of observations since 1763:

Figure 4: Query for total number of observations. This was run in autumn 2018; the dataset is continually growing over time.
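The query in the figure isn't reproduced in the text. A minimal version, assuming the tblallyears_qa table created earlier, could be:

SELECT count(*) AS number_of_observations
FROM ghcnblog.tblallyears_qa;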

Query #2: the number of stations since 1763:

Figure 5: Query for total number of stations that have made observations since 1763. Deactivated stations are included.
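Similarly, a sketch of the station count, again assuming the tblallyears_qa table, could be:

SELECT count(DISTINCT id) AS number_of_stations
FROM ghcnblog.tblallyears_qa;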

Average weather parameters for the Earth

The following figure shows a query that calculates the average maximum temperature (Celsius), average minimum temperature (Celsius), and average rainfall (mm) for the Earth since 1763.

In the query, we must convert the data_value from a String variable to a Real variable. We also must divide by 10, because the temperature and precipitation measurements are in tenths of their respective units. For more information about these details and the element codes (TMIN, TMAX, and PRCP), see the readme file.

Figure 6. Querying for global averages to get to Earth’s pulse.
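The query in Figure 6 is shown only as an image. One way to compute these averages, assuming the tblallyears_qa table and the TMAX, TMIN, and PRCP element codes, is the following sketch:

SELECT
  avg(CASE WHEN element = 'TMAX' THEN CAST(data_value AS real) / 10 END) AS avg_max_temperature_c,
  avg(CASE WHEN element = 'TMIN' THEN CAST(data_value AS real) / 10 END) AS avg_min_temperature_c,
  avg(CASE WHEN element = 'PRCP' THEN CAST(data_value AS real) / 10 END) AS avg_rainfall_mm
FROM ghcnblog.tblallyears_qa;

The avg function ignores the NULL values produced by the CASE expressions, so each average is computed only over rows of the matching element.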

It would be convenient if we could just run simple queries, such as this one, on this dataset and accept that the results are correct.

The previous query assumes an even and equal spread of weather stations around the world since 1763. In fact, the number and spread of weather stations have varied over time.

Visualizing the growth in numbers of weather stations over time

The following procedure uses Amazon QuickSight to visualize the number of weather stations for each year since 1763.

Note: You must be signed up for Amazon QuickSight to complete these steps. During the sign-up process, you are prompted to manage your Amazon QuickSight data source permissions. At this time, use step 3 in the following procedure to grant access to the Amazon S3 buckets and to Athena.

The steps are as follows:

  1. Open the Amazon QuickSight console.
  2. On the far right of the dashboard, choose Manage QuickSight.
  3. Choose Account settings, then Manage QuickSight permissions. Give Amazon QuickSight permission to access Athena, and to read the Amazon S3 bucket that contains the new tables.
  4. Return to Amazon QuickSight by choosing the logo on the top left side of the screen.
  5. From the Amazon QuickSight dashboard, choose New analysis, then New data set.
  6. From the Create a Data Set tiles, choose Athena. Name the data source, for example ghcnblog, then choose Create data source.
  7. Choose the option to add a custom SQL, then add the SQL, as shown in the following example:

Figure 7: Location to add a custom SQL query.
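The custom SQL appears only in Figure 7. A sketch of such a query, assuming the tblallyears_qa table and that year_date holds the observation date as a YYYYMMDD string, could be:

SELECT substr(year_date, 1, 4) AS year,
       count(DISTINCT id) AS number_of_stations
FROM ghcnblog.tblallyears_qa
GROUP BY substr(year_date, 1, 4)
ORDER BY year;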

  1. Choose Confirm query.
  2. Choose Directly query your data.
  3. Choose Visualize.
  4. To make the graph, choose the line chart graph. Add year to the X-axis and number_of_stations to the Value field wells. The options appear to the left of the visualization dashboard.

Figure 8. The number of global weather stations used by GHCN_D over time.

The resulting graph shows that the number and spread of stations around the world has varied over time.

A look at the development of observations in the US

1836 is the year of the first US observation station in the dataset. To get insight into the development of observations in the US, we extracted a subset of US data from the main data source (tblallyears_qa). This dataset features annual data every 30th year from 1836 to 2016.

This query generates a large dataset. To improve performance, save the query as a table stored in an Amazon S3 bucket using the previously described procedure.

The query to do this in one step is shown in the following figure.

Figure 9: The SQL to create a table from a query and store it in Parquet format in a user-specified Amazon S3 bucket.
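The one-step statement appears only in Figure 9. As a rough sketch, assuming the tables created earlier and that US station IDs start with "US", the query could look like the following (the actual columns and join in the figure may differ):

CREATE TABLE ghcnblog.tbl1836every30thyear
WITH (
  format = 'PARQUET',
  external_location = 's3://[your_bucket_name]/ghcnblog/1836every30thyear/'
) AS
SELECT obs.id,
       substr(obs.year_date, 1, 4) AS year,
       obs.element,
       obs.data_value,
       st.state
FROM ghcnblog.tblallyears_qa obs
JOIN ghcnblog.tblghcnd_stations_qa st ON obs.id = st.id
WHERE st.id LIKE 'US%'
  AND CAST(substr(obs.year_date, 1, 4) AS integer)
      IN (1836, 1866, 1896, 1926, 1956, 1986, 2016);

As before, replace [your_bucket_name] with your specific bucket name.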

The table appears in the Amazon Athena dashboard as tbl1836every30thyear and it forms the basis for our analysis.

In the Amazon QuickSight console, use the follow SQL to generate a new dataset.

Figure 10: The SQL to create a dataset for viewing USA data in Amazon QuickSight.
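The SQL lives only in Figure 10. A sketch of the dataset query, assuming the tbl1836every30thyear table sketched above, could be:

SELECT state,
       year,
       count(DISTINCT id) AS number_of_stations
FROM ghcnblog.tbl1836every30thyear
GROUP BY state, year
ORDER BY year, state;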

  1. Choose Confirm query.
  2. Choose Directly query your data.
  3. Choose Visualize.

This brings you back to the visualization dashboard. From this dashboard, choose the Points on a map visualization, and set up the fields as follows:

  • Geospatial: state
  • Size: number_of_stations, aggregate by count.
  • Color: year

The results should be the following map of the US showing the growth of weather stations used by GHCN_D from 1836 to 2016 at 30-year increments. In 1836, there was one station. By 2016, there were thousands.

Figure 11: The growth of the number of observations stations in the US.

Interestingly, some states had more stations in 1956 than they did in 1986. This is also illustrated in the following figure. The data for the figure was derived from the previous dataset.

Figure 12: This heat map illustrates the number of stations per state over time. This is a 30th year snapshot.

A look at more data

We now have a data lake of GHCN_D data. Using the tools that we have assembled, we are free to experiment with the data. It is now possible to construct queries and visualizations on this huge dataset to gain insights.

The following figure shows the heat map that we created. It shows the average minimum temperature in US states over time. As before, we are looking at 30-year snapshots; that is to say, every 30th year we take a yearly average.

Figure 13: This heat map illustrates the average minimum temperature for each state over time, as a yearly average every 30th year starting in 1836.
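The query behind this heat map isn't shown in the text. A hedged sketch, assuming the base tables created earlier and the same 30-year snapshots, could be:

SELECT st.state,
       substr(obs.year_date, 1, 4) AS year,
       round(avg(CAST(obs.data_value AS real)) / 10, 1) AS avg_min_temperature_c
FROM ghcnblog.tblallyears_qa obs
JOIN ghcnblog.tblghcnd_stations_qa st ON obs.id = st.id
WHERE obs.element = 'TMIN'
  AND st.id LIKE 'US%'
  AND CAST(substr(obs.year_date, 1, 4) AS integer)
      IN (1836, 1866, 1896, 1926, 1956, 1986, 2016)
GROUP BY st.state, substr(obs.year_date, 1, 4)
ORDER BY year, st.state;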

Summary

Our headlines are full of climate change and sustainability stories, and research and analysis have become more crucial than ever.

In this post, we showed researchers, analysts, and scientists how AWS services reduce the level of technical skill required to fully use the GHCN_D dataset.

GHCN_D is available on AWS; the details can be found on the Registry of Open Data on AWS. This data is available to researchers studying climate change and weather impacts.

This blog post demonstrated a typical workflow that a researcher could use to engage with and analyze this important data by using Amazon Athena, AWS Glue, and Amazon S3, and how they can visualize insights by using Amazon QuickSight.

By making this data available, Amazon has removed the heavy lifting that was traditionally required to work with the GHCN_D, thus expanding the opportunity for new research and new discoveries.

 


About the Authors

Joe Flasher is the Open Geospatial Data Lead at Amazon Web Services, helping organizations most effectively make data available for analysis in the cloud. Joe has been working with geospatial data and open source projects for the past decade, both as a contributor and maintainer. He has been a member of the Landsat Advisory Group, and has worked on projects, ranging from building GIS software to making the space shuttle fly. Joe’s background is in astrophysics, but he kindly requests you don’t ask him any questions about constellations.

 

 

Conor Delaney, PhD. is an environmental data scientist.

Create real-time clickstream sessions and run analytics with Amazon Kinesis Data Analytics, AWS Glue, and Amazon Athena

Post Syndicated from Hugo Rozestraten original https://aws.amazon.com/blogs/big-data/create-real-time-clickstream-sessions-and-run-analytics-with-amazon-kinesis-data-analytics-aws-glue-and-amazon-athena/

Clickstream events are small pieces of data that are generated continuously with high speed and volume. Often, clickstream events are generated by user actions, and it is useful to analyze them.

For example, you can detect user behavior in a website or application by analyzing the sequence of clicks a user makes, the amount of time the user spends, where they usually begin the navigation, and how it ends. By tracking this user behavior in real time, you can update recommendations, perform advanced A/B testing, push notifications based on session length, and much more. To track and analyze these events, you need to identify and create sessions from them. The process of identifying events in the data and creating sessions is known as sessionization.

Capturing and processing data clickstream events in real time can be difficult. As the number of users and web and mobile assets you have increases, so does the volume of data. Amazon Kinesis provides you with the capabilities necessary to ingest this data in real time and generate useful statistics immediately so that you can take action.

When you run sessionization on clickstream data, you identify events and assign them to a session with a specified key and lag period. After each event has a key, you can perform analytics on them. The use cases for sessionization vary widely, and have different requirements. For example, you might need to identify and create sessions from events in web analytics to track user actions. Sessionization is also broadly used across many different areas, such as log data and IoT.

This blog post demonstrates how to identify and create sessions from real-time clickstream events and then analyze them using Amazon Kinesis Data Analytics.

Why did we choose Kinesis Data Analytics?

Clickstream data arrives continuously, often as thousands of new event messages per second. When you analyze the effectiveness of new application features, site layout, or marketing campaigns, it is important to analyze them in real time so that you can take action faster.

To perform the sessionization in batch jobs, you could use a tool such as AWS Glue or Amazon EMR. But with daily schedules, queries, and aggregations, it can take more resources and time because each aggregation works over large amounts of data. Performing sessionization in Kinesis Data Analytics takes less time and gives you lower latency between session generation and action. You can trigger real-time alerts with AWS Lambda functions based on conditions, such as a session time that is shorter than 20 seconds, or invoke a machine learning endpoint.

Identifying a session among thousands of clicks

A session is a short-lived and interactive exchange between two or more devices and/or users. For example, it can be a user browsing and then exiting your website, or an IoT device waking up to perform a job and then going back to sleep. These interactions result in a series of events that occur in sequence, with a start and an end: a session. A start and an end of a session can be difficult to determine, and are often defined by a time period without a relevant event associated with a user or device. A session starts when a new event arrives after a specified “lag” time period has passed without an event arriving. A session ends in a similar manner, when a new event does not arrive within the specified lag period.

This blog post relies on several other posts about performing batch analytics on SQL data with sessions. My two favorite posts on this subject are Sessionization in SQL, Hive, Pig and Python from Dataiku and Finding User Session with SQL by Benn Stancil at Mode. Both posts take advantage of SQL window functions to identify and build sessions from clickstream events.

ANSI added SQL window functions to the SQL standard in 2003 and has since expanded them. Window functions work naturally with streaming data and enable you to easily translate batch SQL examples to Kinesis Data Analytics.

In this use case, I group the events of a specific user as described in the following simplified example. In this example, I use distinct navigation patterns from three users to analyze user behavior. To begin, I group events by user ID to obtain some statistics from data, as shown following:

In this example, for “User ID 20,” the minimum timestamp is 2018-11-29 23:35:10 and the maximum timestamp is 2018-11-29 23:35:44. This provides a 34-second session, starting with action “B_10” and ending with action “A_02.” In this example, these “actions” identify the buttons of the application.
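The per-user statistics described above can be reproduced with a simple aggregation. The following is a rough sketch against a hypothetical table of raw click events (the table name clickstream_events is illustrative, not part of the original setup):

SELECT user_id,
       min(client_timestamp) AS first_event,
       max(client_timestamp) AS last_event,
       count(*) AS events
FROM clickstream_events -- hypothetical raw events table
GROUP BY user_id
ORDER BY user_id;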

Suppose that after several minutes, new “User ID 20” actions arrive. Would you consider them part of the same session? A user can abandon a navigation or start a new one. Also, applications often have timeouts. You have to decide what the maximum session length is before you consider it a new session. A session can run anywhere from 20 to 50 seconds, or from 1 to 5 minutes.

There are other elements that you might want to consider, such as a client IP or a machine ID. These elements allow you to separate sessions that occur on different devices.

High-level solution overview

The end-to-end scenario described in this post uses Amazon Kinesis Data Streams to capture the clickstream data and Kinesis Data Analytics to build and analyze the sessions. The aggregated analytics are used to trigger real-time events on Lambda and then send them to Kinesis Data Firehose. Kinesis Data Firehose sends data to an Amazon S3 bucket, where it is ingested to a table by an AWS Glue crawler and made available for running queries with Amazon Athena. You can use this table for ad hoc analysis.

The following diagram shows an end-to-end sessionization solution.

  • Data ingestion: You can use Kinesis Data Streams to build custom applications that process or analyze streaming data for specialized needs. Kinesis Data Streams can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, IT logs, and location-tracking events.
  • Data sessionization: Kinesis Data Analytics is the easiest way to process streaming data in real time with standard SQL without having to learn new programming languages or processing frameworks. With Kinesis Data Analytics, you can query streaming data or build entire streaming applications using SQL, so that you can gain actionable insights and respond to your business and customer needs promptly.
  • Data processing and storage: The sessionization stream is read from Kinesis Data Analytics using an AWS Lambda function. The function triggers two events: one real-time dashboard in Amazon CloudWatch and a second one to persist data with Kinesis Data Firehose.
  • Data analysis: AWS Glue is used to crawl Amazon S3 and build or update metadata definition for Amazon Athena tables.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena provides connectivity to any application using JDBC or ODBC drivers.

  • Data visualization: Amazon QuickSight is a visualization tool that is natively used to build dashboards over Amazon Athena data.
  • Monitoring: Amazon CloudWatch is a tool that lets you monitor the streaming activities, such as the number of bytes processed or delivered per second, or the number of failures.

After you finish the sessionization stage in Kinesis Data Analytics, you can output data into different tools. For example, you can use a Lambda function to process the data on the fly and take actions such as send SMS alerts or roll back a deployment. To learn how to implement such workflows based on AWS Lambda output, see my other blog post Implement Log Analytics using Amazon Kinesis Data Analytics. In this post, we send data to Amazon CloudWatch, and build a real-time dashboard.

Lambda clickstream generator

To generate the workload, you can use a Python Lambda function with random values, simulating a beer-selling application.

The same user ID can have sessions on different devices, such as a tablet, a browser, or a phone application. This information is captured by the device ID. As a result, the data for the Lambda function payload has these parameters: a user ID, a device ID, a client event, and a client timestamp, as shown in the following example.

The following is the code for the Lambda function payload generator, which is scheduled using CloudWatch Events scheduled events:

...
def getReferrer():
    x = random.randint(1,5)
    x = x*50 
    y = x+30 
    data = {}
    data['user_id'] = random.randint(x,y)
    data['device_id'] = random.choice(['mobile','computer', 'tablet', 'mobile','computer'])
    data['client_event'] = random.choice(['beer_vitrine_nav','beer_checkout','beer_product_detail',
    'beer_products','beer_selection','beer_cart'])
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['client_timestamp'] = str_now
    return data

def lambda_handler(event, context):
...
        data = json.dumps(getReferrer())
        kinesis.put_record(
                StreamName='sessionsclicks',
                Data=data,
                PartitionKey='partitionkey')

As a result, the following payloads are sent to Kinesis Data Analytics:

Using window SQL functions in Kinesis Data Analytics

Grouping sessions lets us combine all the events from a given user ID or a device ID that occurred during a specific time period. Amazon Kinesis Data Analytics SQL queries in your application code execute continuously over in-application streams. You need to specify bounded queries using a window defined in terms of time or rows. These queries are called window SQL functions.

I had three available options for windowed query functions in Kinesis Data Analytics: sliding windows, tumbling windows, and stagger windows. I chose stagger window because it has some good features for the sessionization use case, as follows:

  • Stagger windows open when the first event that matches a partition key condition arrives. So for each key, its particular window is evaluated, as opposed to the other window functions, which evaluate one unique window for all the matched partition keys.
  • When dealing with clickstreams, you cannot rely on the order in which events arrive in the stream, but rather on when they were generated. Stagger windows handle the arrival of out-of-order events well. The time when the window closes is based on the specified age, which is measured from the time when the window opened.

To partition by the timestamp, I chose to write two distinct SQL functions.

In Kinesis Data Analytics, SOURCE_SQL_STREAM_001 is by default the main stream from the source. In this case, it’s receiving the source payload from Kinesis Data Streams.

Kinesis Data Analytics SQL – Create a stream

The following function creates a stream to receive the query aggregation result:

-- CREATE a Stream to receive the query aggregation result
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
  session_id VARCHAR(60),
  user_id INTEGER,
  device_id VARCHAR(10),
  timeagg timestamp,
  events INTEGER,
  beginnavigation VARCHAR(32),
  endnavigation VARCHAR(32),
  beginsession VARCHAR(25),
  endsession VARCHAR(25),
  duration_sec INTEGER
);

Kinesis Data Analytics SQL – Using a SECOND interval “STEP” function

The following function creates the PUMP and inserts it as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_SEC" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
    SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
    UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
    ||cast( UNIX_TIMESTAMP(STEP("client_timestamp" by interval '30' second))/1000 as VARCHAR(20))) as session_id,
    "user_id" , "device_id",
-- create a common rounded STEP timestamp for this session
    STEP("client_timestamp" by interval '30' second),
-- Count the number of client events , clicks on this session
    COUNT("client_event") events,
-- What was the first navigation action
    first_value("client_event") as beginnavigation,
-- what was the last navigation action    
    last_value("client_event") as endnavigation,
-- begining minute and second  
    SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second      
    SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration    
    TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream    
    FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with STEP to Seconds, for Seconds intervals    
    WINDOWED BY STAGGER (
                PARTITION BY "user_id", "device_id", STEP("client_timestamp" by interval '30' second) 
                RANGE INTERVAL '30' SECOND );

Kinesis Data Analytics SQL – Using a MINUTE interval “FLOOR” function

The following code creates the PUMP and inserts as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_MIN" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
||cast(UNIX_TIMESTAMP(FLOOR("client_timestamp" TO MINUTE))/1000 as VARCHAR(20))) as session_id,
"user_id" , "device_id",
-- create a common rounded timestamp for this session
FLOOR("client_timestamp" TO MINUTE),
-- Count the number of client events , clicks on this session
COUNT("client_event") events,
-- What was the first navigation action
first_value("client_event") as beginnavigation,
-- what was the last navigation action
last_value("client_event") as endnavigation,
-- begining minute and second
SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second
SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration
TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream
FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with floor to Minute, for Minute intervals
WINDOWED BY STAGGER (
            PARTITION BY "user_id", "device_id", FLOOR("client_timestamp" TO MINUTE) 
            RANGE INTERVAL '1' MINUTE);

Sessions

In Kinesis Data Analytics, you can view the resulting data transformed by the SQL, with the session identification and information. Session_ID is calculated as User_ID + the first three characters of DEVICE_ID + the rounded Unix timestamp without the milliseconds.

Automated deployment with AWS CloudFormation

All the steps of this end-to-end solution are included in an AWS CloudFormation template. Fire up the template, add the code on your web server, and voilà, you get real-time sessionization.

This AWS CloudFormation template is intended to be deployed only in the us-east-1 Region.

Create the stack

Step 1: To get started, sign into the AWS Management Console, and then open the stagger window template.

Step 2: On the AWS CloudFormation console, choose Next, and complete the AWS CloudFormation parameters:

  • Stack name: The name of the stack (blog-sessionization or sessions-blog)
  • StreamName: sessionsblog
  • Stream Shard Count: 1 or 2 (each shard supports 1 MB/s of ingest).
  • Bucket Name: Change to a unique name, for example session-n-bucket-hhug123121.
  • Buffer Interval: 60–900 seconds buffering hint before Kinesis Data Firehose sends the data to Amazon S3.
  • Buffer Size: 1–128 MB per file, if the interval is not achieved first.
  • Destination Prefix: Aggregated (internal folder of the bucket to save aggregated data).
  • Base sessions on seconds or minutes: Choose which you want (minutes will start with 1 minute, seconds will start with 30 seconds).

Step 3: Check if the launch has completed, and if it has not, check for errors.

The most common error is when you point to an Amazon S3 bucket that already exists.

Process the data

Step 1: After the deployment, navigate to the solution on the Amazon Kinesis console.

Step 2: Go to the Kinesis Analytics applications page, and choose AnalyticsApp-blog-sessionizationXXXXX, as follows.

Step 3: Choose Run application to start the application.

Step 4: Wait a few seconds for the application to be available, and then choose Application details.

Step 5: On the Application details page, choose Go to SQL results.

Step 6: Examine the SQL code and SOURCE_SQL_STREAM, and change the INTERVAL if you’d like.

Step 7: Choose the Real-time analytics tab to check the DESTINATION_SQL_STREAM results.

 

Step 8: Check the Destination tab to view the AWS Lambda function as the destination to your aggregation.

Step 9: Check the CloudWatch real-time dashboard.

Open the Sessionization-<your cloudformation stack name> dashboard.

Check the number of “events” during the sessions, and the “session duration” behavior from a timeframe. Then you can make decisions, such as whether you need to roll back a new site layout or new features of your application.

Step 10: Open the AWS Glue console and run the crawler that the AWS CloudFormation template created for you.

Choose the crawler job, and then choose Run crawler.

Analyze the data

Step 1: After the job finishes, open the Amazon Athena console and explore the data.

On the Athena console, choose the sessionization database in the list. You should see two tables created based on the data in Amazon S3: rawdata and aggregated.

Step 2: Choose the vertical ellipsis (three dots) on the right side to explore each of the tables, as shown in the following screenshots.

Step 3: Create a view on the Athena console to query only today’s data from your aggregated table, as follows:

CREATE OR REPLACE VIEW clicks_today AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) and
cast(partition_2 as integer)=day(current_date) ;

The successful query appears on the console as follows:

Step 4: Create a view to query only the current month data from your aggregated table, as in the following example:

CREATE OR REPLACE VIEW clicks_month AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) ;

The successful query appears as follows:

Step 5: Query data with the sessions grouped by the session duration ordered by sessions, as follows:

SELECT duration_sec, count(1) sessions 
FROM "clicks_today"
where duration_sec>0
group by duration_sec
order by sessions desc;

The query results appear as follows:

Visualize the data

Step 1: Open the Amazon QuickSight console.

If you have never used Amazon QuickSight, perform this setup first.

Step 2: Set up Amazon QuickSight account settings to access Athena and your S3 bucket.

First, select the Amazon Athena check box. Select the Amazon S3 check box to edit Amazon QuickSight access to your S3 buckets.

Choose the buckets that you want to make available, and then choose Select buckets.

Step 3: Choose Manage data.

Step 4: Choose NEW DATASET.

In the list of data sources, choose Athena.

Step 5: Enter daily_session as your data source name.

Step 6: Choose the view that you created for daily sessions, and choose Select.

Step 7: Then you can choose to use either SPICE (cache) or direct query access.

Step 8: Choose beginnavigation and duration_sec as metrics.

Step 9: Choose +Add to add a new visualization.

Step 10: In Visual types, choose the Tree map graph type.

Step 11: For Group by, choose device_id; for Size, choose duration_sec (Sum); and for Color, choose events (Sum).

Summary

In this post, I described how to perform sessionization of clickstream events and analyze them in a serverless architecture. The use of a Kinesis Data Analytics stagger window makes the SQL code short and easy to write and understand. The integration between the services enables a complete data flow with minimal coding.

You also learned about ways to explore and visualize this data using Amazon Athena, AWS Glue, and Amazon QuickSight.

To learn more about the Amazon Kinesis family of use cases, check the Amazon Kinesis Big Data Blog page.

If you have questions or suggestions, please leave a comment below.

Do more with Amazon Kinesis Data Analytics

To explore other ways to gain insights using Kinesis Data Analytics, see Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

 


About the Author

Hugo is an analytics and database specialist solutions architect at Amazon Web Services out of São Paulo (Brazil). He is currently engaged with several Data Lake and Analytics projects for customers in Latin America. He loves family time, dogs and mountain biking.

 

 

 

 

Podcast 293: Diving into Data with Amazon Athena

Post Syndicated from Simon Elisha original https://aws.amazon.com/blogs/aws/podcast-293-diving-into-data-with-amazon-athena/

Do you have lots of data to analyze? Is writing SQL a skill you have? Would you like to analyze massive amounts of data at low cost without capacity planning? In this episode, Simon shares how Amazon Athena can give you options you may not have considered before.

Additional Resources

About the AWS Podcast

The AWS Podcast is a cloud platform podcast for developers, DevOps engineers, and cloud professionals seeking the latest news and trends in storage, security, infrastructure, serverless, and more. Join Simon Elisha and Jeff Barr for regular updates, deep dives and interviews. Whether you’re building machine learning and AI models, open source projects, or hybrid cloud solutions, the AWS Podcast has something for you. Subscribe with one of the following:

Like the Podcast?

Rate us on iTunes and send your suggestions, show ideas, and comments to [email protected]. We want to hear from you!

Our data lake story: How Woot.com built a serverless data lake on AWS

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/our-data-lake-story-how-woot-com-built-a-serverless-data-lake-on-aws/

In this post, we talk about designing a cloud-native data warehouse as a replacement for our legacy data warehouse built on a relational database.

At the beginning of the design process, the simplest solution appeared to be a straightforward lift-and-shift migration from one relational database to another. However, we decided to step back and focus first on what we really needed out of a data warehouse. We started looking at how we could decouple our legacy Oracle database into smaller microservices, using the right tool for the right job. Our process wasn’t just about using the AWS tools. More than that, it was about making a mindset shift toward cloud-native technologies to get us to our final state.

This migration required developing new extract, transform, load (ETL) pipelines to get new data flowing in while also migrating existing data. Because of this migration, we were able to deprecate multiple servers and move to a fully serverless data warehouse orchestrated by AWS Glue.

In this blog post, we are going to show you:

  • Why we chose a serverless data lake for our data warehouse.
  • An architectural diagram of Woot’s systems.
  • An overview of the migration project.
  • Our migration results.

Architectural and design concerns

Here are some of the design points that we considered:

  • Customer experience. We always start with what our customer needs, and then work backwards from there. Our data warehouse is used across the business by people with varying levels of technical expertise. We focused on the ability for different types of users to gain insights into their operations and to provide better feedback mechanisms to improve the overall customer experience.
  • Minimal infrastructure maintenance. The “Woot data warehouse team” is really just one person—Chaya! Because of this, it’s important for us to focus on AWS services that enable us to use cloud-native technologies. These remove the undifferentiated heavy lifting of managing infrastructure as demand changes and technologies evolve.
  • Responsiveness to data source changes. Our data warehouse gets data from a range of internal services. In our existing data warehouse, any updates to those services required manual updates to ETL jobs and tables. The response times for these data sources are critical to our key stakeholders. This requires us to take a data-driven approach to selecting a high-performance architecture.
  • Separation from production systems. Access to our production systems is tightly coupled. To allow multiple users, we needed to decouple it from our production systems and minimize the complexities of navigating resources in multiple VPCs.

Based on these requirements, we decided to change the data warehouse both operationally and architecturally. From an operational standpoint, we designed a new shared responsibility model for data ingestion. Architecturally, we chose a serverless model over a traditional relational database. These two decisions ended up driving every design and implementation decision that we made in our migration.

As we moved to a shared responsibility model, several important points came up. First, our new way of data ingestion was a major cultural shift for Woot’s technical organization. In the past, data ingestion had been exclusively the responsibility of the data warehouse team and required customized pipelines to pull data from services. We decided to shift to “push, not pull”: Services should send data to the data warehouse.

This is where shared responsibility came in. For the first time, our development teams had ownership over their services’ data in the data warehouse. However, we didn’t want our developers to have to become mini data engineers. Instead, we had to give them an easy way to push data that fit with the existing skill set of a developer. The data also needed to be accessible by the range of technologies used by our website.

These considerations led us to select the following AWS services for our serverless data warehouse:

The following diagram shows at a high level how we use these services.

Tradeoffs

These components together met all of our requirements and enabled our shared responsibility model. However, we made a few tradeoffs compared to a lift-and-shift migration to another relational database:

  • The biggest tradeoff was upfront effort vs. ongoing maintenance. We effectively had to start from scratch with all of our data pipelines and introduce a new technology into all of our website services, which required a concerted effort across multiple teams. Minimal ongoing maintenance was a core requirement. We were willing to make this tradeoff to take advantage of the managed infrastructure of the serverless components that we use.
  • Another tradeoff was balancing usability for nontechnical users vs. taking advantage of big data technologies. Making customer experience a core requirement helped us navigate the decision-making when considering these tradeoffs. Ultimately, only switching to another relational database would mean that our customers would have the same experience, not a better one.

Building data pipelines with Kinesis Data Firehose and Lambda

Because our site already runs on AWS, using an AWS SDK to send data to Kinesis Data Firehose was an easy sell to developers. Considerations included the following:

  • Direct PUT ingestion for Kinesis Data Firehose is natural for developers to implement, works in all languages used across our services, and delivers data to Amazon S3.
  • Using S3 for data storage means that we automatically get high availability, scalability, and durability. And because S3 is a global resource, it enables us to manage the data warehouse in a separate AWS account and avoid the complexity of navigating multiple VPCs.

We also consume data stored in Amazon DynamoDB tables. Kinesis Data Firehose again provided the core of the solution, this time combined with DynamoDB Streams and Lambda. For each DynamoDB table, we enabled DynamoDB Streams and then used the stream to trigger a Lambda function.

The Lambda function cleans the DynamoDB stream output and writes the cleaned JSON to Kinesis Data Firehose using boto3. After doing this, it converges with the other process and outputs the data to S3. For more information, see How to Stream Data from Amazon DynamoDB to Amazon Aurora using AWS Lambda and Amazon Kinesis Firehose on the AWS Database Blog.

Lambda gave us more fine-grained control and enabled us to move files between accounts:

  • We enabled S3 event notifications on the S3 bucket and created an Amazon SNS topic to receive notifications whenever Kinesis Data Firehose put an object in the bucket.
  • The SNS topic triggered a Lambda function, which took the Kinesis output and moved it to the data warehouse account in our chosen partition structure.

S3 event notifications can trigger Lambda functions, but we chose SNS as an intermediary because the S3 bucket and Lambda function were in separate accounts.

Migrating existing data with AWS DMS and AWS Glue

We needed to migrate data from our existing RDS database to S3, which we accomplished with AWS DMS. DMS natively supports S3 as a target, as described in the DMS documentation.

Setting this up was relatively straightforward. We exported data directly from our production VPC to the separate data warehouse account by tweaking the connection attributes in DMS. The string that we used was this:

"cannedAclForObjects=BUCKET_OWNER_FULL_CONTROL;compressionType=GZIP;addColumnName=true;”

This code gives ownership to the bucket owner (the destination data warehouse account), compresses the files to save on storage costs, and includes all column names. After the data was in S3, we used an AWS Glue crawler to infer the schemas of all exported tables and then compared against the source data.

With AWS Glue, some of the challenges we overcame were these:

  • Unstructured text data, such as forum and blog posts. DMS exports these to CSV. This approach conflicted with the commas present in the text data. We opted to use AWS Glue to export data from RDS to S3 in Parquet format, which is unaffected by commas because it encodes columns directly.
  • Cross-account exports. We resolved this by including the code

"glueContext._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl”)”

at the top of each AWS Glue job to grant bucket owner access to all S3 files produced by AWS Glue.

Overall, AWS DMS was quicker to set up and great for exporting large amounts of data with rule-based transformations. AWS Glue required more upfront effort to set up jobs, but provided better results for cases where we needed more control over the output.

If you’re looking to convert existing raw data (CSV or JSON) into Parquet, you can set up an AWS Glue job to do that. The process is described in the AWS Big Data Blog post Build a data lake foundation with AWS Glue and Amazon S3.

Bringing it all together with AWS Glue, Amazon Athena, and Amazon QuickSight

After data landed in S3, it was time for the real fun to start: actually working with the data! Can you tell I’m a data engineer? For me, a big part of the fun was exploring AWS Glue:

  • AWS Glue handles our ETL job scheduling.
  • AWS Glue crawlers manage the metadata in the AWS Glue Data Catalog.

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the pipeline, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue.

However, raw data is not ideal for most of our business users, because it often has duplicates or incorrect data types. Most importantly, the data out of Firehose is in JSON format, but we quickly observed significant query performance gains from using Parquet format. Here, we used one of the performance tips in the Big Data Blog post Top 10 performance tuning tips for Amazon Athena.

With our shared responsibility model, the data warehouse and BI teams are responsible for the final processing of data into curated datasets ready for reporting. Using Lambda and AWS Glue enables these teams to work in Python and SQL (the core languages for Amazon data engineering and BI roles). It also enables them to deploy code with minimal infrastructure setup or maintenance.

Our ETL process is as follows:

  • Scheduled triggers.
  • Series of conditional triggers that control the flow of subsequent jobs that depend on previous jobs.
  • A similar pattern across many jobs of reading in the raw data, deduplicating the data, and then writing to Parquet. We centralized this logic by creating a Python library of functions and uploading it to S3. We then included that library in the AWS Glue job as an additional Python library. For more information on how to do this, see Using Python Libraries with AWS Glue in the AWS Glue documentation.

We also migrated complex jobs used to create reporting tables with business metrics:

  • The AWS Glue use of PySpark simplified the migration of these queries, because you can embed SparkSQL queries directly in the job.
  • Converting to SparkSQL took some trial and error, but ultimately required less work than translating SQL queries into Spark methods. However, for people on our BI team who had previously worked with Pandas or Spark, working with Spark dataframes was a natural transition. As someone who used SQL for several years before learning Python, I appreciate that PySpark lets me quickly switch back and forth between SQL and an object-oriented framework.

Another hidden benefit of using AWS Glue jobs is that the AWS Glue version of Python (like Lambda) already has boto3 installed. Thus, ETL jobs can directly use AWS API operations without additional configuration.

For example, some of our longer-running jobs created read inconsistency if a user happened to query that table while AWS Glue was writing data to S3. We modified the AWS Glue jobs to write to a temporary directory with Spark and then used boto3 to move the files into place. Doing this reduced read inconsistency by up to 90 percent. It was great to have this functionality readily available, which may not have been the case if we managed our own Spark cluster.

Comparing previous state and current state

After we had all the datasets in place, it was time for our customers to come on board and start querying. This is where we really leveled up the customer experience.

Previously, users had to download a SQL client, request a user name and password, set it up, and learn SQL to get data out. Now, users just sign in to the AWS Management Console through automatically provisioned IAM roles and run queries in their browser with Athena. Or if they want to skip SQL altogether, they can use our Amazon QuickSight account with accounts managed through our pre-existing Active Directory server.

Integration with Active Directory was a big win for us. We wanted to enable users to get up and running without having to wait for an account to be created or managing separate credentials. We already use Active Directory across the company for access to multiple resources. Upgrading to Amazon QuickSight Enterprise Edition enabled us to manage access with our existing AD groups and credentials.

Migration results

Our legacy data warehouse was developed over the course of five years. We recreated it as a serverless data lake using AWS Glue in about three months.

In the end, it took more upfront effort than simply migrating to another relational database. We also dealt with more uncertainty because we used many products that were relatively new to us (especially AWS Glue).

However, in the months since the migration was completed, we’ve gotten great feedback from data warehouse users about the new tools. Our users have been amazed by these things:

  • How fast Athena is.
  • How intuitive and beautiful Amazon QuickSight is. They love that no setup is required—it’s easy enough that even our CEO has started using it!
  • That Athena plus the AWS Glue Data Catalog have given us the performance gains of a true big data platform, but for end users it retains the look and feel of a relational database.

Summary

From an operational perspective, the investment has already started to pay off. Literally: Our operating costs have fallen by almost 90 percent.

Personally, I was thrilled that recently I was able to take a three-week vacation and didn’t get paged once, thanks to the serverless infrastructure. And for our BI engineers in addition to myself, the S3-centric architecture is enabling us to experiment with new technologies by integrating seamlessly with other services, such as Amazon EMR, Amazon SageMaker, Amazon Redshift Spectrum, and Lambda. It’s been exciting to see how these services have grown in the time since we’ve adopted them (for example, the recent AWS Glue launch of Amazon CloudWatch metrics and Athena’s launch of views).

We are thrilled that we’ve invested in technologies that continue to grow as we do. We are incredibly proud of our team for accomplishing this ambitious migration. We hope our experience can inspire other engineers to dive in to building a data lake of their own.

For additional information, see these similar AWS Big Data blog posts:


About the authors

Chaya Carey is a data engineer at Woot.com. At Woot, she’s responsible for managing the data warehouse and other scalable data solutions. Outside of work, she’s passionate about Seattle’s bar and restaurant scene, books, and video games.

 

 

 

Karthik Odapally is a senior solutions architect at AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.

 

 

 

 

Analyze and visualize nested JSON data with Amazon Athena and Amazon QuickSight

Post Syndicated from Mariano Kamp original https://aws.amazon.com/blogs/big-data/analyze-and-visualize-nested-json-data-with-amazon-athena-and-amazon-quicksight/

Although structured data remains the backbone for many data platforms, increasingly unstructured or semistructured data is used to enrich existing information or to create new insights. Amazon Athena enables you to analyze a wide variety of data. This includes tabular data in comma-separated value (CSV) or Apache Parquet files, data extracted from log files using regular expressions, and JSON-formatted data. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

In this blog post, I show you how to use JSON-formatted data and translate a nested data structure into a tabular view. For data engineers, using this type of data is becoming increasingly important. For example, you can use API-powered data feeds from operational systems to create data products. Such data can also help to add more finely grained facets to your understanding of customers and interactions. Understanding the fuller picture helps you better understand your customers and tailor experiences or predict outcomes.

To illustrate, I use an end-to-end example. It processes financial data retrieved from an API operation that is formatted as JSON. We analyze the data in Amazon Athena and visualize the results in Amazon QuickSight. Along the way, we compare and contrast alternative options.

The result looks similar to what you see below.

Analyzing JSON-formatted data

For our end-to-end example, we use financial data as provided by IEX. The financials API call pulls income statement, balance sheet, and cash flow data from four reported years of a stock.

Following, you can see example output. On the top level is an attribute called symbol, which identifies the stock described here: Apple. On the same level is an attribute called financials. This is a data container. The actual information is one level below, including such attributes as reportDate, cashFlow, and researchAndDevelopment.

The data container is an array. In the example following, financial data for only one year is shown. However, the {...} indicates that there might be more. In our case, data for four years is returned when making the actual API call.

{
  "symbol": "AAPL",
  "financials": [
    {
      "reportDate": "2017-03-31",
      "grossProfit": 20591000000,
      "costOfRevenue": 32305000000,
      "operatingRevenue": 52896000000,
      "totalRevenue": 52896000000,
      "operatingIncome": 14097000000,
      "netIncome": 11029000000,
      "researchAndDevelopment": 2776000000,
      "operatingExpense": 6494000000,
      "currentAssets": 101990000000,
      "totalAssets": 334532000000,
      "totalLiabilities": 200450000000,
      "currentCash": 15157000000,
      "currentDebt": 13991000000,
      "totalCash": 67101000000,
      "totalDebt": 98522000000,
      "shareholderEquity": 134082000000,
      "cashChange": -1214000000,
      "cashFlow": 12523000000,
      "operatingGainsLosses": null
    } // , { ... }
  ]
}

Data is provided for free by IEX (see the IEX Terms of Use).

It has become commonplace to use external data from API operations as feeds into Amazon S3. Although this is usually done in an automated fashion, in our case we manually acquire the API call’s results.

To download the data, you can use a script, described following.

Alternatively, you can use the following three links: 1, 2, and 3. You can then save the resulting JSON files to your local disk, then upload the JSON to an Amazon S3 bucket. In my case, the location of the data is s3://athena-json/financials, but you should create your own bucket. The result looks similar to this:

You can also use a Unix-like shell on your local computer or on an Amazon EC2 instance to populate a S3 location with the API data:

$ curl -s "https://api.iextrading.com/1.0/stock/aapl/financials?period=annual" > aapl.json 
$ curl -s "https://api.iextrading.com/1.0/stock/nvda/financials?period=annual" > nvda.json 
$ curl -s "https://api.iextrading.com/1.0/stock/fb/financials?period=annual" > fb.json 

$ ls -ltrh *.json
-rw-r--r--  1 mkamp  ANT\Domain Users   2.2K Nov 21 16:57 aapl.json
-rw-r--r--  1 mkamp  ANT\Domain Users   2.1K Nov 21 16:57 nvda.json
-rw-r--r--  1 mkamp  ANT\Domain Users   2.1K Nov 21 16:57 fb.json 

$ aws s3 sync . s3://athena-json/financials/ --exclude "*" --include "*.json"
upload: ./aapl.json to s3://athena-json/financials/aapl.json   
upload: ./nvda.json to s3://athena-json/financials/nvda.json   
upload: ./fb.json to s3://athena-json/financials/fb.json       

$ aws s3 ls s3://athena-json/financials/
2018-11-21 16:58:30       2245 aapl.json
2018-11-21 16:58:30       2162 fb.json
2018-11-21 16:58:30       2150 nvda.json

Mapping JSON structures to table structures

Now we have the data in S3. Let’s make it accessible to Athena. This is a simple two-step process:

  1. Create metadata. Doing so is analogous to traditional databases, where we use DDL to describe a table structure. This step maps the structure of the JSON formatted data to columns.
  2. Specify where to find the JSON files.

We can use all information of the JSON file at this time, or we can concentrate on mapping the information that we need today. The new data structure in Athena overlays the files in S3 only virtually. Therefore, even though we just map a subset of the contained information at this time, all information is retained in the files and can be used later on as needed. This is a powerful concept and enables an iterative approach to data modeling.

You can use the following SQL statement to create the table. The table is then named financials_raw; see (1) following. We use that name to access the data from this point on. We map the symbol attribute and the list of financials as an array containing a subset of the reported figures. We specify that the underlying files are to be interpreted as JSON in (2), and that the data lives under s3://athena-json/financials/ in (3).

CREATE EXTERNAL TABLE financials_raw ( -- (1)
    symbol string,
    financials array<
        struct<reportdate: string,
             grossprofit: bigint,
             totalrevenue: bigint,
             totalcash: bigint,
             totaldebt: bigint,
             researchanddevelopment: bigint>>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' -- (2)
LOCATION 's3://athena-json/financials/' -- (3)

You can run this statement using the Athena console as depicted following:

After you run the SQL statement on the left, the just-created table financials_raw is listed under the heading Tables. Now let’s have a look what’s in this table. Choose the three vertical dots to the right of the table name and choose Preview table. Athena creates a SELECT statement to show 10 rows of the table:

Looking at the output, you can see that Athena was able to understand the underlying data in the JSON files. Specifically, we can see two columns:

  • symbol, which contains flat data, the symbol of the stock
  • financials, which now contains an array of financials reports

If you look closely and observe the reportdate attribute, you find that the row contains more than one financial report.

Even though the data is nested—in our case financials is an array—you can access the elements directly from your column projections:

SELECT
  symbol, 
  financials[1].reportdate one_report_date, -- indexes start with 1
  financials[1].totalrevenue one_total_revenue,
  financials[2].reportdate another_report_date,
  financials[2].totalrevenue another_total_revenue
FROM
  financials_raw
ORDER BY
  1 -- the 1 indicates to order by the first column

As you can see preceding, all data is accessible. From this point on, it is structured, nested data, but not JSON anymore.

It’s still not tabular, though. We come back to this in a minute. First let’s have a look at a different way that would also have brought us to this point.

Alternative approach: Deferring the JSON extraction to query time

There are many different ways to use JSON formatted data in Athena. In the previous section, we use a simple, explicit, and rigid approach. In contrast, we now see a rather generic, dynamic approach.

In this case, we defer the final decisions about the data structures from table design to query design. To do that, we leave the data untouched in its JSON form as long as possible. As a consequence, the CREATE TABLE statement is much simpler than in the previous section:

CREATE EXTERNAL TABLE financials_raw_json (
  -- Using a mixed approach, extracting the symbol for 
  -- convenience directly from the JSON data
  symbol string,
  -- But otherwise storing the RAW JSON Data as string
  financials string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://athena-json/financials/' 
Executing SELECT * FROM financials_raw_json shows that the data is accessible:

Even though the data is now accessible, it is only treated as a single string or varchar. This type is generic and doesn’t reflect the rich structure and the attributes of the underlying data.

But before diving into the richness of the data, I want to acknowledge that it’s hard to see from the query results which data type a column is. When using your queries, the focus is on the actual data, so seeing the data types all the time can be distracting. However in this case, when creating your queries and data structures, it is useful to use typeof. For example, use the following SQL statement:

SELECT typeof(financials) FROM financials_raw_json

Using this SQL statement, you can verify for yourself that the column is treated as a varchar.

To now introduce the data structure during query design, Athena provides specific functionality covered in the documentation to work with JSON formatted data.

The following list shows how to extract the data, starting at the root of the record in the first example and then navigating further down the document tree. Each entry shows an expression that can be used in a SQL statement like SELECT <expr> FROM financials_raw_json, where <expr> is replaced by the expression shown, followed by the result, the result's type, and a description.

  • json_extract(financials, '$') returns [{.., "reportdate":"2017-12-31", ..}, {..}, {..}, {.., "reportdate":"2014-12-31", ..}] as json. This selects the root of the document (financials).
  • json_extract(financials, '$[0]') returns {.., "reportdate":"2017-12-31", "totalcash":"41711000000", ..} as json. This selects the first element of the financials array. The indexing starts at 0, as opposed to 1, which is customary in SQL.
  • json_extract(financials, '$[0].reportdate') returns "2017-12-31" as json. This selects the reportdate attribute of the first element of the financials array.
  • json_extract_scalar(financials, '$[0].reportdate') returns 2017-12-31 as varchar. As preceding, but the type is now varchar because we use json_extract_scalar.
  • json_size(financials, '$') returns 4 as bigint. This is the size of the financials array; 4 represents the four years contained in each JSON document.

To implement our example, these skills are more than enough, so we can leave it at that.
However, there are more functions for going back and forth between JSON and Athena. Because Athena is our managed service based on Apache Presto, it is also helpful to consult the Apache Presto documentation when looking for more information.

Let’s put the JSON functions introduced preceding to use:

SELECT 
  symbol,
  -- indexes start with 0, as is customary with JavaScript/JSON
  json_extract_scalar(financials, '$[0].reportdate') one_report_date,  
  json_extract_scalar(financials, '$[0].totalrevenue') one_total_revenue,
  json_extract_scalar(financials, '$[1].reportdate') another_report_date,
  json_extract_scalar(financials, '$[1].totalrevenue') another_total_revenue
FROM
  financials_raw_json
ORDER BY 
  1

As with the first approach, we still have to deal with the nested data inside the rows. Doing so also lets us get rid of the explicit indexing of the financial reports used preceding.

Comparing approaches

If you go back and compare our latest SQL query with our earlier SQL query, you can see that they produce the same output. On the surface, they even look alike because they project the same attributes. But a closer look reveals that the first statement uses a structure that has already been created during CREATE TABLE. In contrast, the second approach interprets the JSON document for each column projection as part of the query.

Interpreting the data structures during the query design enables you to change the structures across different SQL queries or even within the same SQL query. Different column projections in the same query can interpret the same data, even the same column, differently. This can be extremely powerful, if such a dynamic and differentiated interpretation of the data is valuable. On the other hand, it takes more discipline to make sure that during maintenance different interpretations are not introduced by accident.

In both approaches, the underlying data is not touched. Athena only overlays the physical data, which makes changing the structure of your interpretation fast. Which approach better suits you depends on the intended use.

To determine this, you can ask a few questions. As a rule of thumb: are your intended users data engineers or data scientists? Do they want to experiment and change their minds frequently? Do they perhaps even want different use case–specific interpretations of the same data? If so, they would fare better with the latter approach of leaving the JSON data untouched until query design. They would also likely be willing to invest in learning the JSON extensions to gain access to this dynamic approach.

If on the other hand your users have established data sources with stable structures, the former approach fits better. It enables your users to query the data with SQL only, with no need for information about the underlying JSON data structures.

Use the following comparison to choose the appropriate approach for your case at hand.

Data structure interpretation happens at table creation time:

  • The interpretation of data structures is scoped to the whole table. All subsequent queries use the same structures.
  • The interpretation of data structures evolves centrally.
  • It is easy to provide a single version of the truth, because there is just a single interpretation of the underlying data structures.
  • Applicable to well-understood data structures that are slowly and consciously evolving. A single interpretation of the underlying data structures is valued more than change velocity.
  • Production data pipelines benefit from this approach.

Data structure interpretation happens at query creation time:

  • The data interpretation is scoped to an individual query. Each query can potentially interpret the data differently.
  • The interpretation of data structures can be changed on a per-query basis, so that different queries can evolve at different speeds and in different directions.
  • A single version of the truth is hard to maintain and needs coordination across the different queries using the same data. Rapidly evolving data interpretations can easily go hand in hand with an evolving understanding of use cases.
  • Applicable to experimental, rapidly evolving interpretations of data structures and use cases. Change velocity is more important than a single, stable interpretation of data structures.
  • Exploratory data analysis benefits from this approach.

Both approaches can serve well at different times in the development lifecycle, and each approach can be migrated to the other.

In any case, this is not a black and white decision. In our example, we keep the tables financials_raw and financials_raw_json, both accessing the same underlying data. The data structures are just metadata, so keeping both around doesn’t store the actual data redundantly.

For example, financials_raw might be used by data engineers as the source of productive pipelines where the attributes and their meaning are well-understood and stable across use cases. At the same time, data scientists might use financials_raw_json for exploratory data analysis where they refine their interpretation of the data rapidly and on a per-query basis.

Working with nested data

At this point, we can access data that is JSON formatted through Athena. However, the underlying structure is still hierarchical, and the data is still nested. For many use cases, especially for analytical uses, expressing data in a tabular fashion—as rows—is more natural. This is also the standard way when using SQL and business intelligence tools. To unnest the hierarchical data into flattened rows, we need to reconcile these two approaches.

To simplify, we can set the financial reports example aside for the moment. Instead, let’s experiment with a narrower example. Reconciling different ways of thinking can sometimes be hard to follow. The narrow example and hands-on experimentation should make this easier. Copy the code we discuss into the Athena console to play along.

The following code is self-contained and uses synthetic data. This lends itself particularly well to experimentation:

SELECT 
  parent, children
FROM (
  VALUES
    ('Parent 1', ARRAY['Child 1.1', 'Child 1.2']),
    ('Parent 2', ARRAY['Child 2.1', 'Child 2.2', 'Child 2.3'])
) AS t(parent, children)

Looking at the data, this is similar to our situation with the financial reports. There we had multiple financial reports for one stock symbol, multiple children for each parent. To flatten the data, we first unnest the individual children for each parent. Then we cross-join each child with its parent, which creates an individual row for each child that contains the child and its parent.

In the following SQL statement, UNNEST takes the children column from the original table as a parameter. It creates a new dataset with the new column child, which is later cross-joined. The enclosing SELECT statement can then reference the new child column directly.

SELECT 
  parent, child
FROM (
  VALUES
    ('Parent 1', ARRAY['Child 1.1', 'Child 1.2']),
    ('Parent 2', ARRAY['Child 2.1', 'Child 2.2', 'Child 2.3'])
) AS t(parent, children)
CROSS JOIN UNNEST(children) AS t(child)

If you played along with the simplified example, it should be easy now to see how this method can be applied to our financial reports:

SELECT 
    symbol,
    report
FROM 
    financials_raw
CROSS JOIN UNNEST(financials) AS t(report)

Bam! Now that was easy, wasn’t it?

Using this as a basis, let’s select the data that we want to provide to our business users and turn the query into a view. The underlying data has still not been touched, is still formatted as JSON, and is still expressed using nested hierarchies. The new view makes all of this transparent and provides a tabular view.

Let’s create the view:

CREATE OR REPLACE VIEW financial_reports_view AS
SELECT 
  symbol,
  CAST(report.reportdate AS DATE) reportdate,
  report.totalrevenue,
  report.researchanddevelopment
FROM 
  financials_raw
CROSS JOIN UNNEST(financials) AS t(report)
ORDER BY 1 ASC, 2 DESC

Then check our work:

SELECT
  *
FROM
  financial_reports_view

This is a good basis and acts as an interface for our business users.

The previous steps were based on the initial approach of mapping the JSON structures directly to columns. Let's also explore the alternative path that we discussed before. What does this look like when we keep the data JSON formatted for longer, as we did in our alternative approach?

For variety, this approach also shows json_parse, which is used here to parse the whole JSON document and convert the list of financial reports and their contained key-value pairs into an ARRAY(MAP(VARCHAR, VARCHAR)). This array is then used in the unnesting, and its children eventually in the column projections. With element_at, you can access the elements in the JSON by name. You can also see the use of WITH to define subqueries, which helps structure the SQL statement.

If you run the following query, it returns the same result as the approach preceding. You can also turn this query into a view.

WITH financial_reports_parsed AS (
  SELECT 
    symbol,   
    CAST(json_parse(financials) AS ARRAY(MAP(VARCHAR, VARCHAR))) financial_reports
  FROM         
    financials_raw_json)
SELECT 
  symbol,
  CAST(element_at(report, 'reportdate') AS DATE) reportdate,  
  element_at(report, 'totalrevenue') totalrevenue,
  element_at(report, 'researchanddevelopment') researchanddevelopment
FROM
  financial_reports_parsed
CROSS JOIN UNNEST(financial_reports) AS t(report)
ORDER BY 1 ASC, 2 DESC
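Turning the preceding query into a view works the same way as before. The following is a sketch; the view name is just an illustration:

CREATE OR REPLACE VIEW financial_reports_view_json AS
WITH financial_reports_parsed AS (
  SELECT 
    symbol,
    CAST(json_parse(financials) AS ARRAY(MAP(VARCHAR, VARCHAR))) financial_reports
  FROM
    financials_raw_json)
SELECT 
  symbol,
  CAST(element_at(report, 'reportdate') AS DATE) reportdate,
  element_at(report, 'totalrevenue') totalrevenue,
  element_at(report, 'researchanddevelopment') researchanddevelopment
FROM
  financial_reports_parsed
CROSS JOIN UNNEST(financial_reports) AS t(report)
ORDER BY 1 ASC, 2 DESC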

Visualizing the data

Let’s get back to our example. We created the financial_reports_view that acts as our interface to other business intelligence tools. In this blog post, we use it to provide data for visualization using Amazon QuickSight. Amazon QuickSight can directly access data through Athena. Its pay-per-session pricing enables you to put analytical insights into the hands of everyone in your organization.

Let’s set this up together. We first need to select our view to create a new data source in Athena and then we use this data source to populate the visualization.

We are creating the visual that is displayed at the top of this post. If you just want the data and are not interested in condensing it into a visual story, you can skip ahead to the conclusion of this post.

Creating an Athena data source in Amazon QuickSight

Before we can use the data in Amazon QuickSight, we need to first grant access to the underlying S3 bucket. If you haven’t done so already for other analyses, see our documentation on how to do so.

On the Amazon QuickSight home page, choose Manage data from the upper-right corner, then choose New data set and pick Athena as data source. In the following dialog box, give the data source a descriptive name and choose Create data source.

Choose the default database and our view financial_reports_view, then choose Select to confirm. If you used multiple schemas in Athena, you could pick them here as your database.

In the next dialog box, you can choose if you want to import the data into SPICE for quicker analytics or to directly query the data.

SPICE is the super-fast, parallel, in-memory calculation engine in Amazon QuickSight. For our example, you can go either way. Using SPICE results in the data being loaded from Athena only once, until it is either manually refreshed or automatically refreshed (using a schedule). Using direct query means that all queries are run on Athena.

Our view now is a data source for Amazon QuickSight and we can turn to visualizing the data.

Creating a visual in Amazon QuickSight

You can see the data fields on the left. Notice that reportdate is shown with a calendar symbol and researchanddevelopment as a number. Amazon QuickSight picks up the data types that we defined in Athena.

The canvas on the right is still empty. Before we populate it with data, let’s select Line Chart from the available visual types.

To populate the graph, drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put reportdate onto the X axis well. We put our metric researchanddevelopment onto the Value well, so that it's displayed on the y-axis. We put symbol onto the Color well, helping us tell the different stocks apart.

An initial version of our visualization is now shown on the canvas. Drag the handle at the lower-right corner to adjust the size to your liking. Also, pick Format visual from the drop-down menu in the upper right corner. Doing this opens a dialog with more options to enhance the visualization.

Expand the Data labels section and choose Show data labels. Your changes are immediately reflected in the visualization.

You can also interact with the data directly. Given that Amazon QuickSight picked up on the reportdate being a DATE, it provides a date slider at the bottom of the visual. You can use this slider to adjust the time frame shown.

You can add further customizations. These can include changing the title of the visual or the axis, adjusting the size of the visual, and adding additional visualizations. Other possible customizations are adding data filters and capturing the combination of visuals into a dashboard. You might even turn the dashboard into a scheduled report that gets sent out once a day by email.

Conclusion

We have seen how to use JSON formatted data that is stored in S3. We contrasted two approaches to map the JSON formatted data to data structures in Athena:

  • Mapping the JSON structures at table creation time to columns.
  • Leaving the JSON structures untouched and instead mapping the contents as a whole to a string, so that the JSON contents remain intact. The JSON contents can later be interpreted, and the structures mapped to columns, at query creation time.

The approaches are not mutually exclusive, but can be used in parallel for the same underlying data.

Furthermore, JSON data can be hierarchical. Such data must be unnested and cross-joined to provide it in a flattened, tabular fashion.

For our example, we provided the data in a tabular fashion and created a view that encapsulates the transformations, hiding the complexity from its users. We used the view as an interface to Amazon QuickSight. Amazon QuickSight directly accesses the Athena view and visualizes the data.

More on using JSON

The JSON features blend nicely into the existing SQL-oriented functions in Athena, but they are not ANSI SQL compatible. Also, the JSON files are expected to carry each record on a separate line (see the JSON Lines website).

In the documentation for the JSON SerDe Libraries, you can find how to use the property ignore.malformed.json to indicate if malformed JSON records should be turned into nulls or an error. Further information about the two possible JSON SerDe implementations is linked in the documentation. If necessary, you can dig deeper and find out how to take explicit control of how column names are parsed, for example to avoid clashing with reserved keywords.
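For illustration, the property is set in the SERDEPROPERTIES clause of the table definition. The following sketch applies it to the layout used in this post; the table name is made up for this example:

CREATE EXTERNAL TABLE financials_raw_json_tolerant (
  symbol string,
  financials string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  -- turn records with malformed JSON into NULLs instead of failing the query
  'ignore.malformed.json' = 'true'
)
LOCATION 's3://athena-json/financials/'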

How to efficiently store data

During our excursions, we never touched the actual data. We only defined different ways to interpret the data. This approach works well for us here, because we are only dealing with a small amount of data. If you want to use these concepts at scale, consider how to apply partitioning of data and possibly how to consolidate data into larger files.

Depending on the data, also consider whether storing it in a columnar fashion, using for example Apache Parquet might be beneficial. You can find additional practical suggestions in our AWS Big Data Blog post Top 10 Performance Tuning Tips for Amazon Athena.
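As one illustration of the columnar option, a CREATE TABLE AS SELECT (CTAS) statement can materialize the view from this post into Parquet files. The following is only a sketch; the table name and the S3 location are assumptions:

CREATE TABLE financial_reports_parquet
WITH (
  format = 'PARQUET',
  external_location = 's3://athena-json/financials-parquet/' -- assumed output location
) AS
SELECT
  symbol,
  reportdate,
  totalrevenue,
  researchanddevelopment
FROM
  financial_reports_view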

All these options don’t replace what you learned in this article, but benefit from your being able to compare and contrast JSON formatted data and nested data. They can be used in a complementary fashion.

Further, this AWS Big Data Blog post walks you through a real-world scenario showing how to store and query data efficiently.


About the Author

Mariano Kamp is a principal solutions architect with Amazon Web Services. He works with financial services customers in Germany and has more than 25 years of industry experience covering a wide range of technologies. His area of depth is Analytics.

In his spare time, Mariano enjoys hiking with his wife.

 

 

 

 

Chasing earthquakes: How to prepare an unstructured dataset for visualization via ETL processing with Amazon Redshift

Post Syndicated from Ian Funnell original https://aws.amazon.com/blogs/big-data/chasing-earthquakes-how-to-prepare-an-unstructured-dataset-for-visualization-via-etl-processing-with-amazon-redshift/

As organizations expand analytics practices and hire data scientists and other specialized roles, big data pipelines are growing increasingly complex. Sophisticated models are being built using the troves of data being collected every second.

The bottleneck today is often not the know-how of analytical techniques. Rather, it’s the difficulty of building and maintaining ETL (extract, transform, and load) jobs using tools that might be unsuitable for the cloud.

In this post, I demonstrate a solution to this challenge. I start with a noisy semistructured dataset of seismic events, spanning several years and recorded at different locations across the globe. I set out to obtain broad insights about the nature of the rocks forming the Earth’s surface itself—the tectonic plate structure—to be visualized using the mapping capability in Amazon QuickSight.

To accomplish this, I use several AWS services, orchestrated together using Matillion ETL for Amazon Redshift, as described in the architecture overview later in this post.

Tectonic plate structure context

An earthquake is caused by a buildup of pressure that gets suddenly released. Earthquakes tend to be more severe at the boundaries of destructive tectonic plates. These boundaries are formed when a heavier and denser oceanic plate collides with a lighter continental plate, or when two oceanic plates collide. Due to the difference in density, the oceanic lithosphere is pushed underneath the continental plate, forming what is called a subduction zone. (See the following diagram.) In subduction zones, earthquakes can occur at depths as great as 700 kilometers.

Photography by KDS4444 [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)], from Wikimedia Commons

For our analysis, we ultimately want to visualize the depth of an earthquake focus to locate subduction zones, and therefore find places on earth with the most severe earthquakes.

Seismographic data source

The data source is from the International Federation of Digital Seismograph Networks (FDSN). The event data is in JSON format (from the European Mediterranean Seismological Centre, or EMSC). An external process accumulates files daily into an Amazon S3 bucket, as shown following.

Each individual file contains all the seismic events for one day—usually several hundred—in an embedded array named “features,” as shown in the following example:

{
  "type": "FeatureCollection",
  "metadata": {
    "totalCount": 103
  },
  "features": [
    {
      "geometry": {
        "type": "Point",
        "coordinates": [26.76, 45.77, -140]
      },
      "type": "Feature",
      "id": "20180302_0000103",
      "properties": {
        "lastupdate": "2018-03-02T23:27:00.0Z",
        "lon": 26.76, "lat": 45.77, "depth": 140,
        "mag": 3.7,
        "time": "2018-03-02T23:22:52.1Z",
        "flynn_region": "ROMANIA"
      }
    },
    {
      "geometry": {
        "type": "Point",

Architecture overview

Athena reads and flattens the S3 data and makes it available for Matillion ETL to load into Amazon Redshift via JDBC. Matillion orchestrates this data movement, and it also provides a graphical framework to design and build the more complex data enrichment and aggregation steps to be performed by Amazon Redshift. Finally, the prepared data is queried by Amazon QuickSight for visualization.

Amazon Athena setup

You can use Athena to query data in S3 using standard SQL, via a serverless infrastructure that is managed entirely by AWS on your behalf. Before you can query your data, start by creating an external table. By doing this, you are defining the schema to apply to the data when it is being queried.

You can choose to use an AWS Glue crawler to assist in automatically discovering the schema and format of your source data files.

The following is the CREATE TABLE statement that you can copy and paste into the Athena console to create the schema needed to query the seismic data. Make sure that you substitute the correct S3 path to your seismic data in the LOCATION field of the statement.

CREATE EXTERNAL TABLE `sp_events`(
  `type` string COMMENT 'from deserializer', 
  `metadata` struct<totalcount:int> COMMENT 'from deserializer', 
  `features` array<struct<geometry:struct<type:string,coordinates:array<double>>,type:string,id:string,properties:struct<lastupdate:string,magtype:string,evtype:string,lon:double,auth:string,lat:double,depth:double,unid:string,mag:double,time:string,source_id:string,source_catalog:string,flynn_region:string>>> COMMENT 'from deserializer')
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your-bucket/SeismicPortal'

After the table is created, you are ready to query it. Athena uses the in-memory, distributed SQL engine called Apache Presto. It provides the ability to unnest arrays, which you use next.

Transfer to Amazon Redshift

The embedded array in every source record gets flattened out and converted to individual records during the JDBC export into Amazon Redshift (you can download the JDBC driver .jar file). You use a Matillion ETL Database Query component to assist with the data transfer during this step, as shown in the following image.

This component simplifies ETL by automating the following steps:

  1. Runs the SQL SELECT statement (shown in the following example).
  2. Streams query results across the network from Athena into temporary storage in S3.
  3. Performs a bulk data load into Amazon Redshift.

Athena executes the following SQL statement:

SELECT f.id,
       f.properties.time AS event_time,
       f.properties.lastupdate,
       f.properties.lon,
       f.properties.lat,
       f.properties.depth,
       f.properties.mag,
       f.properties.flynn_region
FROM "seismic"."sp_events"
CROSS JOIN UNNEST (features) AS t(f)

The CROSS JOIN UNNEST syntax flattens the embedded array, generating hundreds of individual event records per day.

Now that the data has been copied and flattened into individual event records (shown in the following image), it’s ready for enrichment and aggregation.

Data enrichment

Earthquakes occur along a continuous range of spatial coordinates. In order to aggregate them, as we'll be doing very soon, it's necessary to first group them together. A convenient method is to assign every event to a Universal Transverse Mercator (UTM) zone. These zones are six-degree bands of longitude, and projecting onto them converts the spherical latitude/longitude coordinates into a 2D representation. Performing this conversion provides good granularity for visualization later.
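For intuition, the longitudinal zone number itself follows directly from the longitude, ignoring the special cases around Norway and Svalbard. The following SQL is only a sketch; the table and column names are assumptions:

-- Zones are numbered 1 through 60, each spanning 6 degrees of longitude
SELECT
  id,
  floor((lon + 180) / 6) + 1 AS utm_zone_number
FROM seismic_events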

The calculation to convert a spherical longitude/latitude coordinate into a two-dimensional UTM coordinate is complex. It can be performed ideally using an Amazon Redshift user-defined function (UDF). I chose a UDF for the ability to invoke it, via a Matillion component, in the next step.

CREATE OR REPLACE FUNCTION f_ll_utm (Lat float, Long float)
      RETURNS VARCHAR
STABLE
AS $$
from math import pi, sin, cos, tan, sqrt

_deg2rad = pi / 180.0
_rad2deg = 180.0 / pi

# Indexes into the _ellipsoid list below (name, equatorial radius, eccentricity squared)
_EquatorialRadius = 1
_eccentricitySquared = 2
_ellipsoid = ["WGS-84", 6378137, 0.00669438]

# (The remainder of the conversion logic is omitted here for brevity.)

The UDF has to return three pieces of information:

  • UTM Zone code
  • Easting (x-axis measurement in meters)
  • Northing (ditto, for the y-axis)

A scalar UDF can only return a single value. Therefore, the three results are returned as a single pipe-delimited string.

To bring the values out into individual fields, the UDF is first invoked using a Matillion ETL Calculator component, followed by a field splitter and a Calculator to perform data type conversion and rounding.
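In plain SQL, the splitting step corresponds to something like the following Amazon Redshift expressions; the table and column names are assumptions:

SELECT
  split_part(utm_ref, '|', 1) AS utm_zone,                       -- UTM zone code
  CAST(split_part(utm_ref, '|', 2) AS float8) AS utm_easting,    -- meters along the x-axis
  CAST(split_part(utm_ref, '|', 3) AS float8) AS utm_northing    -- meters along the y-axis
FROM events_with_utm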

Data aggregation

To reiterate, we’re interested in the depth of earthquake focus specifically on destructive plate boundaries. Knowing the depth helps us estimate the potential severity of earthquakes.

We need to find the average event depth within each UTM zone, in the expectation that a spatial pattern will appear that will highlight the subduction zones.

The last three steps in the Matillion transformation (shown in the following diagram) perform the necessary aggregation, add a depth quartile, and create an output table from the resulting data.

The "Aggregate to UTM ref" step gets Amazon Redshift to perform a GROUP BY function in SQL, which approximates every event to the appropriate UTM zone. While doing this aggregation, you simultaneously do the following:

  • Count the events (which determines the size of the visual representation).
  • Find the average depth (which determines the color of the visual representation).
  • Determine the average latitude and longitude (which approximates to the center of the UTM zone, and determines the position of the visual representation).

The following image shows the aggregation type for each column:
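Expressed as SQL, the aggregation amounts to roughly the following; the table and column names are assumptions:

SELECT
  utm_zone,
  count(*)   AS event_count,  -- drives the size of the visual mark
  avg(depth) AS avg_depth,    -- drives the color of the visual mark
  avg(lat)   AS avg_lat,      -- approximate center of the zone (position)
  avg(lon)   AS avg_lon
FROM events_with_utm
GROUP BY utm_zone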

Average depth is a useful measure, but to maximize the visual impact of the final presentation, we also take the opportunity to rank the results into quartiles. This allows the zones with the deepest quartile to stand out consistently on the map.

NTILE(4) OVER (ORDER BY "avg_depth")

Amazon Redshift is great at performing this type of analytics, which is delivered inside another Matillion ETL Calculator component.

The Recreate Output step materializes the dataset into an Amazon Redshift table, ready for Amazon QuickSight to visualize.

Amazon QuickSight visualization

The Amazon QuickSight “points on map” visualization is perfect for this 2D rendering. The values for the field wells come straight from the aggregated data in Amazon Redshift:

  • Geospatial — the average lat/long per UTM grid.
  • Size — the record count, in other words, the number of events in that UTM zone.
  • Color — the Depth Ntile, with the fourth quartile in pink.

The resulting map shows the global subduction zones highlighted clearly in pink, these being the areas with the deepest earthquake foci on average.

Recap and summary

In this post, I used seismological data as an example to explore challenges around the visualization of unstructured data and to provide best practices. I suggested a way to overcome these challenges with an architecture that is also applicable for datasets from a wide array of sources, beyond geology. I then explored how to orchestrate activities of different data processing tasks between S3, Athena, and Amazon Redshift using Matillion ETL for Amazon Redshift.

If you're trying to solve similar analytics challenges and want to see how Amazon Redshift and Matillion can help, launch a 14-day free trial of Matillion ETL for Amazon Redshift on the AWS Marketplace or schedule a demo today. If you have questions or suggestions, please comment below.


Additional Reading

If you found this post helpful, be sure to check out Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift, and Orchestrate multiple ETL jobs using AWS Step Functions and AWS Lambda.

 


About the Author

Ian Funnell, a lead solution architect at Matillion, is a seasoned cloud and data warehousing specialist. Ian is dedicated to supporting customers in driving their data transformation forward and solving their deepest technical challenges. Ian has more than 25 years of experience in the tech industry.

 

 

 

Using CTAS statements with Amazon Athena to reduce cost and improve performance

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/using-ctas-statements-with-amazon-athena-to-reduce-cost-and-improve-performance/

Amazon Athena is an interactive query service that makes it more efficient to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Athena recently released support for creating tables using the results of a SELECT query or CREATE TABLE AS SELECT (CTAS) statement. Analysts can use CTAS statements to create new tables from existing tables on a subset of data, or a subset of columns. They also have options to convert the data into columnar formats, such as Apache Parquet and Apache ORC, and partition it. Athena automatically adds the resultant table and partitions to the AWS Glue Data Catalog, making them immediately available for subsequent queries.

CTAS statements help reduce cost and improve performance by allowing users to run queries on smaller tables constructed from larger tables. This post covers three use cases that demonstrate the benefit of using CTAS to create a new dataset, smaller than the original one, allowing subsequent queries to run faster. Assuming our use case requires repeatedly querying the data, we can now query a smaller and more optimal dataset to get the results faster.

Using Amazon Athena CTAS

The familiar CREATE TABLE statement creates an empty table. In contrast, the CTAS statement creates a new table containing the result of a SELECT query. The new table’s metadata is automatically added to the AWS Glue Data Catalog. The data files are stored in Amazon S3 at the designated location. When creating new tables using CTAS, you can include a WITH statement to define table-specific parameters, such as file format, compression, and partition columns. For more information about the parameters you can use, see Creating a Table from Query Results (CTAS).
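As a minimal sketch of that shape (the table, columns, and S3 location here are placeholders rather than part of this post's example):

CREATE TABLE my_new_table
WITH (
  format = 'PARQUET',
  parquet_compression = 'SNAPPY',
  partitioned_by = ARRAY['dt'],
  external_location = 's3://my-bucket/my-new-table/'
) AS
SELECT
  col_a,
  col_b,
  dt   -- partition columns must come last in the SELECT list
FROM my_source_table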

Before you begin: Set up CloudTrail for querying with Athena

If you don’t already use Athena to query your AWS CloudTrail data, we recommend you set this up. To do so:

  1. Open the CloudTrail console.
  2. On the left side of the console, choose Event History.
  3. At the top of the window, choose Run advanced queries in Amazon Athena.
  4. Follow the setup wizard and create your Athena table.

It takes some time for data to collect. If this is your first time, it takes about an hour to get meaningful data. This assumes that there is activity in your AWS account.

This post assumes that your CloudTrail table is named cloudtrail_logs, and that it resides in the default database.

Use case 1: Optimizing for repeated queries by reducing dataset size

As with other AWS services, Athena uses AWS CloudTrail to track its API calls. In this use case, we use CloudTrail to provide an insight into our Athena usage. CloudTrail automatically publishes data in JSON format to S3. We use a CTAS statement to create a table with only 30 days of Athena API events, to remove all of the other API events that we don’t care about.  This reduces the table size, which improves subsequent queries.

The following query uses the last 30 days of Athena events. It creates a new table called “athena_30_days” and saves the data files in Parquet format.

CREATE TABLE athena_30_days
AS
SELECT
  date(from_iso8601_timestamp(eventtime)) AS dt,
  *
FROM cloudtrail_logs
WHERE eventsource = 'athena.amazonaws.com'
AND 
  date(from_iso8601_timestamp(eventtime)) 
    BETWEEN current_date - interval '30' day AND current_date

Executing this query on the original CloudTrail data takes close to 5 minutes to run, and scans around 14 GB of data. This is because the raw data is in JSON format and not well partitioned.  Executing a SELECT * on the newly created table now takes 1.7 seconds and scans 1.14MB of data.

Now you can run multiple queries or build a dashboard on the reduced dataset.

For example, the following query aggregates the total count of each Athena API, grouping results by IAM user, date, and API event name.  This query took only 1.8 seconds to complete.

SELECT 
  dt, 
  eventname,
  count(eventname) as event_count,
  split_part(useridentity.arn, ':', 6) as user
FROM athena_30_days
GROUP BY 1,2,4
ORDER BY event_count DESC

Use case 2: Selecting a smaller number of columns

In this use case, I join the CloudTrail table with the S3 Inventory table while only selecting specific columns relevant to my analysis.  I use CTAS to generate a table from the results.

CREATE TABLE athena_s3_30_days
AS
SELECT 
  json_extract_scalar(ct.requestparameters, '$.bucketName') AS bucket,
  json_extract_scalar(ct.requestparameters, '$.key') AS key,
  ct.useridentity.username AS username,
  ct.eventname,
  cast (from_iso8601_timestamp(ct.eventtime) as timestamp) as ts,
  s3.storage_class,
  s3.size
FROM cloudtrail_logs ct
JOIN s3inventory s3 
ON json_extract_scalar(ct.requestparameters, '$.bucketName') = s3.bucket
AND json_extract_scalar(ct.requestparameters, '$.key') = s3.key
AND date(from_iso8601_timestamp(ct.eventtime)) = date(s3.last_modified_date)
WHERE ct.eventsource = 's3.amazonaws.com' 
AND ct.eventname = 'GetObject'
AND ct.useridentity.invokedby LIKE '%athena%'
AND date(from_iso8601_timestamp(eventtime)) 
    BETWEEN current_date - interval '30' day AND current_date

The previous query example returns the last 30 days of S3 GetObject API events that were invoked by the Athena service.  It adds the S3 object size and storage class for each event returned from the S3 Inventory table.

We can then, for example, count the number of times each key has been accessed by Athena, ordering the results based on the count from small to large.  This provides us an indication of the size of files we’re scanning and how often. Knowing this helps us determine if we should optimize by performing compaction on those keys.

SELECT
  bucket,
  size,
  key,
  count(key) AS key_count
FROM athena_s3_30_days
GROUP BY 1,2,3
ORDER BY key_count DESC

In the case of my example, it looks like this:

Use case 3: Repartitioning an existing table

The third use case I want to highlight where CTAS can be of value is taking an existing unoptimized dataset, converting it to Apache ORC and partitioning it to better optimize for repeated queries.  We’ll take the last 100 days of CloudTrail events and partition it by date.

CREATE TABLE cloudtrail_partitioned
WITH (
  partitioned_by = ARRAY['year', 'month'],
  format = 'orc',
  external_location = 's3://royon-demo/cloudtrail_partitioned'
)
AS
SELECT
  *, 
  year(date(from_iso8601_timestamp(eventtime))) as year,
  month(date(from_iso8601_timestamp(eventtime))) as month
FROM cloudtrail_logs

Notice that I've added a WITH clause following the CREATE TABLE keywords but before the AS keyword.  Within the WITH clause, we can define the table properties that we want.  In this particular case, we declared "year" and "month" as our partitioning columns and defined ORC as the output format.  The reason I used ORC is because CloudTrail data may contain empty columns that are not allowed by the Parquet specification, but are allowed by ORC.  Additionally, I defined the external S3 location to store our table.  If we don't define an external location, Athena uses the default query result S3 location.

The resulting S3 destination bucket looks similar to the following example:

An additional optimization supported by Athena CTAS is bucketing.  Partitioning is used to group similar types of data based on a specific column.  Bucketing is commonly used to combine data within a partition into a number of equal groups, or files.  Therefore, partitioning is best suited for low cardinality columns and bucketing is best suited for high cardinality columns.  For more information, see Bucketing vs Partitioning.

Let’s take the previous CTAS example and add bucketing.

CREATE TABLE cloudtrail_partitioned_bucketed
WITH (
  partitioned_by = ARRAY['year', 'month'],
  bucketed_by = ARRAY['eventname'],
  bucket_count = 3,
  format = 'orc',
  external_location = 's3://royon-demo/cloudtrail_partitioned'
)
AS
SELECT
  *, 
  year(date(from_iso8601_timestamp(eventtime))) as year,
  month(date(from_iso8601_timestamp(eventtime))) as month
FROM cloudtrail_logs

 

And this is what it looks like in S3:

Here is an example query on both a partitioned table and a partitioned and bucketed table.  You can see that the speed is similar, but that the bucketed query scans less data.

Partitioned table:

Partitioned and bucketed table:

Conclusion

In this post, we introduced CREATE TABLE AS SELECT (CTAS) in Amazon Athena. CTAS lets you create a new table from the result of a SELECT query. The new table can be stored in Parquet, ORC, Avro, JSON, and TEXTFILE formats. Additionally, the new table can be partitioned and bucketed for improved performance. We looked at how CTAS helps with three common use cases:

  1. Reducing a large dataset into a smaller, more efficient dataset.
  2. Selecting a subset of the columns and rows to only deliver what the consumer of the data really needs.
  3. Partitioning and bucketing a dataset that is not currently optimized to improve performance and reduce the cost.

Additional Reading

If you found this post useful, be sure to check out How Realtor.com Monitors Amazon Athena Usage with AWS CloudTrail and Amazon QuickSight.

 


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan and enjoys cheering his team on and hanging out with his family.

 

 

 

Connect to Amazon Athena with federated identities using temporary credentials

Post Syndicated from Nitin Wagh original https://aws.amazon.com/blogs/big-data/connect-to-amazon-athena-with-federated-identities-using-temporary-credentials/

Many organizations have standardized on centralized user management, most commonly Microsoft Active Directory or LDAP.  Access to AWS resources is no exception.  Amazon Athena is a serverless query engine for data on Amazon S3 that is popular for quick and cost-effective queries of data in a data lake.  To allow users or applications to access Athena, organizations are required to use an AWS access key and a secret access key against which appropriate policies are enforced. To maintain a consistent authorization model across the organization, organizations must enable authentication and authorization for Athena by using federated users.

This blog post shows the process of enabling federated user access with the AWS Security Token Service (AWS STS). This approach lets you create temporary security credentials and provides them to trusted users for running queries in Athena.

Temporary security credentials in AWS STS

Temporary security credentials ensure that access keys to protected AWS resources are properly rotated, so potential security leaks can be caught and remedied. AWS STS generates these per-use temporary access keys.

Temporary security credentials work similarly to the long-term access key credentials that your IAM users can use. However, temporary security credentials have the following differences:

  • They are intended for short-term use only. You can configure these credentials to last from a few minutes to several hours, with a maximum of 12 hours. After they expire, AWS no longer recognizes them, or allows any kind of access from API requests made with them.
  • They are not stored with the user. They are generated dynamically and provided to the user when requested. When or before they expire, the user can request new credentials, if they still have permissions to do so.

Common scenarios for federated access

The following common scenarios describe when your organization may require federated access to Athena:

  1. Running queries in Athena while using federation. Your group is required to run queries in Athena while federating into AWS using SAML with permissions stored in Active Directory.
  2. Enabling access across accounts to Athena for users in your organization. Users in your organization with access to one AWS account needs to run Athena queries in a different account.
  3. Enabling access to Athena for a data application. A data application deployed on an Amazon EC2 instance needs to run Athena queries via JDBC.

Athena as the query engine for your data lake on S3

Athena is an interactive query service that lets you analyze data directly in Amazon S3 by using standard SQL. You can access Athena by using JDBC and ODBC drivers, AWS SDK, or the Athena console.

Athena enables schema-on-read analytics to gain insights from structured or semi-structured datasets found in the data lake. A data lake is ubiquitous, scalable, and reliable storage that lets you consume all of your structured and unstructured data. Customers increasingly prefer a serverless approach to querying data in their data lake.  Some benefits of using Amazon S3 for a data lake include:

  • The ability to efficiently consume and store any data, at any scale, at low cost.
  • A single destination for searching and finding the relevant data.
  • Analysis of the data in S3 through a unified set of tools.

Solution overview

The following sections describe how to enable the common scenarios introduced previously in this post. They show how to download, install, and configure SQL Workbench to run queries in Athena. Next, they show how to use AWS STS with a custom JDBC credentials provider to obtain temporary credentials for an authorized user.  These credentials are passed to Athena’s JDBC driver, which enables SQL Workbench to run authorized queries.

As a reminder, the scenarios are:

  1. Running queries in Athena while using federation.
  2. Enabling access across accounts to Athena for users in your organization.
  3. Enabling access to Athena for a data application.

Walkthrough

This walkthrough uses a sample table created to query Amazon CloudFront logs.  It demonstrates that proper access has been granted to Athena after each scenario. This walkthrough also assumes that you have a table for testing.  For information about creating a table, see Getting Started with Amazon Athena.
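For example, after configuring each scenario, a simple query like the following confirms that access works end to end. The cloudfront_logs table name follows the Getting Started tutorial and is an assumption here:

SELECT *
FROM cloudfront_logs
LIMIT 10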

Prerequisites for scenarios 1 and 2

  1. Download and install SQL Workbench.
  2. Download the Athena custom credentials provider .jar file to the same computer where SQL Workbench is installed.

Note: You can also compile the .jar file by using this Athena JDBC source code.

  3. Download the Amazon Athena JDBC driver to the same computer where SQL Workbench is installed.
  4. In SQL Workbench, open File, Connect window, Manage Drivers. Choose the Athena driver and add two libraries, the Athena JDBC driver and the custom credentials provider, by specifying the location where you downloaded them.

Scenario 1: SAML Federation with Active Directory

In this scenario, we use a SAML command line tool. It performs a SAML handshake with an identity provider, and then retrieves temporary security credentials from AWS STS. We then pass the obtained security credentials through a custom credentials provider to run queries in Athena.

Before You Begin

  1. Make sure you have followed the prerequisites for scenarios 1 and 2.
  2. Set up a SAML integration with ADFS. For information, see Enabling Federation to AWS Using Windows Active Directory, ADFS, and SAML 2.0.
  3. Add the IAM policies required for Athena and Amazon S3 access to the ADFS-Production role.
  4. Set up the federated AWS CLI. For more information, see How to Implement a General Solution for Federated API/CLI Access Using SAML 2.0.

Executing queries in Athena with a SAML federated user identity

  1. Run the federated AWS CLI script configured as part of the prerequisites. Log in using a valid user name and password, then choose the ADFS-Production role. As a result, your temporary access_key, secret_access_key, and session_token are generated. They are stored in a "credentials" file under the [saml] profile that looks similar to the following:

[saml]
output = json
region = us-east-1
aws_access_key_id = XXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXX
aws_session_token = XXXXXXXXXXXXXXXXX

  2. To enable running queries in Athena through SQL Workbench, configure an Athena connection as follows:

  3. Choose Extended Properties, and enter the properties as follows:
"AWSCredentialsProviderClass"="com.amazonaws.athena.jdbc.CustomIAMRoleAssumptionSAMLCredentialsProvider"
"AWSCredentialsProviderArguments"="<access_key_id>, <secret_access_key>, <session token>"
"S3OutputLocation"="s3://<bucket where query results are stored>"
"LogPath"= "<local path on laptop, or pc where logs are stored>"
"LogLevel"= "<Log Level from 0 to 6>"

  4. Choose Test to verify that you can successfully connect to Athena.
  5. Run a query in SQL Workbench to verify that credentials are correctly applied.

Scenario 2: Cross-account access

In this scenario, you allow users in one AWS account, referred to as Account A to run Athena queries in a different account, called Account B.

To enable this configuration, use the CustomIAMRoleAssumptionCredentialsProvider custom credentials provider to retrieve the necessary credentials. By doing so, you can run Athena queries by using credentials from Account A in Account B. This is made possible by the cross-account roles, as shown in the following diagram:

Before You Begin

  1. If you haven't already done so, follow the prerequisites for scenarios 1 and 2.

  2. Use the AWS CLI to define a role named AthenaCrossAccountRole in Account B, as follows:
aws iam create-role --role-name AthenaCrossAccountRole --assume-role-policy-document file://cross-account-trust.json

Content of cross-account-trust.json file

{
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<ACCOUNT-A-ID>:root"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
  3. Attach the IAM policies, AmazonAthenaFullAccess and AmazonS3FullAccess, to the AthenaCrossAccountRole IAM role, as follows:
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess --role-name AthenaCrossAccountRole

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess --role-name AthenaCrossAccountRole

Executing queries in Athena using cross-account access

  1. Set up a new Athena database connection in SQL Workbench, as shown in the following example:

  2. Choose Extended Properties, and enter the properties as shown in the following example. This example configures the AWS credentials of an IAM user from Account A (AccessKey, SecretKey, and Cross Account Role ARN).
"AwsCredentialsProviderClass"="com.amazonaws.custom.athena.jdbc.CustomIAMRoleAssumptionCredentialsProvider"
"AwsCredentialsProviderArguments"="<aws_key_id>,<secret_key_id>,<cross Account Role ARN>"
"S3OutputLocation"="s3://<bucket where Athena results are stored>"
"LogPath"= "<local directory path on laptop, or pc where logs are stored>"
"LogLevel"= "<Log Level from 0 to 6>"

  3. Choose Test to verify that you can successfully connect to Athena.
  4. Run a query in SQL Workbench to verify that credentials are correctly applied.

Scenario 3: Using EC2 Instance Profile Role

This scenario uses the Amazon EC2 instance profile role to retrieve temporary credentials.  First, create the EC2AthenaInstanceProfileRole IAM role via AWS CLI, as shown in the following example:

aws iam create-role --role-name EC2AthenaInstanceProfileRole --assume-role-policy-document file://policy.json

Content of policy.json file

{
  "Statement": [
      {
        "Action": "sts:AssumeRole",
        "Effect": "Allow",
        "Principal": {
           "Service": "ec2.amazonaws.com"
         }
      }
  ]
}

Attach the IAM policies, AmazonAthenaFullAccess and AmazonS3FullAccess, to the EC2AthenaInstanceProfileRole IAM role, as follows:

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess --role-name EC2AthenaInstanceProfileRole

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess --role-name EC2AthenaInstanceProfileRole

Before You Begin

  1. Launch the Amazon EC2 instance for Windows, then attach the InstanceProfile role created in the previous step:
aws ec2 run-instances --image-id <use Amazon EC2 on Windows 2012 AMI Id for your region> --iam-instance-profile 'Arn=arn:aws:iam::<your_aws_account_id>:instance-profile/EC2AthenaInstanceProfileRole' --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=AthenaTest}]' --count 1 --instance-type t2.micro --key-name <your key> --security-group-ids <your_windows_security_group_id>

  2. Log in to your Windows instance using RDP.
  3. Install Java 8, and then download and install SQL Workbench.
  4. Download the Athena JDBC driver.

Executing queries in Athena using an EC2 instance profile role

  1. In SQL Workbench, open File, Connect window, Manage Drivers. Specify the path to the Athena JDBC driver that you previously downloaded, as shown in the following example.

  2. The instance credentials provider, InstanceProfileCredentialsProvider, is included with the Amazon Athena JDBC driver. In SQL Workbench, set up an Athena connection as follows:

  3. Choose Extended Properties, and enter the properties as follows:
"AwsCredentialsProviderClass"="com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider"
"S3OutputLocation"="s3://<bucket where Athena results are stored>"
"LogPath"= "<local directory path on laptop, or pc where logs are stored>"
"LogLevel"= "<Log Level from 0 to 6>"

  4. Choose Test to verify that you can successfully connect to Athena.
  5. Run a query in SQL Workbench to verify that credentials are correctly applied.

Conclusion

This post walked through three scenarios to enable trusted users to access Athena using temporary security credentials. First, we used SAML federation where user credentials were stored in Active Directory. Second, we used a custom credentials provider library to enable cross-account access. And third, we used an EC2 Instance Profile role to provide temporary credentials for users in our organization to access Athena.

These approaches ensure that access keys protecting AWS resources are not directly hardcoded in applications and can be easily revoked as needed. To achieve this, we used AWS STS to generate temporary per-use credentials.

The scenarios demonstrated in this post used SQL Workbench.  However, they apply to all other uses of the JDBC driver with Amazon Athena. Similar approaches also apply when you use the AWS SDK with Athena.

Happy querying!

 


Additional Reading

If you found this post useful, be sure to check out Top 10 Performance Tuning Tips for Amazon Athena, and Analyze and visualize your VPC network traffic using Amazon Kinesis and Amazon Athena.

 


About the Author

Nitin Wagh is a Solutions Architect with Amazon Web Services specializing in Big Data Analytics. He helps AWS customers develop distributed big data solutions using AWS technologies.

 

 

 

 

Store, Protect, Optimize Your Healthcare Data with AWS: Part 2

Post Syndicated from Stephen Jepsen original https://aws.amazon.com/blogs/architecture/store-protect-optimize-your-healthcare-data-with-aws-part-2/

Leveraging Analytics and Machine Learning Tools for Readmissions Prediction

This blog post was co-authored by Ujjwal Ratan, a senior AI/ML solutions architect on the global life sciences team.

In Part 1, we looked at various options to ingest and store sensitive healthcare data using AWS. The post described our shared responsibility model and provided a reference architecture that healthcare organizations could use as a foundation to build a robust platform on AWS to store and protect their sensitive data, including protected health information (PHI). In Part 2, we will dive deeper into how customers can optimize their healthcare datasets for analytics and machine learning (ML) to address clinical and operational challenges.

There are a number of factors creating pressures for healthcare organizations, both providers and payers, to adopt analytic tools to better understand their data: regulatory requirements, changing reimbursement models from volume- to value-based care, population health management for risk-bearing organizations, and movement toward personalized medicine. As organizations deploy new solutions to address these areas, the availability of large and complex datasets from electronic health records, genomics, images (for example, CAT, PET, MRI, ultrasound, X-ray), and IoT has been increasing. With these data assets growing in size, healthcare organizations want to leverage analytic and ML tools to derive new actionable insights across their departments.

One example of the use of ML in healthcare is diagnostic image analysis, including digital pathology. Pathology is extremely important in diagnosing and treating patients, but it is also extremely time-consuming and largely a manual process. While the complexity and quantity of workloads are increasing, the number of pathologists is decreasing. According to one study, the number of active pathologists could drop by 30 percent by 2030 compared to 2010 levels. (1) A cloud architecture and solution can automate part of the workflow, including sample management, analysis, storing, sharing, and comparison with previous samples to complement existing provider workflows effectively. A recent study using deep learning to analyze metastatic breast cancer tissue samples resulted in an approximately 85% reduction in human error rate. (2)

ML is also being used to assist radiologists in examining other diagnostic images such as X-rays, MRIs, and CAT scans. Having large quantities of images and metadata to train the algorithms that are the key to ML is one of the main challenges for ML adoption. To help address this problem, the National Institutes of Health recently released 90,000 X-ray plates tagged either with one of 14 diseases or tagged as being normal. Leading academic medical centers are using these images to build their neural networks and train their algorithms. With advanced analytics and ML, we can answer the hard questions such as “what is the next best action for my patient, the expected outcome, and the cost.”

The foundations for a great analytical layer

Let’s pick up from where we left off in Part 1. We have seen how providers can ingest data into AWS from their data centers and store it securely into different services depending on the type of data. For example:

  1. All object data is stored in Amazon S3, Amazon S3 Infrequent Access, or Amazon Glacier depending on how often they are used.
  2. Data from the provider’s database is either processed and stored as objects in Amazon S3 or aggregated into data marts on Amazon Redshift.
  3. Metadata of the objects on Amazon S3 are maintained in the DynamoDB database.
  4. Amazon Athena is used to query the objects directly stored on Amazon S3 to address ad hoc requirements.

We will now look at two best practices that are key to building a robust analytical layer using these datasets.

  1. Separating storage and compute: You should not be compelled to scale compute resources just to store more data. The scaling rules of the two layers should be separate.
  2. Leverage the vast array of AWS big data services when it comes to building the analytical platforms instead of concentrating on just a few of them. Remember, one size does not fit all.

Technical overview

In this overview, we will demonstrate how we can leverage AWS big data and ML services to build a scalable analytical layer for our healthcare data. We will use a single source of data stored in Amazon S3 for performing ad hoc analysis using Amazon Athena, integrate it with a data warehouse on Amazon Redshift, build a visual dashboard for some metrics using Amazon QuickSight, and finally build a ML model to predict readmissions using Amazon SageMaker. By not moving the data around and just connecting to it using different services, we avoid building redundant copies of the same data. There are multiple advantages to this approach:

  1. We optimize our storage. Not having redundant copies reduces the amount of storage required.
  2. We keep the data secure with only authorized services having access to it. Keeping multiple copies of the data can result in higher security risk.
  3. We are able to scale the storage and compute separately as needed.
  4. It becomes easier to manage the data and monitor usage metrics centrally such as how often the data has been accessed, who has been accessing it, and what has been the growth pattern of the data over a period of time. These metrics can be difficult to aggregate if the data is duplicated multiple times.

Let’s build out this architecture using the following steps:

  1. Create a database in AWS Glue Data Catalog

We will do this using a Glue crawler. First create a JSON file that contains the parameters for the Glue crawler.

{
  "Name": "readmissions",
  "Role": "arn of the role for Glue",
  "DatabaseName": "readmissions",
  "Description": "glue data catalog for storing readmission data",
  "Targets": {
    "S3Targets": [
      {
        "Path": "s3://<bucket>/<prefix>"
      },
      {
        "Path": "s3://<bucket>/<prefix>"
      }
    ]
  }
}

As you can see, the crawler will crawl two locations in Amazon S3 and save the resulting tables in a new database called "readmissions." Replace the role ARN and Amazon S3 locations with your corresponding details. Save this in a file named create_crawler.json. Then, from the AWS CLI, call the following command to create the crawler:

aws glue create-crawler --cli-input-json file://create_crawler.json

Once the crawler is created, run it by calling the following command:

aws glue start-crawler --name readmissions

Log on to the AWS Glue console, navigate to the crawlers, and wait until the crawler completes running.
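If you prefer to script this check rather than watch the console, the following is a minimal boto3 sketch (assuming default AWS credentials and the crawler name used above) that polls the crawler until it returns to the READY state:

import time
import boto3

glue = boto3.client("glue")

# Poll the crawler created above ("readmissions") until it finishes.
while True:
    state = glue.get_crawler(Name="readmissions")["Crawler"]["State"]
    print("Crawler state:", state)
    if state == "READY":  # READY means the crawler is idle again
        break
    time.sleep(30)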

This will create two tables — phi and non-phi — in a database named “readmissions” in the AWS Glue Data Catalog as shown below.

  2. Query the data using Athena

The AWS Glue Data Catalog is seamlessly integrated with Amazon Athena. For details on how to enable this, see Integration with AWS Glue.

As a result of this integration, the tables created using the Glue crawler can now be queried using Amazon Athena. Amazon Athena allows you to do ad hoc analysis on the dataset. You can do exploratory analysis on the data and also determine its structure and quality. This type of upfront ad hoc analysis is invaluable for ensuring the data quality in your downstream data warehouse or your ML algorithms that will make use of this data for training models. In the next few sections, we will explore these aspects in greater detail.

To query the data using Amazon Athena, navigate to the Amazon Athena console.

NOTE: Make sure the region is the same as the region you chose in the previous step. If it’s not the same, switch the region by using the drop-down menu on the top right-hand corner of the screen.

Once you arrive in the Amazon Athena console, you should already see the tables and databases you created previously, and you should be able to see the data in the two tables by writing Amazon Athena queries. Here is a list of the top 10 rows from the table readmissions.nonphi:

Now that we are able to query the dataset, we can run some queries for exploratory analysis. Here are just a few examples:

  • How many patients have been discharged to home?
    SELECT count(*) from nonphi where discharge_disposition = 'Discharged to home'
  • What's the minimum and the maximum number of procedures carried out on a patient?
    SELECT min(num_procedures), max(num_procedures) from nonphi
  • How many patients were referred to this hospital by another physician?
    SELECT count(*) from nonphi group by admission_source having admission_source = 'Physician Referral'
  • What were the top 5 specialties with positive readmissions?
    SELECT count(readmission_result) as num_readmissions, medical_specialty from (select readmission_result, medical_specialty from nonphi where readmission_result = 'Yes') group by medical_specialty order by num_readmissions desc limit 5
  • Which payer was responsible for paying for treatments that involved more than 5 procedures?
    SELECT distinct payer_code from nonphi where num_procedures > 5 and payer_code != '(null)'

While this information is valuable, you typically do not want to invest too much time and effort into building an ad hoc query platform like this, because at this stage you are not even sure whether the data is of any value for your business-critical analytical applications. One benefit of using Amazon Athena for ad hoc analysis is that it requires little effort or time. It uses schema-on-read instead of schema-on-write, allowing you to work with various source data formats without worrying about the underlying structures. You can put the data on Amazon S3 and start querying immediately.
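If you want to run these exploratory queries programmatically rather than in the console, the following is a minimal boto3 sketch. The query results bucket is a placeholder that you need to replace with a bucket you own:

import time
import boto3

athena = boto3.client("athena")

# Start one of the exploratory queries shown above against the readmissions database.
query_id = athena.start_query_execution(
    QueryString="SELECT count(*) FROM nonphi WHERE discharge_disposition = 'Discharged to home'",
    QueryExecutionContext={"Database": "readmissions"},
    ResultConfiguration={"OutputLocation": "s3://<your-query-results-bucket>/athena/"},
)["QueryExecutionId"]

# Wait for the query to finish, then print the result rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])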

  3. Create an external table in Amazon Redshift Spectrum with the same data

Now that we are satisfied with the data quality and understand the structure of the data, we would like to integrate this with a data warehouse. We’ll use Amazon Redshift Spectrum to create external tables on the files in S3 and then integrate these external tables with a physical table in Amazon Redshift.

Amazon Redshift Spectrum allows you to run Amazon Redshift SQL queries against data on Amazon S3, extending the capabilities of your data warehouse beyond the physical Amazon Redshift clusters. You don’t need to do any elaborate ETL or move the data around. The data exists in one place in Amazon S3 and you interface with it using different services (Athena and Redshift Spectrum) to satisfy different requirements.

Before beginning, please look at this step-by-step guide to set up Amazon Redshift Spectrum.

After you have set up Amazon Redshift Spectrum, you can begin executing the steps below:

  1. Create an external schema called "readmissions." Amazon Redshift Spectrum integrates with the AWS Glue Data Catalog and allows you to create Spectrum tables by referencing the catalog. This feature allows you to build the external table on the same data that you analyzed with Amazon Athena in the previous step without the need for ETL. This can be achieved by the following:
create external schema readmissions
from data catalog
database 'readmissions'
iam_role 'arn for your redshift spectrum role'
region 'region where the S3 data exists';

NOTE: Make sure you use the appropriate role ARN and Region.

  2. Once the command executes successfully, you can confirm the schema was created by running the following:
select * from svv_external_schemas;

You should see a row similar to the one above with your corresponding region and role.

You can also see the external tables that were created by running the following command:

select * from SVV_EXTERNAL_TABLES;

  3. Let's confirm we can see all the rows in the external table by counting the number of rows:
select count(*) from readmissions.phi;
select count(*) from readmissions.nonphi;

You should see 101,766 rows in both the tables, confirming that your external tables contain all the records that you read using the AWS Glue data crawler and analyzed using Athena.
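If you want to script these validation queries instead of running them in a SQL client, the following is a minimal sketch using the psycopg2 driver. The cluster endpoint, database name, and credentials are placeholders:

import psycopg2

# Placeholder connection details -- replace with your cluster endpoint, database, and credentials.
conn = psycopg2.connect(
    host="<your-cluster-endpoint>",
    port=5439,
    dbname="<your-database>",
    user="<your-user>",
    password="<your-password>",
)

with conn.cursor() as cur:
    for table in ("readmissions.phi", "readmissions.nonphi"):
        cur.execute("SELECT count(*) FROM " + table)
        print(table, cur.fetchone()[0])

conn.close()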

  4. Now that we have all the external tables created, let's create an aggregate fact table in the physical Redshift data warehouse. We can use the CREATE TABLE AS SELECT (CTAS) form of the Amazon Redshift CREATE TABLE statement to do this:
create table readmissions_aggregate_fact as
select
readmission_result,admission_type,discharge_disposition,diabetesmed,
avg(time_in_hospital) as avg_time_in_hospital,
min(num_procedures) as min_procedures,
max(num_procedures) as max_procedures,
avg(num_procedures) as avg_num_procedures,
avg(num_medications) as avg_num_medications,
avg(number_outpatient) as avg_number_outpatient,
avg(number_emergency) as avg_number_emergency,
avg(number_inpatient) as avg_number_inpatient,
avg(number_diagnoses) as avg_number_diagnoses
from readmissions.nonphi
group by readmission_result,admission_type,discharge_disposition,diabetesmed

Once this query executes successfully, you can see a new table created in the physical public schema of your Amazon Redshift cluster. You can confirm this by executing the following query:

select distinct(tablename) from pg_table_def where schemaname = 'public'

  4. Build a QuickSight Dashboard from the aggregate fact

We can now create dashboards to visualize the data in our readmissions aggregate fact table using Amazon QuickSight. Here are some examples of reports you can generate using Amazon QuickSight on the readmission data.

For more details on Amazon QuickSight, refer to the service documentation.

  5. Build an ML model in Amazon SageMaker to predict readmissions

As a final step, we will create an ML model to predict the attribute readmission_result, which denotes whether a patient was readmitted, using the non-PHI dataset.

  1. Create a notebook instance in Amazon SageMaker that is used to develop our code. (A condensed sketch of steps 2–9 follows this list.)
  2. The code reads the non-PHI data from the Amazon S3 bucket as a data frame in Python. This is achieved using the pandas.read_csv function.

  3. Use the pandas.get_dummies function to encode categorical values into numeric values for use with the model.

  4. Split the data into two sets, 80% for training and 20% for testing, using the numpy.random.rand function.

  5. Form train_X, train_y and test_X, test_y corresponding to training features, training labels, testing features, and testing labels respectively.

  6. Use the Amazon SageMaker Linear Learner algorithm to train our model. The implementation of the algorithm uses the dense tensor format to optimize the training job. Use the function write_numpy_to_dense_tensor from the Amazon SageMaker library to convert the numpy array into the dense tensor format.

  7. Create the training job in Amazon SageMaker with the appropriate configurations and run it.

  8. Once the training job completes, create an endpoint in Amazon SageMaker to host our model, using the linear.deploy function to deploy the endpoint.

  9. Finally, run a prediction by invoking the endpoint using the linear_predictor.predict function.
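The following is a condensed sketch of steps 2–9 using the SageMaker Python SDK (v2). The file location, column names, execution role, and instance types are placeholders, and the complete notebook linked below differs in detail:

import numpy as np
import pandas as pd
from sagemaker import LinearLearner

role = "<your-sagemaker-execution-role-arn>"  # placeholder execution role

# Step 2: read the non-PHI data from S3 (reading s3:// paths with pandas requires s3fs).
df = pd.read_csv("s3://<bucket>/<prefix>/nonphi.csv")

# Step 3: encode categorical values as numeric columns (column names are placeholders).
df = pd.get_dummies(df, columns=["admission_type", "discharge_disposition", "medical_specialty"])

# Step 4: 80/20 train/test split (the test set would be prepared the same way as the training set).
mask = np.random.rand(len(df)) < 0.8
train, test = df[mask], df[~mask]

# Step 5: separate training features and labels.
train_y = (train["readmission_result"] == "Yes").values.astype("float32")
train_X = train.drop(columns=["readmission_result"]).values.astype("float32")

# Steps 6 and 7: record_set converts the numpy arrays to the dense RecordIO-protobuf
# format (using write_numpy_to_dense_tensor under the hood) and uploads them;
# fit launches the training job.
linear = LinearLearner(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    predictor_type="binary_classifier",
)
linear.fit(linear.record_set(train_X, labels=train_y))

# Step 8: deploy an endpoint hosting the trained model.
linear_predictor = linear.deploy(initial_instance_count=1, instance_type="ml.m5.large")

# Step 9: run a prediction against the endpoint.
print(linear_predictor.predict(train_X[:1]))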

You can view the complete notebook here.

Data, analytics, and ML are strategic assets to help you manage your patients, staff, equipment, and supplies more efficiently. These technologies can also help you be more proactive in treating and preventing disease. Industry luminaries share this opinion: “By leveraging big data and scientific advancements while maintaining the important doctor-patient bond, we believe we can create a health system that will go beyond curing disease after the fact to preventing disease before it strikes by focusing on health and wellness,” writes Lloyd B. Minor, MD, dean of the Stanford School of Medicine.

ML and analytics offer huge value in helping achieve the quadruple aim: improved patient satisfaction, improved population health, improved provider satisfaction, and reduced costs. Technology should never replace the clinician but instead become an extension of the clinician and allow them to be more efficient by removing some of the mundane, repetitive tasks involved in prevention, diagnostics, and treatment of patients.

(1) “The Digital Future of Pathology.” The Medical Futurist, 28 May 2018, medicalfuturist.com/digital-future-pathology.

(2) Wang, Dayong, et al. “Deep Learning for Identifying Metastatic Breast Cancer.” Deep Learning for Identifying Metastatic Breast Cancer, 18 June 2016, arxiv.org/abs/1606.05718.

About the Author

Stephen Jepsen is a Global HCLS Practice Manager in AWS Professional Services.

 

Analyze and visualize your VPC network traffic using Amazon Kinesis and Amazon Athena

Post Syndicated from Allan MacInnis original https://aws.amazon.com/blogs/big-data/analyze-and-visualize-your-vpc-network-traffic-using-amazon-kinesis-and-amazon-athena/

Network log analysis is a common practice in many organizations.  By capturing and analyzing network logs, you can learn how devices on your network are communicating with each other, and the internet.  There are many reasons for performing log analysis, such as audit and compliance, system troubleshooting, or security forensics.  Within an Amazon Virtual Private Cloud (VPC), you can capture network flows with VPC Flow Logs.  You can create a flow log for a VPC, a subnet, or a network interface.  If you create a flow log for a subnet or VPC, each network interface in the VPC or subnet is monitored. Flow log data is published to a log group in Amazon CloudWatch Logs, and each network interface has a unique log stream.

CloudWatch Logs provides some great tools to get insights into this log data.  However, in most cases, you want to efficiently archive the log data to S3 and query it using SQL.  This provides more flexibility and control over log retention and the analysis you want to perform.  You also often want the ability to obtain near real-time insights into that log data by performing analysis automatically, soon after the log data has been generated.  And you might want to visualize certain network characteristics on a dashboard so you can more clearly understand the network traffic within your VPC.  So how can you accomplish efficient log archival to S3, near real-time network analysis, and data visualization all at once?  You can do so by combining several capabilities of CloudWatch, Amazon Kinesis, AWS Glue, and Amazon Athena, but setting up this solution and configuring all the services can be daunting.

In this blog post, we describe the complete solution for collecting, analyzing, and visualizing VPC flow log data.  In addition, we created a single AWS CloudFormation template that lets you efficiently deploy this solution into your own account.

Solution overview

This section describes the overall architecture and each step of this solution.

We want the ability to query the flow log data in a one-time, or ad hoc, fashion. We also want to analyze it in near real time. So our flow log data takes two paths through the solution.  For ad hoc queries, we use Amazon Athena.  By using Athena, you can use standard SQL to query data that has been written to S3.  An Athena best practice to improve query performance and reduce cost is to store data in a columnar format such as Apache Parquet.  This solution uses Kinesis Data Firehose’s record format conversion feature to convert the flow log data to Parquet before it writes the files to S3. Converting the data into a compressed, columnar format lowers your cost and improves query performance by enabling Athena to scan less data from S3 when executing your queries.

By streaming the data to Kinesis Data Firehose from CloudWatch logs, we have enabled our second path for near real-time analysis on the flow log data.  Kinesis Data Analytics is used to analyze the log data as soon as it is delivered to Kinesis Data Firehose.  The Analytics application aggregates key data from the flow logs and creates custom CloudWatch metrics that are used to drive a near real-time CloudWatch dashboard.

Let’s review each step in detail.

1.  VPC Flow Logs

The VPC Flow Logs feature captures the network flows in a VPC.  In this solution, it is assumed that you want to capture all network traffic within a single VPC.  By using the CloudFormation template, you can define the VPC you want to capture.  Each line in the flow log contains space-delimited information about the packets traversing the network between two entities: a source and a destination.  The log line contains details including the source and destination IP addresses and ports, the number of packets, and the action taken on that data (for example, whether the traffic was accepted or rejected).  Here's an example of a typical flow log:

2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK

For more information about each item in the line, see Flow Log Records.  Note that VPC flow logs buffer for about 10 minutes before they’re delivered to CloudWatch Logs.

2.  Stream to Kinesis Data Firehose

By creating a CloudWatch Logs subscription, our flow logs can automatically be streamed when they arrive in CloudWatch Logs.  This solution's subscription filter uses Kinesis Data Firehose as its destination.  Kinesis Data Firehose is the most effective way to load streaming data into data stores, such as Amazon S3.  The CloudWatch Logs subscription filter has also been configured to parse the space-delimited log lines and create a structured JSON object for each line in the log.  The naming convention for each attribute in the object follows the names defined for each element by VPC Flow Logs.  Therefore, the example log line referenced earlier streams as the following JSON record:

{
    "version": 2,
    "account-id": "123456789010",
    "interface-id": "eni-abc123de",
    "srcaddr": "172.31.16.139",
    "dstaddr": "172.31.16.21",
    "srcport": 20641,
    "dstport": 22,
    "protocol": 6,
    "packets": 20,
    "bytes": 4249,
    "start": 1418530010,
    "end": 1418530070,
    "action": "ACCEPT",
    "log-status": "OK"
}

A CloudWatch Logs subscription sends data to the configured destination as a gzipped collection of records.  Before we can analyze the data, we must first decompress it.

3.  Decompress records with AWS Lambda

There may be situations where you want to transform or enrich streaming data before writing it to its final destination.  In this solution, we must decompress the data that is streamed from CloudWatch Logs.  With the Amazon Kinesis Data Firehose Data Transformation feature, we can decompress the data with an AWS Lambda function.  Kinesis Data Firehose manages the invocation of the function.  Inside the function, the data is decompressed and returned to Kinesis Data Firehose.  The complete source code for the Lambda function can be found here.
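The complete function linked above handles this for the solution. As an illustration only, here is a minimal Python sketch of a Firehose transformation handler that decompresses the CloudWatch Logs payload and re-emits each flow log event as its own record; it is a simplified version, not the solution's exact code:

import base64
import gzip
import json

def lambda_handler(event, context):
    # Each incoming Firehose record wraps a gzipped CloudWatch Logs payload.
    output = []
    for record in event["records"]:
        payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))

        if payload.get("messageType") != "DATA_MESSAGE":
            # Control messages (such as the subscription test message) carry no flow data.
            output.append({"recordId": record["recordId"], "result": "Dropped", "data": record["data"]})
            continue

        # Re-emit each log event as a newline-delimited record for downstream conversion.
        data = "\n".join(e["message"] for e in payload["logEvents"]) + "\n"
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(data.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}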

4.  Convert data to Apache Parquet

To take advantage of the performance capabilities in Amazon Athena, we convert the streaming data to Apache Parquet before persisting it to S3.  We use the record format conversion capabilities of Kinesis Data Firehose to perform this conversion.  When converting from JSON to Parquet, Kinesis Data Firehose must know the schema.  To accomplish this, we configure a table in the Glue Data Catalog.  In this table, we map the attributes of our incoming JSON records to fields in the table.

5.  Persist data to Amazon S3

When using the data format conversion feature in Kinesis Data Firehose, the only supported destination is S3.  Kinesis Data Firehose buffers data for a period of time, or until a data size threshold is met, before it creates the Parquet files in S3.  In general, converting to Parquet results in effective file compression.  If the file size is too small, it isn’t optimal for Athena queries.  To maximize the file sizes created in S3, the solution has been configured to buffer for 15 minutes, or 128 MB.  However, you can adjust this configuration to meet your needs by using the Kinesis Data Firehose console.

6.  Query flow logs with SQL in Athena

In this solution, Athena uses the database and table created in the Glue Data Catalog to make your flow log data queryable.  There are sample queries to review later in this article.

7.  Analyze the network flows in near real-time with Kinesis Data Analytics

Following the data through the first six steps, the solution enables you to query flow log data using SQL in Athena.  This is great for ad hoc queries, or for querying data that was generated over a long period of time.  However, to get the most out of the data, you should analyze it as soon as possible after it is generated.  To accomplish this, the solution uses Amazon Kinesis Data Analytics (KDA), which enables you to query streaming data using SQL so you can get immediate insights into your data.  In this solution, the KDA application uses a Lambda function to decompress the gzipped records from Kinesis Data Firehose, and then analyzes the flow log data to create some aggregations of interest.  The KDA application creates the following aggregations:

  • A count of rejected TCP packets, every 15 minutes.
  • A count of rejected TCP packets by protocol, every 15 minutes.

These metrics are aggregated over a 15-minute window.  At the end of the window, KDA invokes a Lambda function, passing the aggregated values as input to the function.

8.  Write the aggregations as custom CloudWatch metrics

At the end of the 15-minute window, KDA invokes a Lambda function, passing the aggregated values.  The Lambda function writes these values to CloudWatch as custom metrics. This enables the solution to support alarms on those metrics using CloudWatch alarms, and it enables custom dashboards to be created from the metrics.
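As an illustration, a Lambda function that receives the aggregated rows from KDA and publishes them as a custom metric might look like the following sketch. The namespace, metric name, and the rejected_packets field are assumptions, not the solution's actual names:

import base64
import json
import boto3

cloudwatch = boto3.client("cloudwatch")

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # Each record from Kinesis Data Analytics is a base64-encoded JSON row.
        row = json.loads(base64.b64decode(record["data"]))
        cloudwatch.put_metric_data(
            Namespace="VPCFlowLogs",  # hypothetical namespace
            MetricData=[{
                "MetricName": "RejectedPackets",
                "Value": float(row.get("rejected_packets", 0)),
                "Unit": "Count",
            }],
        )
        output.append({"recordId": record["recordId"], "result": "Ok"})
    return {"records": output}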

9.  View the aggregated data in CloudWatch dashboards

CloudWatch dashboards are customizable home pages in the CloudWatch console for monitoring your resources in a single view.  You can use CloudWatch dashboards to create customized views of the metrics and alarms for your AWS resources. In this solution, we create a dashboard that monitors the custom aggregations created in our KDA application. The solution creates a sample dashboard to get you started, but you should review the metrics and create a dashboard and alarms to meet your needs.

Deploying the solution

To deploy this solution into your own account, you use the CloudFormation template to create the stack. You can deploy the solution stack into the following AWS Regions: US East (N. Virginia), US West (Oregon), and EU (Ireland).  To deploy, choose the link for the Region where you want to deploy.  The CloudFormation console for that Region opens, and the template URL is pre-populated:

Deploy the solution in:

US East (N. Virginia)

The Create Stack wizard for CloudFormation opens with the template location pre-populated.  Choose Next, and you will be prompted to provide values for several template parameters.

Let’s review what each parameter represents:

  • Stack name — The name for this CloudFormation stack.  You can rename it from the default, but choose a short (up to 16 characters) name, and ensure your name uses only lower-case letters.  The value you use here will be used as a prefix in the name of many of the resources created by this stack.  By providing a short name with lower-case letters, the names for those resources will pass the resource naming rules.
  • S3BucketName — The name of the S3 bucket into which the Parquet files are delivered. This name must be globally unique.
  • VPCId — The ID of the existing VPC for which flow logs are captured.

Choose Next, and accept any defaults for the remainder of the CloudFormation wizard. The stack is created in a few minutes.

Analyze the flow log data

After the stack has been deployed, it may take up to 15 minutes before data can be queried in Athena, or viewed in the CloudWatch dashboard.  Let’s look at a few sample queries you can run in Athena to learn more about the network traffic within your VPC.

Navigate to the Athena console in the Region where you deployed the stack.  In the console, choose the database named “vpc_flow_logs”.  Notice that this database contains one table, named “flow_logs.”  Run the following query to see which protocol is being rejected the most within your VPC:

select protocol, sum(packets) as rejected_packets
from flow_logs
where action = 'REJECT'
group by protocol
order by rejected_packets desc

Your results should look similar to the following example:

This example shows that the value in the protocol column follows the standard defined by the Internet Assigned Numbers Authority (IANA).  So in the previous example, the top two rejected protocols are TCP and ICMP.

Here are a few additional queries to help you understand the network traffic in your VPC:

Identify the top 10 IP addresses from which packets were rejected in the past 2 weeks:

SELECT
	srcaddr,
	SUM(packets) AS rejected_packets
FROM flow_logs
WHERE action = 'REJECT' AND start >= current_timestamp - interval '14' day
GROUP BY srcaddr
ORDER BY rejected_packets DESC
LIMIT 10;

Identify the top 10 servers that are receiving the highest number of HTTPS requests:

SELECT
	dstaddr,
	SUM(packets) AS packet_count
FROM flow_logs
WHERE dstport = 443
GROUP BY dstaddr
ORDER BY packet_count DESC
LIMIT 10;

Now let’s look at the analysis we’re performing in real time with Kinesis Data Analytics.  By default, the solution creates a dashboard named “VPC-Flow-Log-Analysis.”  On this dashboard, we’ve created a few default widgets.  The aggregate data being generated by KDA is plotted in a few charts, as shown in the following example:

This example shows that the Rejected Packets per Protocol chart has been created to plot only a subset of all possible protocols.  Modify this widget to show the protocols that are relevant for your environment.

Next steps

The solution outlined in this blog post provides an efficient way to get started with analyzing VPC Flow Logs.  To get the most out of this solution, consider these next steps:

  • Create partitions in the Glue table to help optimize Athena query performance. The current solution creates data in S3 partitioned by Y/M/D/H; however, these S3 prefixes are not automatically mapped to Glue partitions.  This means that Athena queries scan all Parquet files, and as the volume of data grows, query performance degrades.  For more information about partitioning and Athena tuning, see Top 10 Performance Tuning Tips for Amazon Athena. (A sketch of registering a partition appears after this list.)
  • Apply the solution to additional VPCs, or in different Regions. If your account contains multiple VPCs, or if your infrastructure is deployed in multiple Regions, you must create the stack in those Regions.  If you have multiple VPCs within the same Region, you can create a new flow log for each additional VPC by using the VPC console.  Configure the flow log to deliver to the same destination log group that was created when the stack was initially created (the CWLogGroupName parameter value in the CloudFormation template).
  • Modify the default widgets in the CloudWatch dashboard. The stack created a couple of default CloudWatch dashboards; however, you can create more to meet your needs, based on the insights you’d like to get from the flow logs in your environment.
  • Create additional queries in Athena to learn more about your network behavior.
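For example, if you redefine the Glue table with year/month/day/hour partition keys, you could register new partitions from a script such as the following sketch. The database, table, bucket, prefix, and partition values are all placeholders:

import boto3

athena = boto3.client("athena")

# Register one hour's worth of data as a partition. This assumes the flow_logs table
# has been redefined with year/month/day/hour partition keys and that the S3 prefixes
# follow the Y/M/D/H layout created by Kinesis Data Firehose.
ddl = """
ALTER TABLE vpc_flow_logs.flow_logs ADD IF NOT EXISTS
PARTITION (year = '2019', month = '01', day = '15', hour = '00')
LOCATION 's3://<your-bucket>/<your-prefix>/2019/01/15/00/'
"""

athena.start_query_execution(
    QueryString=ddl,
    QueryExecutionContext={"Database": "vpc_flow_logs"},
    ResultConfiguration={"OutputLocation": "s3://<your-bucket>/athena-query-results/"},
)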

Conclusion

Using the solution provided in this blog post, you can quickly analyze the network traffic in your VPC.  It provides both a near real-time solution, and also the capabilities to query historical data.  You can get the most out of this solution by extending it with queries and visualizations of your own to meet the needs of your system.

 


Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift and Preprocessing Data in Amazon Kinesis Analytics with AWS Lambda.


About the Author

Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.

 

 

 

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 2

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-2/

In part 1 of this series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we discuss how to process and visualize the data by creating a serverless data lake that uses key analytics to create actionable data.

Create a serverless data lake and explore data using AWS Glue, Amazon Athena, and Amazon QuickSight

As we discussed in part 1, you can store heart rate data in an Amazon S3 bucket using Kinesis Data Streams. However, storing data in a repository is not enough. You also need to be able to catalog and store the associated metadata related to your repository so that you can extract the meaningful pieces for analytics.

For a serverless data lake, you can use AWS Glue, which is a fully managed data catalog and ETL (extract, transform, and load) service. AWS Glue simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. As you get your AWS Glue Data Catalog data partitioned and compressed for optimal performance, you can use Amazon Athena for the direct query to S3 data. You can then visualize the data using Amazon QuickSight.

The following diagram depicts the data lake that is created in this demonstration:

Amazon S3 now has the raw data stored from the Kinesis process. The first task is to prepare the Data Catalog and identify what data attributes are available to query and analyze. To do this task, you need to create a database in AWS Glue that will hold the table created by the AWS Glue crawler.

An AWS Glue crawler scans through the raw data available in an S3 bucket and creates a data table with a Data Catalog. You can add a scheduler to the crawler to run periodically and scan new data as required. For specific steps to create a database and crawler in AWS Glue, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3.

The following figure shows the summary screen for a crawler configuration in AWS Glue:

After configuring the crawler, choose Finish, and then choose Crawlers in the navigation pane. Select the crawler that you created, and choose Run crawler.

The crawler process can take 20–60 seconds to initiate. When it completes, it creates a table in your database as defined during the crawler configuration.

You can choose the table name and explore the Data Catalog and table:

In the demonstration table details, our data has three attributes: the time stamp as value_time, the person's ID as id, and the heart rate as colvalue. These attributes are identified and listed by the AWS Glue crawler. You can see other information such as the data format (text) and the record count (approx. 15,000 with each record size of 61 bytes).

You can use Athena to query the raw data. To access Athena directly from the AWS Glue console, choose the table, and then choose View data on the Actions menu, as shown following:

As noted, the data is currently in a JSON format and we haven’t partitioned it. This means that Athena continues to scan more data, which increases the query cost. The best practice is to always partition data and to convert the data into a columnar format like Apache Parquet or Apache ORC. This reduces the amount of data scans while running a query. Having fewer data scans means better query performance at a lower cost.

To accomplish this, AWS Glue generates an ETL script for you. You can schedule it to run periodically for your data processing, which removes the necessity for complex code writing. AWS Glue is a managed service that runs on top of a warm Apache Spark cluster that is managed by AWS. You can run your own script in AWS Glue or modify a script provided by AWS Glue that meets your requirements. For examples of how to build a custom script for your solution, see Providing Your Own Custom Scripts in the AWS Glue Developer Guide.

For detailed steps to create a job, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3. The following figure shows the final AWS Glue job configuration summary for this demonstration:

In this example configuration, we enabled the job bookmark, which helps AWS Glue maintain state information and prevents the reprocessing of old data. You only want to process new data when rerunning on a scheduled interval.

When you choose Finish, AWS Glue generates a Python script. This script processes your data and stores it in a columnar format in the destination S3 bucket specified in the job configuration.
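The generated script is specific to your job configuration, but its core is similar to the following minimal sketch, which reads the raw table from the Data Catalog and writes it back to S3 as Parquet. The database, table, and output path are placeholders:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glue_context = GlueContext(sc)

# Read the raw JSON table that the crawler created in the Data Catalog.
raw = glue_context.create_dynamic_frame.from_catalog(
    database="<your-database>",
    table_name="<your-raw-table>",
)

# Write the same records back to S3 in the columnar Parquet format.
glue_context.write_dynamic_frame.from_options(
    frame=raw,
    connection_type="s3",
    connection_options={"path": "s3://<your-bucket>/parquet/"},
    format="parquet",
)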

If you choose Run Job, it takes time to complete depending on the amount of data and data processing units (DPUs) configured. By default, a job is configured with 10 DPUs, which can be increased. A single DPU provides processing capacity that consists of 4 vCPUs of compute and 16 GB of memory.

After the job is complete, inspect your destination S3 bucket, and you will find that your data is now in columnar Parquet format.

Partitioning has emerged as an important technique for organizing datasets so that they can be queried efficiently by a variety of big data systems. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. For information about efficiently processing partitioned datasets using AWS Glue, see the blog post Work with partitioned data in AWS Glue.

You can create triggers for your job that run the job periodically to process new data as it is transmitted to your S3 bucket. For detailed steps on how to configure a job trigger, see Triggering Jobs in AWS Glue.

The next step is to create a crawler for the Parquet data so that a table can be created. The following image shows the configuration for our Parquet crawler:

Choose Finish, and execute the crawler.

Explore your database, and you will notice that one more table was created in the Parquet format.

You can use this new table for direct queries to reduce costs and to increase the query performance of this demonstration.

Because AWS Glue is integrated with Athena, the AWS Glue Data Catalog and the new table are already available in the Athena console. Fetch 10 rows from the new Parquet table in Athena just as you did for the JSON data table in the previous steps.

As the following image shows, we fetched the first 10 rows of heartbeat data from a Parquet format table. This same Athena query scanned only 4.99 KB of data compared to 205 KB of data that was scanned in a raw format. Also, there was a significant improvement in query performance in terms of run time.

Visualize data in Amazon QuickSight

Amazon QuickSight is a data visualization service that you can use to analyze data that has been combined. For more detailed instructions, see the Amazon QuickSight User Guide.

The first step in Amazon QuickSight is to create a new Amazon Athena data source. Choose the heartbeat database created in AWS Glue, and then choose the table that was created by the AWS Glue crawler.

Choose Import to SPICE for quicker analytics. This option creates a data cache and improves graph loading. All non-database datasets must use SPICE. To learn more about SPICE, see Managing SPICE Capacity.

Choose Visualize, and wait for SPICE to import the data to the cache. You can also schedule a periodic refresh so that new data is loaded to SPICE as the data is pipelined to the S3 bucket.

When the SPICE import is complete, you can create a visual dashboard easily. The following figure shows graphs displaying the occurrence of heart rate records per device.  The first graph is a horizontally stacked bar chart, which shows the percentage of heart rate occurrence per device. In the second graph, you can visualize the heart rate count group to the heart rate device.

Conclusion

Processing streaming data at scale is relevant in every industry. Whether you process data from wearables to tackle human health issues or address predictive maintenance in manufacturing centers, AWS can help you simplify your data ingestion and analysis while keeping your overall IT expenditure manageable.

In this two-part series, you learned how to ingest streaming data from a heart rate sensor and visualize it in such a way to create actionable insights. The current state of the art available in the big data and machine learning space makes it possible to ingest terabytes and petabytes of data and extract useful and actionable information from that process.


Additional Reading

If you found this post useful, be sure to check out Work with partitioned data in AWS Glue, and 10 visualizations to try in Amazon QuickSight with sample data.

 


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

 

 

 

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

 

 

 

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

 

 

David Cowden is partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in IoT space.

 

 

 

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

 

 

 

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry to optimize R&D applications for large scale HPC systems and enable the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human, cultural experience and eat fresh durians.

 

 

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

 

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 1

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-1/

Sports-related minor traumatic brain injuries (mTBI) continue to incite concern among different groups in the medical, sports, and parenting community. At the recreational level, approximately 1.6–3.8 million related mTBI incidents occur in the United States every year, and in most cases, are not treated at the hospital. (See “The epidemiology and impact of traumatic brain injury: a brief overview” in Additional resources.) The estimated medical and indirect costs of minor traumatic brain injury are reaching $60 billion annually.

Although emergency facilities in North America collect data on admitted traumatic brain injuries (TBI) cases, there isn’t meaningful data on the number of unreported mTBIs among athletes. Recent studies indicate a significant rate of under-reporting of sports-related mTBI due to many factors. These factors include the simple inability of team staff to either recognize the signs and symptoms or to actually witness the impact. (See “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates” in Additional resources.)

The majority of players involved in hockey and football are not college or professional athletes. There are over 3 million youth hockey players and approximately 5 million registered participants in football. (See “Head Impact Exposure in Youth Football” in Additional resources.) These recreational athletes don’t have basic access to medical staff trained in concussion recognition and sideline injury assessment. A user-friendly measurement and a smartphone-based assessment tool would facilitate the process between identifying potential head injuries, assessment, and return to play (RTP) criteria.

Recently, the use of instrumented sports helmets, including the Head Impact Telemetry System (HITS), has allowed for detailed recording of impacts to the head in many research trials. This practice has led to recommendations to alter contact in practices and certain helmet design parameters. (See “Head impact severity measures for evaluating mild traumatic brain injury risk exposure” in Additional resources.) However, due to the higher costs of the HITS system and complexity of the equipment, it is not a practical impact alert device for the general recreational population.

A simple, practical, and affordable system for measuring head trauma within the sports environment, subject to the absence of trained medical personnel, is required.

Given the proliferation of smartphones, we felt that this was a practical device to investigate to provide this type of monitoring.  All smartphone devices have an embedded Bluetooth communication system to receive and transmit data at various ranges.  For the purposes of this demonstration, we chose a class 1 Bluetooth device as the hardware communication method. We chose it because of its simplicity, widely accepted standard, and compatibility to interface with existing smartphones and IoT devices.

Remote monitoring typically involves collecting information from devices (for example, wearables) at the edge, integrating that information into a data lake, and generating inferences that can then be served back to the relevant stakeholders. Additionally, in some cases, compute and inference must also be done at the edge to shorten the feedback loop between data collection and response.

This use case can be extended to many other use cases in myriad verticals. In this two-part series, we show you how to build a data pipeline in support of a data lake. We use key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we focus on generating simple inferences from that data that can support RTP parameters.

Architectural overview

Here is the AWS architecture that we cover in this two-part series:

Note: For the purposes of our demonstration, we chose to use heart rate monitoring sensors rather than helmet sensors because they are significantly easier to acquire. Both types of sensors are very similar in how they transmit data. They are also very similar in terms of how they are integrated into a data lake solution.

The resulting demonstration transfers the heartbeat data using the following components:

  • AWS Greengrass set up with a Raspberry Pi 3 to stream heart rate data into the cloud.
  • Data is ingested via Amazon Kinesis Data Streams, and raw data is stored in an Amazon S3 bucket using Kinesis Data Firehose. Find more details about writing to Kinesis Data Firehose using Kinesis Data Streams.
  • Kinesis Data Analytics averages out the heartbeat-per-minute data during stream data ingestion and passes the average to an AWS Lambda function.
  • AWS Lambda enriches the heartbeat data by comparing the real-time data with baseline information stored in Amazon DynamoDB.
  • AWS Lambda sends SMS/email alerts via an Amazon SNS topic if the heartbeat rate is greater than 120 BPM, for example.
  • AWS Glue runs an extract, transform, and load (ETL) job. This job transforms the data stored in JSON format to the compressed Apache Parquet columnar format and partitions the transformed data for faster query processing. AWS Glue is a fully managed ETL service for crawling data stored in an Amazon S3 bucket and building a metadata catalog.
  • Amazon Athena is used for ad hoc query analysis on the data that is processed by AWS Glue. This data is also available for machine learning processing using predictive analysis to reduce heart disease risk.
  • Amazon QuickSight is a fully managed visualization tool. It uses Amazon Athena as a data source and depicts visual line and pie charts to show the heart rate data in a visual dashboard.

All data pipelines are serverless and are refreshed periodically to provide up-to-date data.

You can use Kinesis Data Firehose to transform the data in the pipeline to a compressed Parquet format without needing to use AWS Glue. For the purposes of this post, we are using AWS Glue to highlight its capabilities, including a centralized AWS Glue Data Catalog. This Data Catalog can be used by Athena for ad hoc queries and by Apache Spark EMR to run complex machine learning processes. AWS Glue also lets you edit generated ETL scripts and supports “bring your own ETL” to process data for more complex use cases.

Configuring key processes to support the pipeline

The following sections describe how to set up and configure the devices and services used in the demonstration to build a data pipeline in support of a data lake.

Remote sensors and IoT devices

You can use commercially available heart rate monitors to collect electrocardiography (ECG) information such as heart rate. The monitor is strapped around the chest area with the sensor placed over the sternum for better accuracy. The monitor measures the heart rate and sends the data over Bluetooth Low Energy (BLE) to a Raspberry Pi 3. The following figure depicts the device-side architecture for our demonstration.

The Raspberry Pi 3 is host to both the IoT device and the AWS Greengrass core. The IoT device is responsible for connecting to the heart rate monitor over BLE and collecting the heart rate data. The collected data is then sent locally to the AWS Greengrass core, where it can be processed and routed to the cloud through a secure connection. The AWS Greengrass core serves as the “edge” gateway for the heart rate monitor.

Set up AWS Greengrass core software on Raspberry Pi 3

To prepare your Raspberry Pi for running AWS Greengrass software, follow the instructions in Environment Setup for Greengrass in the AWS Greengrass Developer Guide.

After setting up your Raspberry Pi, you are ready to install AWS Greengrass and create your first Greengrass group. Create a Greengrass group by following the steps in Configure AWS Greengrass on AWS IoT. Then install the appropriate certificates to the Raspberry Pi by following the steps to start AWS Greengrass on a core device.

The preceding steps deploy a Greengrass group that consists of three discrete configurable items: a device, a subscription list, and the connectivity information.

The core device is a set of code that is responsible for collecting the heart rate information from the sensor and sending it to the AWS Greengrass core. This device is using the AWS IoT Device SDK for Python including the Greengrass Discovery API.

Use the following AWS CLI command to create a Greengrass group:

aws greengrass create-group --name heartRateGroup

To complete the setup, follow the steps in Create AWS IoT Devices in an AWS Greengrass Group.

After you complete the setup, the heart rate data is routed from the device to the AWS IoT Core service using AWS Greengrass. As such, you need to add a single subscription in the Greengrass group to facilitate this message route:

Here, your device is named Heartrate_Sensor, and the target is the IoT Cloud on the topic iot/heartrate. That means that when your device publishes to the iot/heartrate topic, AWS Greengrass also sends this message to the AWS IoT Core service on the same topic. Then you can use the breadth of AWS services to process the data.

The connectivity information is configured to use the local host because the IoT device resides on the Raspberry Pi 3 along with the AWS Greengrass core software. The IoT device uses the Discovery API, which is responsible for retrieving the connectivity information of the AWS Greengrass core that the IoT device is associated with.

The IoT device then uses the endpoint and port information to open a secure TLS connection to AWS Greengrass core, where the heart rate data is sent. The AWS Greengrass core connectivity information should be depicted as follows:

The power of AWS Greengrass core is that you can deploy AWS Lambda functions and new subscriptions to process the heart rate information locally on the Raspberry Pi 3. For example, you can deploy an AWS Lambda function that can trigger a reaction if the detected heart rate is reaching a set threshold. In this scenario, different individuals might require different thresholds and responses, so you could theoretically deploy unique Lambda functions on a per-individual basis if needed.
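For illustration, a locally deployed function of that kind might look like the following Python sketch. The alert topic, the threshold, and the shape of the incoming payload (a JSON object with deviceId and heartRate fields) are assumptions for this sketch:

import json
import greengrasssdk

client = greengrasssdk.client("iot-data")

THRESHOLD_BPM = 120  # illustrative threshold; this could vary per individual

def lambda_handler(event, context):
    # Greengrass invokes this handler for each message on the subscribed topic.
    heart_rate = int(event.get("heartRate", 0))
    if heart_rate > THRESHOLD_BPM:
        client.publish(
            topic="iot/heartrate/alerts",  # hypothetical alert topic
            payload=json.dumps({"deviceId": event.get("deviceId"), "heartRate": heart_rate}),
        )
    return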

Configure AWS Greengrass and AWS IoT Core

To enable further processing and storage of the heart rate data messages published from AWS Greengrass core to AWS IoT Core, create an AWS IoT rule. The AWS IoT rule retrieves messages published to the iot/heartrate topic and sends them to the Kinesis data stream through an AWS IoT rule action for Kinesis.

Simulate heart rate data

You might not have access to an IoT device, but you still want to run a proof of concept (PoC) around heart rate use cases. You can simulate data by creating a shell script and deploying that data simulation script on an Amazon EC2 instance. Refer to the EC2 user guide to get started with Amazon EC2 Linux instances.

On the Amazon EC2 instance, create a shell script kinesis_client_HeartRate.sh, and copy the provided code to start writing some records into the Kinesis data stream. Be sure to create your Kinesis data stream and replace the variable <your_stream_name> in the following script.

#!/bin/bash
# Generate a random device ID (1-10) and heart rate (60-140) and write the pair
# to the Kinesis data stream in a loop.
while true
do
  deviceID=$(( ( RANDOM % 10 )  + 1 ))
  heartRate=$(( ( RANDOM % 81 ) + 60 ))
  echo "$deviceID,$heartRate"
  aws kinesis put-record --stream-name <your_stream_name> --data "$deviceID,$heartRate"$'\n' --partition-key $deviceID --region us-east-1
done

You can also use the Kinesis Data Generator to create data and then stream it to your solution or demonstration. For details on its use, see the blog post Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Ingest data using Kinesis and manage alerts with Lambda, DynamoDB, and Amazon SNS

Now you need to ingest data from the IoT device, which can be processed for real-time notifications when abnormal heart rates are detected.

Streaming data from the heart rate monitoring device is ingested to Kinesis Data Streams. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. For this project, the data stream was configured with one open shard and a data retention period of 24 hours. This lets you send 1 MB of data or 1,000 events per second and read 2 MB of data per second. If you need to support more devices, you can scale up and add more shards using the UpdateShardCount API or the Amazon Kinesis scaling utility.

You can configure your data stream by using the following AWS CLI command (and then using the appropriate flag to turn on encryption).

aws kinesis create-stream --stream-name heartrate_stream --shard-count 1

You can use an AWS CloudFormation template to create the entire stack depicted in the following architecture diagram.

When launching an AWS CloudFormation template, be sure to enter your email address or mobile phone number with the appropriate endpoint protocol (“Email” or “SMS”) as parameters:

Alternatively, you can follow the manual steps in the documentation links that are provided in this post.

Streaming data in Kinesis can be processed and analyzed in real time by Kinesis clients. Refer to the Kinesis Data Streams Developer Guide to learn how to create a Kinesis data stream.

To identify abnormal heart rate information, you must use real-time analytics to detect abnormal behavior. You can use Kinesis Data Analytics to perform analytics on streaming data in real time. Kinesis Data Analytics consists of three configurable components: source, real-time analytics, and destination. Refer to the AWS documentation to learn the detailed steps to configure Kinesis Data Analytics.

Kinesis Data Analytics uses Kinesis Data Streams as the source stream for the data. During the source configuration, if filtering or masking of records is required, you can preprocess the records using AWS Lambda. The data in this particular case is relatively simple, so you don't need to preprocess the records.

The Kinesis Data Analytics schema editor lets you edit and transform the schema if required. In the following example, we transformed the second column to Value instead of COL_Value.

The SQL code to perform the real-time analysis of the data has to be copied to the SQL Editor for real-time analytics. The following is the sample code that was used for this demonstration.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                                   VALUEROWTIME TIMESTAMP,
                                   ID INTEGER, 
                                   COLVALUE INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
SELECT STREAM ROWTIME,
              ID,
              AVG("Value") AS HEARTRATE
FROM     "SOURCE_SQL_STREAM_001"
GROUP BY ID, 
         STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) HAVING AVG("Value") > 120 OR AVG("Value") < 40;

This code generates DESTINATION_SQL_STREAM. It inserts values into the stream only when the average value of the heart beat that is received from SOURCE_SQL_STREAM_001 is greater than 120 or less than 40 in the 60-second time window.

For more information about the tumbling window concept, see Tumbling Windows (Aggregations Using GROUP BY).

Next, add an AWS Lambda function as one of your destinations, and configure it as follows:

In the destination editor, make sure that the stream name selected is the DESTINATION_SQL_STREAM. You only want to trigger the Lambda function when anomalies in the heart rate are detected. The output format can be JSON or CSV. In this example, our Lambda function expects the data in JSON format, so we chose JSON.

Athlete and athletic trainer registration information is stored in the heartrate Registrations DynamoDB table. Amazon DynamoDB offers fully managed encryption at rest using an AWS Key Management Service (AWS KMS) managed encryption key for DynamoDB. You need to create a table with encryption at rest enabled. Follow the detailed steps in Amazon DynamoDB Encryption at Rest.

Each record in the table should include deviceid, customerid, firstname, lastname, and mobile. The following is an example table record for reference.

{
  "customerid": {
    "S": "3"
  },
  "deviceid": {
    "S": "7"
  },
  "email": {
    "S": "[email protected]"
  },
  "firstname": {
    "S": "John"
  },
  "lastname": {
    "S": "Smith"
  },
  "mobile": {
    "S": "19999999999"
  }
}

Refer to the DynamoDB Developer Guide for complete instructions for creating and populating a DynamoDB table.
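For example, you could load a sample registration record with a short boto3 script such as the following sketch. The table name is a placeholder, the item shown is sample data only, and it assumes deviceid is the table's partition key:

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("<your-registration-table>")

# Insert one sample athlete/trainer registration record.
table.put_item(Item={
    "deviceid": "7",
    "customerid": "3",
    "firstname": "John",
    "lastname": "Smith",
    "email": "trainer@example.com",
    "mobile": "19999999999",
})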

The Lambda function is created to process the record passed from the Kinesis Data Analytics application.  The Node.js Lambda function retrieves the athlete and athletic trainer information from the DynamoDB registrations table. It then alerts the athletic trainer to the event by sending a cellular text message via Amazon Simple Notification Service (Amazon SNS).

Note: The default AWS account limit for Amazon SNS for mobile messages is $1.00 per month. You can increase this limit through an SNS Limit Increase case as described in AWS Service Limits.

You now create a new Lambda function with a runtime of Node.js 6.10 and choose the Create a custom role option for IAM permissions.  If you are new to deploying Lambda functions, see Create a Simple Lambda Function.

You must configure the new Lambda function with a specific IAM role, providing privileges to Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon SNS as provided in the supplied AWS CloudFormation template.

The provided AWS Lambda function retrieves the HR Monitor Device ID and HR Average from the base64-encoded JSON message that is passed from Kinesis Data Analytics.  After retrieving the HR Monitor Device ID, the function then queries the DynamoDB Athlete registration table to retrieve the athlete and athletic trainer information.

Finally, the AWS Lambda function sends a mobile text notification (which does not contain any sensitive information) to the athletic trainer’s mobile number retrieved from the athlete data by using the Amazon SNS service.
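The solution's function is written in Node.js, but the logic can be illustrated with the following minimal Python sketch. The table name, its key schema, and the field names emitted by the Kinesis Data Analytics application (ID and COLVALUE) are assumptions for this sketch:

import base64
import json
import boto3

dynamodb = boto3.resource("dynamodb")
sns = boto3.client("sns")

# Placeholder registration table; assumes deviceid is the partition key.
TABLE = dynamodb.Table("<your-registration-table>")

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # Each record from Kinesis Data Analytics is a base64-encoded JSON row.
        row = json.loads(base64.b64decode(record["data"]))
        device_id = str(row.get("ID"))
        heart_rate = row.get("COLVALUE")

        item = TABLE.get_item(Key={"deviceid": device_id}).get("Item", {})
        if item.get("mobile"):
            sns.publish(
                PhoneNumber=item["mobile"],
                Message="Abnormal heart rate alert for device {}: {} BPM".format(device_id, heart_rate),
            )
        output.append({"recordId": record["recordId"], "result": "Ok"})
    return {"records": output}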

To store the streaming data to an S3 bucket for further analysis and visualization using other tools, you can use Kinesis Data Firehose to connect the pipeline to Amazon S3 storage.  To learn more, see Create a Kinesis Data Firehose Delivery Stream.

Kinesis Data Firehose delivers the streaming data to the destination S3 bucket in intervals defined by an S3 buffer size or an S3 buffer interval (whichever condition is met first). The data in the Firehose delivery stream can be transformed, and you can back up the source records before applying any transformation. The data can also be encrypted, compressed with GZIP, ZIP, or Snappy, or converted to a columnar format such as Apache Parquet or Apache ORC, which improves query performance and reduces the storage footprint. You should enable error logging for operational and production troubleshooting.
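
As a rough sketch of this step, the delivery stream could be created with boto3 as follows. The stream name, bucket ARNs, and role ARNs are placeholders, and the buffering values are only examples.

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="heartrate-to-s3",  # hypothetical name
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        "BucketARN": "arn:aws:s3:::my-heartrate-archive",
        # Deliver when either 128 MB is buffered or 300 seconds have elapsed.
        "BufferingHints": {"SizeInMBs": 128, "IntervalInSeconds": 300},
        "CompressionFormat": "GZIP",
        # Back up the source records before any transformation is applied.
        "S3BackupMode": "Enabled",
        "S3BackupConfiguration": {
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
            "BucketARN": "arn:aws:s3:::my-heartrate-archive-backup",
        },
    },
)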

Conclusion

In part 1 of this blog series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and Lambda. In part 2, we’ll discuss how to deploy a serverless data lake and use key analytics to create actionable insights from the data lake.

Additional resources

Langlois, J.A., Rutland-Brown, W. & Wald, M., “The epidemiology and impact of traumatic brain injury: a brief overview,” Journal of Head Trauma Rehabilitation, Vol. 21, No. 5, 2006, pp. 375-378.

Echlin, S. E., Tator, C. H., Cusimano, M. D., Cantu, R. C., Taunton, J. E., Upshur E. G., Hall, C. R., Johnson, A. M., Forwell, L. A., Skopelja, E. N., “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates,” Neurosurg Focus, 29 (5):E4, 2010

Daniel, R. W., Rowson, S., Duma, S. M., “Head Impact Exposure in Youth Football,” Annals of Biomedical Engineering., Vol. 10, 2012, 1007.

Greenwald, R. M., Gwin, J. T., Chu, J. J., Crisco, J. J., “Head impact severity measures for evaluating mild traumatic brain injury risk exposure,” Neurosurgery Vol. 62, 2008, pp. 789–79


Additional Reading

If you found this post useful, be sure to check out Setting Up Just-in-Time Provisioning with AWS IoT Core, and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

 


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

 

 

 

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

 

 

 

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team.  His passion is leveraging the cloud to transform the carrier industry.  He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University.  As such, he has no spare money and no spare time to work a second job.

 

 

David Cowden is a partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in the IoT space.

 

 

 

Josh Ragsdale is an enterprise solutions architect at AWS.  His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

 

 

 

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil and gas industry optimizing R&D applications for large-scale HPC systems and enabling the potential of machine learning for the upstream sector. He and his family long to live in Singapore again for the human and cultural experience, and to eat fresh durians.

 

 

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

 

How SimilarWeb analyze hundreds of terabytes of data every month with Amazon Athena and Upsolver

Post Syndicated from Yossi Wasserman original https://aws.amazon.com/blogs/big-data/how-similarweb-analyze-hundreds-of-terabytes-of-data-every-month-with-amazon-athena-and-upsolver/

This is a guest post by Yossi Wasserman, a data collection & innovation team leader at SimilarWeb.

SimilarWeb, in their own words: SimilarWeb is the pioneer of market intelligence and the standard for understanding the digital world. SimilarWeb provides granular insights about any website or mobile app across all industries in every region. SimilarWeb is changing the way businesses make decisions by empowering marketers, analysts, sales teams, investors, executives and more with the insights they need to succeed in a digital world.

SimilarWeb is a market intelligence company that provides insights on what is happening across the digital world. Thousands of customers use these insights to make critical decisions on how to improve strategies in marketing, drive sales, make investments and more. The importance of the decision making that our solution empowers puts a huge emphasis on our capacity to effectively collect and utilize this information.

Specifically, the team I lead is charged with overseeing SimilarWeb’s mobile data collection. We currently process hundreds of TB of anonymous data every month.

The data-collection process is mission-critical for SimilarWeb, because we can't provide our customers with insights based on flawed or incomplete data. The data collection team needs analytics to track new types of data, partner integrations, overall performance, and more, as effectively and as quickly as possible. The earlier my team can identify and address anomalies, the better. Any tool that supports this process gives us a significant advantage.

Technical challenges of SimilarWeb mobile data collection

Hundreds of TB of data is streamed into SimilarWeb every month from different sources. The data is complex. It contains hundreds of fields, many of which are deeply nested, and some of which have null values. This complexity creates a technical challenge because the data must be cleaned, normalized, and prepared for querying.

The first option was to use our existing on-premises Hadoop cluster, which processes all of SimilarWeb’s data in a daily batch process that takes a few hours to run. For our business-critical monitoring, a 24-hour delay is not acceptable.

We considered developing a new process using Hadoop. But that would require our team to shift focus away from daily operations to code, scale, and maintain extract, transform, and load (ETL) jobs. Additionally, having to manage different databases would pull the team even further away from its day-to-day operations. We therefore wanted an agile solution in which every team member could create new reports, investigate discrepancies, and add automated tests.

We also had a count distinct problem, which caused a compute bottleneck. The count distinct problem is the difficulty of finding the number of distinct elements in a data stream containing repeated elements. We track the number of unique visitors for billions of possible segments; for example, by device, operating system, or country. Count distinct is a non-additive aggregation, so calculating an accurate number of unique visitors usually requires many memory-intensive compute nodes.
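
For reference only (it is not the approach described later in this post), Presto's approx_distinct aggregate in Athena is one common way to trade a small accuracy loss for much lower memory use. The table, column, database, and output-location names below are hypothetical.

import boto3

athena = boto3.client("athena")

athena.start_query_execution(
    QueryString="""
        SELECT device, os, country,
               approx_distinct(visitor_id) AS unique_visitors
        FROM mobile_events
        GROUP BY device, os, country
    """,
    QueryExecutionContext={"Database": "mobile_collection"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},
)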

Why we chose Amazon Athena

We chose Amazon Athena to solve these challenges. Athena offered us:

  1. Fast queries using SQL — our team wants to use SQL to query the data, but traditional SQL databases are hard to scale to hundreds of terabytes. Athena works for us because it runs distributed SQL queries on Amazon S3 using Apache Presto.
  2. Easy to maintain — Athena is a serverless platform that doesn’t require IT operating costs.
  3. Low cost — storing so much data in a data warehouse would be very expensive, because of the number of database nodes required to store all the data. We query only a small portion of the data, so the Athena price of $0.005 per GB scanned is appealing.

Why we chose Upsolver

As another part of our solution, we chose Upsolver. Upsolver bridges the gap between the data lake and analytics users who aren't big data engineers. Its cloud data lake platform helps organizations efficiently manage a data lake. Upsolver enables a single user to control big streaming data from ingestion through management and preparation for analytics platforms like Athena, Amazon Redshift, and Amazon Elasticsearch Service (Amazon ES).

We chose Upsolver for these reasons:

  1. Upsolver allows us to provide hourly statistics to Athena and S3. As a result, we can identify anomalies after 1 hour instead of 24 hours.
  2. Upsolver is easy to configure at scale by using its graphical user interface (GUI). It took us about an hour to build a pipeline from Kafka to Athena, including the number of distinct visitors for every segment we track.
  3. Upsolver’s Stream Discovery tool was very helpful when we created our tables in Athena. We saw the exact distribution of values for every field in our JSON, which helped us find the fields we wanted and make necessary corrections.
  4. Upsolver is easy to maintain. Upsolver is a serverless platform with little IT overhead.
  5. Our data remains private in our own S3 bucket. Although our data is anonymous, we didn’t want it out of our control.

Our solution

Our solution includes Athena for SQL analytics, S3 for events storage, and Upsolver for data preparation. The following diagram shows our solution end-to-end.

In the following sections, we walk through our solution step-by-step.

Step 1: Get the raw data from Kafka to S3

Our data resides on-premises in Kafka. We use Upsolver’s connector to read events from Kafka and store them on S3. We needed to do this only once.  The following example shows how we created the data source in Kafka.

Step 2: Create a reduced stream on S3

Storing hundreds of TB on S3 wasn’t necessary for the kind of analytics we needed to perform. Our full stream includes about 400 fields, although we need only 20–30 of them. We used Upsolver to create a reduced stream. This reduced stream contains some of the fields from the original stream, plus some new calculated fields we added with Upsolver. We store the output in S3.

We also knew that our Kafka topic contained events that weren’t relevant to our use case. As another part of reducing our stream, we filtered out those events using Upsolver. Although we keep the raw data for only one day, we store the reduced stream for one year. The reduced stream enables us to stay dynamic — using a one-year event source, but at a much lower cost than storing the full raw data.

Step 3: Create and manage tables in Athena

AWS Glue Data Catalog and Amazon S3 are critical when using Athena. Athena takes advantage of a concept known as external tables. This means tables’ schema definitions are managed in the AWS Glue Data Catalog and the data itself is managed in S3.

We mapped the nested JSON source files to a flat output table backed by Parquet files by using the Upsolver Create Output feature. When Upsolver is run, it creates two different types of output:

  • Flat Parquet files on S3, as shown in the following screenshot. We frequently use the aggregation option, because we track the number of distinct users for various segments on an hourly basis.

  • Four added columns for our tables, as shown in the following example. Our tables' schemas change over time as a result of schema evolution and partition management. Because we're using daily partitions, Upsolver added the four columns to our tables. Upsolver manages all of these changes for us directly in the AWS Glue Data Catalog.

Step 4: Perform SQL analytics in Athena

Our developers use the Athena SQL console to look for anomalies in new data. Usually a spike or a drop in the number of distinct users per tracked dimension indicates an anomaly. Examples of tracked dimensions can be SDK versions, devices, or operating systems.

We also track various fields in real time for production debugging. In some cases, we also add queries to continuous integration jobs.

Conclusion

In this post, I discussed our main challenges when dealing with large amounts of data. I provided you with a glimpse into our thought process for selecting Amazon Athena and Upsolver to build a performant, cost-effective, and efficient solution that everyone on the team can use. Overall, this new pipeline helped improve our efficiency and reduce the time from ingestion to insight from 24 hours to minutes.


Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Work with partitioned data in AWS Glue.

 


About the Author

Yossi Wasserman is a data collection & innovation team leader at SimilarWeb. He holds a BSc in software engineering and has more than 10 years of experience in mobile app development.

 

 

 

 

How Pagely implemented a serverless data lake in AWS to facilitate customer support analytics

Post Syndicated from Joshua Eichorn original https://aws.amazon.com/blogs/big-data/how-pagely-implemented-a-serverless-data-lake-in-aws-to-facilitate-customer-support-analytics/

Pagely is an AWS Advanced Technology Partner providing managed WordPress hosting services. Our customers continuously push us to improve visibility into usage, billing, and service performance. To better serve these customers, the service team requires an efficient way to access the logs created by the application servers.

Historically, we relied on a shell script that gathered basic statistics on demand. When processing the logs for our largest customer, it took more than 8 hours to produce one report using an unoptimized process running on an Amazon EC2 instance—sometimes crashing due to resource limitations. Instead of putting more effort into fixing a legacy process, we decided it was time to implement a proper analytics platform.

All of our customer logs are stored in Amazon S3 as compressed JSON files. We use Amazon Athena to run SQL queries directly against these logs. This approach is great because there is no need for us to prepare the data. We simply define the table and query away. Although JSON is a supported format for Amazon Athena, it is not the most efficient format in terms of performance and cost. JSON files must be read in their entirety, even if you are only returning one or two fields from each row of data, so more data is scanned than is required. Additionally, the inefficiencies of processing JSON cause longer query times.

Querying the logs of our largest customers was not ideal with Athena, as we ran into the 30-minute query timeout limit. This limit can be increased, but the query was already taking longer than we wanted.

In this post, we discuss how Pagely worked with Beyondsoft, an AWS Advanced Consulting Partner, to use ConvergDB, an open-source tool developed by Beyondsoft, to build a DevOps-centric data pipeline. This pipeline uses AWS Glue to transform application logs into optimized tables that can be queried quickly and cost effectively using Amazon Athena.

Engaging Beyondsoft

We knew that we needed to do something to make our data easily accessible to our engineers with as little overhead as possible. We wanted to get the data into a more optimal file format to reduce query times. Being a lean shop, we didn’t have the bandwidth to dive into the technologies. To bridge the gap, we engaged Beyondsoft to determine the best solution to optimize and better manage our data lake.

What is ConvergDB?

ConvergDB is open-source software for the creation and management of data lakes. Users define the structure of the source and target tables and map them to concrete cloud resources. Then ConvergDB creates all of the scripts used to build and manage the necessary resources in AWS. The scripts created by ConvergDB are deployed through the use of HashiCorp Terraform, an open-source tool for managing infrastructure as code.

With ConvergDB, we can define our data lake with metadata to drive the table-creation and ETL (extract, transform, and load) processes. The schema files are used to define tables, including field-level SQL expressions that are used to transform the incoming data as it is being loaded. These expressions are used to derive calculated fields, in addition to the fields that are used for data partitioning.

After the schema is defined, the deployment file allows us to place the tables into an ETL job that is used to manage them. The ETL job schedule is specified in the deployment file, along with optional fields such as the target S3 bucket and the number of AWS Glue DPUs to use at runtime.

ConvergDB is a command line binary and does not need to be installed on a server. All of the artifacts are files that can be managed using source control. The ConvergDB binary takes in all the configuration files and then outputs a Terraform configuration containing all the artifacts necessary to deploy the data lake. These artifacts can be ETL scripts, table and database definitions, and IAM policies necessary to run the jobs. They can also include Amazon SNS notification topics, and even an Amazon CloudWatch dashboard showing the volume of data processed by ConvergDB ETL jobs.

Speed bumps

No implementation goes perfectly. In the following sections, Jeremy Winters, a Beyondsoft engineer, explains the problems we ran into and how they were addressed.

Partitioning and columnar formats

Two of the key best practices for structuring data for SQL analytic queries in Amazon S3 are partitioning and columnar file structure.

Partitioning is the process of splitting data into different prefixes or folders on S3 with a naming convention that’s most suitable to efficient retrieval of data. This allows Athena to skip over data that is not relevant to the particular query that is being executed.

Apache Parquet is a columnar format popular with tools in the Hadoop ecosystem. Parquet stores the columns of the data in separate, contiguous regions in the file. Directed by metadata footers, tools like Athena read only the sections of the file that are needed to fulfill the query. This process helps eliminate a large portion of I/O and network data transfer.

Reducing I/O through partitioning and Parquet files not only increases query performance, but it dramatically reduces the cost of using Athena.

For more information about best practices for data lake optimization, see the blog post Top 10 Performance Tuning Tips for Amazon Athena.

Small files problem

A classic issue encountered with Hadoop ecosystem tools is known as the “small files problem.” Processing a large number of small files creates a lot of overhead for the system, causing job execution times to skyrocket and potentially fail. Pagely had approximately 4 TB of history across 30 million files. Of these files, 29.5 million represented only 1.2 TB of the data in S3.

To analyze this issue, we enabled S3 inventory reporting on the source data bucket. The report is delivered daily in an ORC (optimized row columnar) format. From there, it is very easy to create an Athena table to analyze the bucket contents using SQL.

We used Athena to identify S3 prefixes that were “hot spots,” that is, those having a large number of small files. We identified 14,000 prefixes with less than 1 GB of data that we could consolidate. So… 29.5 million files consolidated into 14,000 files.

The following query is a way to identify small-file hot spots. The GROUP BY expression can be adjusted to suit your data. The example shows a way of grouping by the first "folder" in the bucket.

SELECT
  -- we are looking at the first string in a / delimited path
  -- if the key is path/2017-10-10.json it will group on path_to_data
  split_part(key,'/',1) AS prefix
  
  -- calculate the total size in mb for all files in prefix
  ,SUM(size)/CAST(1024*1024 AS DOUBLE) AS mb
  
  -- count of objects in the prefix
  ,COUNT(*) AS object_count
FROM
  pagely_gateway_logs
WHERE
  -- assumes that versioning is disabled
  -- you should use the latest date after
  -- refreshing all partitions
  dt = '2018-03-28-08-00'
GROUP BY
  1
HAVING
  -- only return prefixes with a total size of less than 1 gb
  -- and a file count of 8 or more
  SUM(size)/CAST(1024*1024 AS DOUBLE) < 1024
  AND
  COUNT(*) >= 8

 

The results show prefixes in your object paths that can and should be consolidated. Anything less than 1 GB in total with eight or more files can then be consolidated into a single object, replacing the originals.

After the hot spots are identified, the small files must be consolidated. The results in the preceding image show that files with a prefix of 14184 total 33.7 MB spread across 502 files. To reduce the overhead of this many small files, we combine all of the files into a single file. Our files use gzip compression, which allows for combining them through simple concatenation, as opposed to decompressing, concatenating the raw JSON data, and then recompressing. There are many ways to achieve this, such as the Linux cat command, which is shown in the following example:

$ ls -1

14184_file1.gz

14184_file2.gz

14184_file3.gz

$ cat *.gz > combined.gz

$ gzip -d combined.gz

You can run these commands on any gzip files and then verify that the resulting file is a valid gzip archive by decompressing it with the -d flag, as shown (gzip -t performs the same check without writing the decompressed output). For some use cases, you can use the Amazon S3 Multipart Copy API, but this approach requires that your small files be at least 5 MB in size.

For the Pagely dataset, we wrote a shell script to pull down all the files with a given prefix, concatenate them into a single gzip, upload the concatenated file, validate the upload, and then delete the smaller files. This script was run using AWS Fargate containers, each of which would handle a single prefix. The details of this process would be a blog post on its own, but using a service like AWS Batch can make a job like this easier to manage. The total cost of concatenating all of the small historical files was $27.
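
For orientation, here is a simplified Python sketch of the per-prefix consolidation logic described above; the actual job was a shell script run in Fargate, and the bucket and prefix names here are placeholders. Because the files are gzip, their compressed bytes can simply be concatenated.

import gzip
import io
import boto3

s3 = boto3.client("s3")
BUCKET = "pagely-gateway-logs"  # hypothetical bucket name


def consolidate_prefix(prefix):
    # List every object under the prefix.
    keys = []
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))

    # Concatenate the gzip members in memory (fine for prefixes under ~1 GB).
    combined = io.BytesIO()
    for key in keys:
        combined.write(s3.get_object(Bucket=BUCKET, Key=key)["Body"].read())

    # Validate the combined archive by decompressing it once.
    gzip.decompress(combined.getvalue())

    # Upload the combined file, then delete the originals (max 1,000 keys per call).
    combined_key = prefix.rstrip("/") + "/combined.gz"
    s3.put_object(Bucket=BUCKET, Key=combined_key, Body=combined.getvalue())
    s3.delete_objects(
        Bucket=BUCKET,
        Delete={"Objects": [{"Key": k} for k in keys if k != combined_key]},
    )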

Historical data

Daily data volumes for Pagely logs are in the tens of gigabytes per day, easily handled by the smallest AWS Glue configuration. Transforming the 4 TB of compressed (~28 TB uncompressed) historical data was a bit more challenging.

ConvergDB batches the data into smaller chunks. In the case of a very long-running historical transformation job failing, only the last batch is lost, resulting in around one hour of compute being lost. ConvergDB uses its own state-tracking mechanism to communicate the failure to the next run of the job, which cleans up any mess before trying to process the batch again. Batching is an automatic feature of the ETL job created by ConvergDB, based upon the size of the AWS Glue cluster.

Post-deployment at Pagely

Running our legacy report for a medium-size application (several gigabytes in S3) took 91 seconds. Now that our data lake is in production, the same report for a medium-size application takes 5 seconds with Athena—an 18x performance gain. Our largest dataset (~1 TB in S3) fails with our legacy process. It’s also not sufficiently performant when querying the raw JSON directly with Athena. However, the new Parquet-based tables using Athena complete the analysis in 24 seconds.

                     Legacy process          Athena with JSON    Athena with Parquet
Medium customer      1 minute, 31 seconds    1 minute            6 seconds
Largest customer     > 8 hours               > 30 minutes        24 seconds

 

Although these numbers are obviously important, the biggest advantage is that now we don’t have to worry about performance and cost, and the engineers can focus on solving problems. Just 15 minutes of writing queries, and the entire team now has access to new data. I was able to upgrade the legacy process with queries dispatched to Athena through the AWS SDK. This process can now run on any lightweight machine (like my laptop) while Athena does the heavy lifting.

About Pagely

Pagely, in their own words: Pagely is an AWS Advanced Technology, SaaS, and Public Sector Partner providing managed WordPress hosting. We service enterprise-level customers like BMC, UNICEF, Northwestern University, and the City of Boston, offering flexibility in our solutions and the industry's best expert-only, tier-less support. Pagely uses a proprietary tech stack that accelerates WordPress sites through the use of our own ARES™ Web Application gateway, PressCACHE™ and PressCDN™ technologies, and open source tools such as Redis and NGINX.

About Beyondsoft Consulting, Inc.

Beyondsoft Consulting, Inc., in their own words: Beyondsoft Consulting, Inc. is an Amazon Web Services Advanced Consulting Partner with delivery centers across the US and Asia. Beyondsoft delivers IT solutions and services to leading technology companies and enterprises across many verticals. Our team of highly skilled professionals, coupled with our focus on customer success, truly separate us as a preferred AWS Partner for many of our clients.

Beyondsoft Amazon Partner Page

Contact: Eric Valenzuela

If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Work with partitioned data in AWS Glue.

 


About the Authors

Joshua Eichorn is CTO of Pagely and an engineering leader with experience leading small and large teams. As an individual contributor, manager, and director, he has done it all, from writing line one of a new application to a six-month rewrite of a massive site. Josh loves solving new challenges and building great products.

 

 

Jeremy Winters is the creator of ConvergDB, and has been working in Business Intelligence across a variety of industries for 18 years, with the past 8 years being focused on building data lakes, data warehouses, and other applications in AWS.

 

 

 

How Goodreads offloads Amazon DynamoDB tables to Amazon S3 and queries them using Amazon Athena

Post Syndicated from Joe Feeney original https://aws.amazon.com/blogs/big-data/how-goodreads-offloads-amazon-dynamodb-tables-to-amazon-s3-and-queries-them-using-amazon-athena/

At Goodreads, we’re currently in the process of decomposing our monolithic Rails application into microservices. For the vast majority of those services, we’ve decided to use Amazon DynamoDB as the primary data store. We like DynamoDB because it gives us consistent, single-digit-millisecond performance across a variety of our storage and throughput needs.

Although DynamoDB excels at high-throughput read and write workloads, it’s not optimized to support one-time, ad hoc queries or data warehouse workloads. However, by combining AWS Data Pipeline, Amazon S3, AWS Glue, and Amazon Athena we can export our dataset from DynamoDB to S3 and use Athena to run SQL queries against that dataset.

Architecture overview

Our architecture for this process is as follows:

  • AWS Data Pipeline scans data from a DynamoDB table and writes it to S3 as JSON. It broadcasts on an Amazon SNS topic when it is finished.
  • An AWS Glue job invoked by AWS Lambda converts the JSON data into Parquet.
  • An AWS Glue crawler adds the Parquet data in S3 to the AWS Glue Data Catalog, making it available to Athena for queries.

We could have queried the data in the JSON format, thus removing the extra step of the Parquet conversion. However, we decided to make the additional effort because Parquet is space-efficient and high-performing. For larger datasets, this approach translates not only into faster queries, but also cost savings.

Major architecture components

The major components of our architecture are described following.

Data Pipeline is an orchestration service for spinning up Amazon EMR clusters and running fault-tolerant jobs using big data technology like Apache Pig, Apache Hive, or Apache Spark. Data Pipeline provides a template for exporting data from an arbitrary DynamoDB table to Amazon S3. We use a slightly modified version of the standard export template.

In this version, we add the ability to send success or failure messages on Amazon SNS. Doing this lets us use Lambda to kick off further processing outside of the Data Pipeline service.

AWS Glue is used in three different ways in this architecture:

  • A serverless Apache Spark environment runs a job that converts the JSON export from Data Pipeline into the Apache Parquet format.
  • An AWS Glue crawler automatically crawls and infers the schema of our dataset and adds it to the AWS Glue Data Catalog.
  • The AWS Glue Data Catalog is the metadata store for our dataset so we can query the data with Athena.

Athena is used after the data is in the AWS Glue Data Catalog. At this point, you can query it in Athena with ANSI SQL.

Setting up infrastructure

In this process, we use AWS CloudFormation to manage our AWS resources. We’ve split the various AWS resources across three stacks to make them more composable.

The reviews.yaml template defines an example DynamoDB table called Reviews. The common.yaml template contains IAM and S3 resources that are shared across stacks. The dynamodb-exports.yaml template defines a Data Pipeline, Lambda function, AWS Glue job, and AWS Glue crawlers.

Working with the Reviews stack

The reviews.yaml CloudFormation template contains a simple DynamoDB table definition for storing user reviews on books. We’re using a hash key and sort key structure that nests each review on a book under a particular user. This structure allows an application to check if a user has a review on a book in a simple get operation and also to list all reviews by a user.
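
A short boto3 sketch of the two access patterns this key design supports, using the sample items created later in this walkthrough:

import boto3
from boto3.dynamodb.conditions import Key

reviews = boto3.resource("dynamodb").Table("Reviews")

# Has this user reviewed this book? A single get on the full primary key.
response = reviews.get_item(
    Key={"User": "Tristan", "Book": "Harry Potter and the Philosopher's Stone"}
)
print(response.get("Item"))

# List every review written by a user: query on the hash key alone.
response = reviews.query(KeyConditionExpression=Key("User").eq("Tristan"))
print(response["Items"])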

Working with the DynamoDB table schema

The table defined in reviews.yaml is a hash key and sort key table. The User attribute is the hash key, and the Book attribute is the sort key. If you build an application on this table, you might add additional Global Secondary Indexes (GSIs) to accommodate other access patterns, for example showing the highest weighted reviews for a book.

First, you create a CloudFormation stack:

  1. Click on this Launch Stack button:
  2. Choose Next at the bottom of the screen.
  3. On the Options screen, leave everything set to the default and choose Next at the bottom of the screen.
  4. Choose Create on the Review screen.

Next, you create test items in the Reviews table. After the ReviewsStack status is CREATE_COMPLETE, you can open up the DynamoDB console and explore the table. Let's add a few items to the table:

  1. Open DynamoDB in the AWS Management Console.
  2. Choose Tables from the left navigation pane.
  3. Choose the Reviews table, and then choose the Items tab.
  4. Choose Create item, and in the Create item box, switch the view from Tree to Text.
  5. Remove the existing text, and copy and paste the following item into the text box.
{
  "User": "Tristan",
  "Book": "Harry Potter and the Philosopher's Stone",
  "Rating": 5,
  "Review": "A thrilling journey through the world of Hogwarts",
  "Author": "J.K. Rowling"
}
  6. Choose Save.

Now let’s add one more item.

{
  "User": "Adeline",
  "Book": "Harry Potter and the Sorcerer's Stone",
  "Rating": 4,
  "Review": "Harry is pretty brave, but Hermione is the clear hero",
  "Author": "J.K. Rowling"
}

You can see that we’ve added a few different fields that were not specified in the table schema. Notably, these are: Rating, Review, and Author. Because DynamoDB is a NoSQL database, we can add new attributes as our application evolves. However, to aggregate against those attributes efficiently, they either need to be a part of the primary key schema at table creation or defined on an index.

The Goodreads reviews table is not dissimilar from our example table. However, we have used our maximum of five Global Secondary Indexes (GSIs) to support the access patterns that our users need the most. It's no longer an option for us to create short-lived GSIs to answer arbitrary questions we have about our data. Even if we could, we have so much data that creating a GSI takes a few days.

Now imagine that our product team wants to run queries over the reviews data for arbitrary authors. We can’t add an additional GSI, and the access pattern isn’t required in production. However, by using the architecture described in this blog post we can unlock our dataset for our product team.

Feel free to add more items to the table, because the more data you have in the table when we export it the more interesting SQL queries you can run in Athena.

Creating the common stack

The common.yaml CloudFormation template creates a variety of IAM and EC2 permissions that Data Pipeline, Lambda, and AWS Glue use. In addition, the template creates an S3 bucket to store DynamoDB exports. The resources that need to be referenced across stacks are declared in the Outputs section.

Create the CloudFormation stack as follows:

  1. Click on this Launch Stack button:
  2. Choose Next at the bottom of the screen.
  3. On the Options screen, leave everything set to the default and choose Next at the bottom of the screen.
  4. In the Capabilities section of Review, choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create.

Creating the DynamoDB exports stack

The dynamodb-exports.yaml template is a self-contained template to create a Data Pipeline, SNS topics, Lambda trigger, AWS Glue job, and an AWS Glue crawler for any given DynamoDB table. If you have multiple DynamoDB tables you want to export, you can reuse the dynamodb-exports.yaml template and create a stack for each table.

The most interesting part of this stack is the AWS Glue job script that converts an arbitrary DynamoDB export file created by the Data Pipeline task into Parquet. It also removes DynamoDB type information from the raw JSON by using Boto3, which is available in the PySpark AWS Glue environment. The code is well-documented, so don’t hesitate to dive in here if you’re interested in how to write a custom AWS Glue job.
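
For orientation only, a heavily simplified sketch of that conversion step is shown below; the documented script in the sample repository does more, including stripping the DynamoDB type annotations. Paths are passed in as job arguments and are placeholders here.

import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["source_path", "target_path"])
glue_context = GlueContext(SparkContext.getOrCreate())

# Read the Data Pipeline JSON export from S3.
export = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [args["source_path"]]},
    format="json",
)

# Write the records back out to S3 as Parquet.
glue_context.write_dynamic_frame.from_options(
    frame=export,
    connection_type="s3",
    connection_options={"path": args["target_path"]},
    format="parquet",
)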

Create the CloudFormation stack as follows:

  1. Click on this Launch Stack button:
  2. For parameters, enter the following and then choose Next:

ExportTimeout: 1

MaxConsumedReadThroughput: 0.5

TableName: Reviews

  3. On the Options screen, leave everything set to default and then choose Next.
  4. In the Review section, scroll to the bottom and choose Create.

Watching your data flow from DynamoDB to the AWS Glue Data Catalog

The pipeline from DynamoDB to the Apache Hive catalog is fully automated. After the CloudFormation stack to export Reviews is deployed, the data pipeline begins. You can query the data in Athena soon.

Monitor the data pipeline:

  1. Open AWS Data Pipeline in the console.
  2. Choose the pipeline with the name ReviewsExport.

  3. Monitor the pipeline as it goes through the various stages from provisioning a cluster to running your job.

  4. When the status is Finished, the data is in S3.

The pipeline sends a message on the export success SNS topic. Doing so triggers Lambda to invoke the AWS Glue job to convert the JSON export into Parquet.

Let’s monitor the AWS Glue job:

  1. Open AWS Glue in the console.
  2. Choose Jobs under the ETL header in the left navigation pane.
  3. Choose the check box next to ReviewsExportToParquet to view the job's run history and other details. At this point, the run status is Running.

  4. The AWS Glue job is finished when the run status reaches Succeeded.

Next, run the AWS Glue crawler:

  1. From the AWS Glue console page, choose Crawlers on the left navigation pane.
  2. Choose the check box next to ReviewsParquetCrawler.
  3. Choose Run crawler at the top of the page.

The first time the crawler runs, it adds the reviews table to the dynamodb-exports database in the AWS Glue Data Catalog. If you accumulate more export snapshots after you run the crawler, subsequent runs of the crawler add new partitions to the table.

Inspecting the reviews table in the AWS Glue Data Catalog

Next, look at the reviews table:

  1. From the AWS Glue console page, choose Tables.

  2. Choose reviews.

The AWS Glue Data Catalog is an Apache Hive–compatible metastore that stores the schema of the dataset. It stores properties such as object count and dataset location in S3, among other data.

Taking a look at the schema, you might notice the ddb_export_timestamp column, which wasn’t originally a part of the attributes that we added to the items in DynamoDB. Under the key column, ddb_export_timestamp is marked as Partition (0).  Partition columns are just like regular columns, and when they are used in WHERE clauses in Athena they allow you to restrict the amount of data scanned. The Athena partitions documentation is a great place to start if you want to know more.

The Lambda function that invokes the Parquet conversion script provides this extra metadata. So, when the AWS Glue crawler infers the schema, the partition is given a useful name, as opposed to the default partition_N name that is given if no partition name is present.

Using Athena to query the dataset

To use Athena to query the dataset, take these steps:

  1. Open Athena on the console.
  2. If you haven’t done so already, upgrade Athena to use the Hive Catalog.
  3. For Database on the left navigation pane, choose dynamodb-exports.

  4. Under Tables, you can see reviews.
  5. Choose the ellipsis to the right of reviews, and choose Preview table.

You just ran a SQL query over your DynamoDB dataset! Let’s run an aggregation to count how many reviews J.K. Rowling has. As you might recall, this access pattern isn’t well-supported by our DynamoDB table design.

SELECT COUNT(author) as num_reviews FROM "dynamodb-exports"."reviews"
WHERE author = 'J.K. Rowling';

You might see different results if you added more items, but here are the results we see in our table.

With Athena, as your data grows in size or complexity, you can pull insights out of data from DynamoDB using ANSI SQL.

Next steps

Here are a few ways that you can extend this work:

  • Modify the Data Pipeline to run the DynamoDB export every night at midnight in your local time zone.
  • Run the AWS Glue Crawler every day at 4 a.m. local time so you always have the latest snapshot of your data in DynamoDB.
  • Use the export success topic to trigger more complex pipelines or aggregations.
  • Combine this approach with building a data lake in S3.

Conclusion

In this post, we show you how to export data from a DynamoDB table, convert it into a more efficient format with AWS Glue, and query the data with Athena. This approach gives you a way to pull insights from your data stored in DynamoDB.


Additional Reading

If you found this post useful, be sure to check these out as well:

 


About the Author

Joe Feeney is a Software Engineer on the Amazon Author team where he leverages all of Amazon’s data to provide Authors with unique, actionable insights. He enjoys losing to his wife and kids at Mario Kart, and building and modifying guitars.

 

 

 

Orchestrate Multiple ETL Jobs Using AWS Step Functions and AWS Lambda

Post Syndicated from Moataz Anany original https://aws.amazon.com/blogs/big-data/orchestrate-multiple-etl-jobs-using-aws-step-functions-and-aws-lambda/

Extract, transform, and load (ETL) operations collectively form the backbone of any modern enterprise data lake. They transform raw data into useful datasets and, ultimately, into actionable insight. An ETL job typically reads data from one or more data sources, applies various transformations to the data, and then writes the results to a target where data is ready for consumption. The sources and targets of an ETL job could be relational databases in Amazon Relational Database Service (Amazon RDS) or on-premises, a data warehouse such as Amazon Redshift, or object storage such as Amazon Simple Storage Service (Amazon S3) buckets. Amazon S3 as a target is especially commonplace in the context of building a data lake in AWS.

AWS offers AWS Glue, which is a service that helps author and deploy ETL jobs. AWS Glue is a fully managed extract, transform, and load service that makes it easy for customers to prepare and load their data for analytics. Other AWS services can also be used to implement and manage ETL jobs. They include AWS Database Migration Service (AWS DMS), Amazon EMR (using the Steps API), and even Amazon Athena.

The challenge of orchestrating an ETL workflow

How can we orchestrate an ETL workflow that involves a diverse set of ETL technologies? AWS Glue, AWS DMS, Amazon EMR, and other services support Amazon CloudWatch Events, which we could use to chain ETL jobs together. Amazon S3, the central data lake store, also supports CloudWatch Events. But relying on CloudWatch Events alone means that there’s no single visual representation of the ETL workflow. Also, tracing the overall ETL workflow’s execution status and handling error scenarios can become a challenge.

In this post, I show you how to use AWS Step Functions and AWS Lambda for orchestrating multiple ETL jobs involving a diverse set of technologies in an arbitrarily-complex ETL workflow. AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. You build applications from individual components. Each component performs a discrete function, or task, allowing you to scale and change applications quickly.

Let’s look at an example ETL workflow.

Example datasets for the ETL workflow

For our example, we’ll use two publicly available Amazon QuickSight datasets.

The first dataset is a sales pipeline dataset, which contains a list of slightly more than 20,000 sales opportunity records for a fictitious business. Each record has fields that specify:

  • A date, potentially when an opportunity was identified.
  • The salesperson’s name.
  • A market segment to which the opportunity belongs.
  • Forecasted monthly revenue.

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this example we’ll assume that they are related.

The example ETL workflow requirements

Imagine there’s a business user who needs to answer questions based on both datasets. Perhaps the user wants to explore the correlations between online user engagement metrics on the one hand, and forecasted sales revenue and opportunities generated on the other hand. The user engagement metrics include website visits, mobile users, and desktop users.

The steps in the ETL workflow are:

Process the Sales dataset (PSD). Read the Sales dataset. Group records by day, aggregating the Forecasted Monthly Revenue field. Rename fields to replace white space with underscores. Output the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.

Process the Marketing dataset (PMD). Read the Marketing dataset. Rename fields to replace white space with underscores. Send the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.

Join Marketing and Sales datasets (JMSD). Read the output of the processed Sales and Marketing datasets. Perform an inner join of both datasets on the date field. Sort in ascending order by date. Send the final joined dataset to Amazon S3, and overwrite any previous output.
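
As a rough illustration of the PSD step only, the following PySpark sketch groups by day, aggregates the forecasted revenue, replaces white space in column names with underscores, and overwrites the Parquet output. The input path, output path, and exact column names are assumptions.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("process-sales-dataset").getOrCreate()

sales = spark.read.csv("s3://my-etl-bucket/raw/sales/", header=True, inferSchema=True)

# Group records by day and aggregate the forecasted monthly revenue.
daily_sales = sales.groupBy("Date").agg(
    F.sum("Forecasted Monthly Revenue").alias("Forecasted Monthly Revenue")
)

# Rename fields to replace white space with underscores.
for name in daily_sales.columns:
    daily_sales = daily_sales.withColumnRenamed(name, name.replace(" ", "_"))

# Write compressed Parquet, overwriting any previous intermediary output.
daily_sales.write.mode("overwrite").parquet("s3://my-etl-bucket/intermediate/sales/")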

So far, this ETL workflow can be implemented with AWS Glue, with the ETL jobs being chained by using job triggers. But you might have other requirements outside of AWS Glue that are part of your end-to-end data processing workflow, such as the following:

  • Both Sales and Marketing datasets are uploaded to an S3 bucket at random times in an interval of up to a week. The PSD job should start as soon as the Sales dataset file is uploaded. The PMD job should start as soon as the Marketing dataset file is uploaded. Parallel ETL jobs can start and finish anytime, but the final JMSD job can start only after all parallel ETL jobs are complete.
  • In addition to PSD and PMD jobs, the orchestration must support more parallel ETL jobs in the future that contribute to the final dataset aggregated by the JMSD job. The additional ETL jobs could be managed by AWS services, such as AWS Database Migration Service, Amazon EMR, Amazon Athena or other non-AWS services.

The data engineer takes these requirements and builds the following ETL workflow chart.

To fulfill the requirements, we need a generic ETL orchestration solution. A serverless solution is even better.

The ETL orchestration architecture and events

Let’s see how we can orchestrate an ETL workflow to fulfill the requirements using AWS Step Functions and AWS Lambda. The following diagram shows the ETL orchestration architecture and the main flow of events.

The main flow of events starts with an AWS Step Functions state machine. This state machine defines the steps in the orchestrated ETL workflow. A state machine can be triggered through Amazon CloudWatch based on a schedule, through the AWS Command Line Interface (AWS CLI), or using the various AWS SDKs in an AWS Lambda function or some other execution environment.

As the state machine execution progresses, it invokes the ETL jobs. As shown in the diagram, the invocation happens indirectly through intermediary AWS Lambda functions that you author and set up in your account. We’ll call this type of function an ETL Runner.

While the architecture in the diagram shows Amazon Athena, Amazon EMR, and AWS Glue, the accompanying code sample (aws-etl-orchestrator) includes a single ETL Runner, labeled AWS Glue Runner Function in the diagram. You can use this ETL Runner to orchestrate AWS Glue jobs. You can also follow the pattern and implement more ETL Runners to orchestrate other AWS services or non-AWS tools.

ETL Runners are invoked by activity tasks in Step Functions. Because of the way AWS Step Functions’ activity tasks work, ETL Runners need to periodically poll the AWS Step Functions state machine for tasks. The state machine responds by providing a Task object. The Task object contains inputs which enable an ETL Runner to run an ETL job.

As soon as an ETL Runner receives a task, it starts the respective ETL job. An ETL Runner maintains a state of active jobs in an Amazon DynamoDB table. Periodically, the ETL Runner checks the status of active jobs. When an active ETL job completes, the ETL Runner notifies the AWS Step Functions state machine. This allows the ETL workflow in AWS Step Functions to proceed to the next step.

An important question may come up. Why does an ETL Runner run independently from your Step Functions state machine and poll for tasks? Can’t we instead directly invoke an AWS Lambda function from the Step Functions state machine? Then can’t we have that function start and monitor an ETL job until it completes?

The answer is that AWS Lambda functions have a maximum execution duration per request of 300 seconds, or 5 minutes. For more information, see AWS Lambda Limits. ETL jobs typically take more than 5 minutes to complete. If an ETL Runner function is invoked directly, it will likely time out before the ETL job completes. Thus, we follow the long-running worker approach with activity tasks. The worker in this code sample – the ETL Runner – is an AWS Lambda function that gets triggered on a schedule using CloudWatch Events. If you want to avoid managing the polling schedule through CloudWatch Events, you can implement a polling loop in your ETL workflow’s state machine. Check the AWS Big Data blog post Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy for an example.
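
The following is a condensed Python sketch of that runner pattern: poll for an activity task, start the AWS Glue job it names, and report the outcome back to the state machine. Unlike the sample's Lambda-based runner, which records active jobs in DynamoDB and spreads the status checks across scheduled invocations, this sketch simply blocks until the job finishes; the activity ARN and input field names are assumptions.

import json
import time
import boto3

sfn = boto3.client("stepfunctions")
glue = boto3.client("glue")

ACTIVITY_ARN = "arn:aws:states:us-east-1:123456789012:activity:run-glue-job"  # placeholder


def poll_once():
    # Long-poll Step Functions for an activity task.
    task = sfn.get_activity_task(activityArn=ACTIVITY_ARN, workerName="glue-runner")
    token = task.get("taskToken")
    if not token:
        return  # no work available right now

    job_input = json.loads(task["input"])
    run = glue.start_job_run(
        JobName=job_input["GlueJobName"],
        Arguments=job_input.get("GlueJobArguments", {}),
    )

    # Wait for the Glue job to finish, then notify the state machine.
    while True:
        state = glue.get_job_run(
            JobName=job_input["GlueJobName"], RunId=run["JobRunId"]
        )["JobRun"]["JobRunState"]
        if state == "SUCCEEDED":
            sfn.send_task_success(taskToken=token, output=json.dumps({"state": state}))
            return
        if state in ("FAILED", "STOPPED", "TIMEOUT"):
            sfn.send_task_failure(taskToken=token, error="GlueJobFailed", cause=state)
            return
        time.sleep(30)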

Finally, let’s discuss how we fulfill the requirement of waiting for Sales and Marketing datasets to arrive in an S3 bucket at random times. We implement these waits as two separate activity tasks: Wait for Sales Data and Wait for Marketing Data. A state machine halts execution when it encounters either of these activity tasks. A CloudWatch Events event handler is then configured on an Amazon S3 bucket, so that when Sales or Marketing dataset files are uploaded to the bucket, Amazon S3 invokes an AWS Lambda function. The Lambda function then signals the waiting state machine to exit the activity task corresponding to the uploaded dataset. The subsequent ETL job is then invoked by the state machine.

Set up your own ETL orchestration

The aws-etl-orchestrator GitHub repository provides source code you can customize to set up the ETL orchestration architecture in your AWS account. The following steps show what you need to do to start orchestrating your ETL jobs using the architecture shown in this post:

  1. Model the ETL orchestration workflow in AWS Step Functions
  2. Build your ETL Runners (or use an existing AWS Glue ETL Runner)
  3. Customize AWS CloudFormation templates and create stacks
  4. Invoke the ETL orchestration state machine
  5. Upload sample Sales and Marketing datasets to Amazon S3

Model the ETL orchestration workflow in AWS Step Functions.  Use AWS Step Functions to model the ETL workflow described in this post as a state machine. A state machine in Step Functions consists of a set of states and the transitions between these states. A state machine is defined in Amazon States Language, which is a JSON-based notation. For a few examples of state machine definitions, see Sample Projects.

The following snapshot from the AWS Step Functions console shows our example ETL workflow modeled as a state machine. This workflow is what we provide you in the code sample.

When you start an execution of this state machine, it will branch to run two ETL jobs in parallel: Process Sales Data (PSD) and Process Marketing Data (PMD). But, according to the requirements, both ETL jobs should not start until their respective datasets are uploaded to Amazon S3. Hence, we implement Wait activity tasks before both PSD and PMD. When a dataset file is uploaded to Amazon S3, this triggers an AWS Lambda function that notifies the state machine to exit the Wait states. When both PMD and PSD jobs are successful, the JMSD job runs to produce the final dataset.

Finally, to have this ETL workflow execute once per week, you will need to configure a state machine execution to start once per week using a CloudWatch Event.

Build your ETL Runners (or use an existing AWS Glue ETL Runner). The code sample includes an AWS Glue ETL Runner. For simplicity, we implemented the ETL workflow using only AWS Glue jobs. However, nothing prevents you from using a different ETL technology to implement PMD or PSD jobs. You'll need to build an ETL Runner for that technology by following the AWS Glue ETL Runner example.

Customize AWS CloudFormation templates and create stacks. The sample published in the aws-etl-orchestrator repository includes three separate AWS CloudFormation templates. We organized resources into three templates following AWS CloudFormation best practices. The three resource groups are logically distinct and likely to have separate lifecycles and ownerships. Each template has an associated AWS CloudFormation parameters file (“*-params.json” files). Parameters in those files must be customized. The details about the three AWS CloudFormation templates are as follows:

  1. A template responsible for setting up AWS Glue resources. For our example ETL workflow, the sample template creates three AWS Glue jobs: PSD, PMD, and JMSD. The scripts for these jobs are pulled by AWS CloudFormation from an Amazon S3 bucket that you own.
  2. A template where the AWS Step Functions state machine is defined. The state machine definition in Amazon States Language is embedded in a StateMachine resource within the Step Functions template.
  3. A template that sets up the resources required by the ETL Runner for AWS Glue. The AWS Glue ETL Runner is a Python script that is written to be run as an AWS Lambda function.

Invoke the ETL orchestration state machine. Finally, it is time to start a new state machine execution in AWS Step Functions. For our ETL example, the AWS CloudFormation template creates a state machine named MarketingAndSalesETLOrchestrator. You can start an execution from the AWS Step Functions console, or through an AWS CLI command. When you start an execution, the state machine will immediately enter Wait for Data states, waiting for datasets to be uploaded to Amazon S3.
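
If you prefer the AWS CLI or an SDK over the console, a minimal boto3 sketch for starting the execution looks like the following (the state machine ARN is a placeholder for the one created in your account).

import boto3

sfn = boto3.client("stepfunctions")

sfn.start_execution(
    stateMachineArn=(
        "arn:aws:states:us-east-1:123456789012:"
        "stateMachine:MarketingAndSalesETLOrchestrator"
    ),
    name="weekly-run-001",  # optional; must be unique per execution
)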

Upload sample Sales and Marketing datasets to Amazon S3

Upload the provided datasets to the S3 bucket that you specified in the code sample configuration. These uploaded datasets signal the state machine to continue execution.

The state machine may take a while to complete execution. You can monitor progress in the AWS Step Functions console. If the execution is successful, the output shown in the following diagram appears.

Congratulations! You’ve orchestrated the example ETL workflow to a successful completion.

Handling failed ETL jobs

What if a job in the ETL workflow fails? In such a case, there are error-handling strategies available to the ETL workflow developer, from simply notifying an administrator, to fully undoing the effects of the previous jobs through compensating ETL jobs. Detecting and responding to a failed ETL job can be implemented using the AWS Step Functions’ Catch mechanism. For more information, see Handling Error Conditions Using a State Machine. In the sample state machine, errors are handled by a do-nothing Pass state.

Try it out. Stop any of the example ETL workflow’s jobs while executing through the AWS Glue console or the AWS CLI. You’ll notice the state machine transitioning to the ETL Job Failed Fallback state.

Conclusion

In this post, I showed you how to implement your ETL logic as an orchestrated workflow. I presented a serverless solution for ETL orchestration that allows you to control ETL job execution using AWS Step Functions and AWS Lambda.  You can use the concepts and the code described in this post to build arbitrarily complex ETL state machines.

For more information and to download the source code, see the aws-etl-orchestrator GitHub repository. If you have questions about this post, send them our way in the Comments section below.


Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

 


About the Author

Moataz Anany is a senior solutions architect with AWS. He enjoys partnering with customers to help them leverage AWS and the cloud in creative ways. He dedicates most of his spare time to his wife and little ones. The rest is spent building and breaking things in side projects.