Tag Archives: Amazon Athena

Orchestrating an AWS Glue DataBrew job and Amazon Athena query with AWS Step Functions

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/orchestrating-an-aws-glue-databrew-job-and-amazon-athena-query-with-aws-step-functions/

As data volumes across the industry continue to grow, big data analytics is becoming a common requirement in data analytics and machine learning (ML) use cases. Also, as we start building complex data engineering or data analytics pipelines, we look for simpler orchestration mechanisms with graphical user interface (GUI)-based ETL (extract, transform, and load) tools.

Recently, AWS announced the general availability of AWS Glue DataBrew, a new visual data preparation tool that helps you clean and normalize data without writing code. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

Regarding orchestration or workflow management, AWS provides AWS Step Functions, a serverless function orchestrator that makes it easy to build a workflow by integrating different AWS services like AWS Lambda, Amazon Simple Notification Service (Amazon SNS), AWS Glue, and more. With its built-in operational controls, Step Functions manages sequencing, error handling, retry logic, and states, removing a significant operational burden from your team.

Today, we’re launching Step Functions support for DataBrew, which means you can now invoke DataBrew jobs in your Step Functions workflow to build an end-to-end ETL pipeline. Recently, Step Functions also started supporting Amazon Athena integration, which means that you can submit SQL queries to the Athena engine through a Step Functions state.

In this post, we walk through a solution where we integrate a DataBrew job for data preparation, invoke a series of Athena queries for data refresh, and integrate Amazon QuickSight for business reporting. The whole solution is orchestrated through Step Functions and is invoked through Amazon EventBridge.

Use case overview

For our use case, we use two public datasets. The first dataset is a sales pipeline dataset, which contains a list of over 20,000 sales opportunity records for a fictitious business. Each record has fields that specify the following:

  • A date, potentially when an opportunity was identified
  • The salesperson’s name
  • A market segment to which the opportunity belongs
  • Forecasted monthly revenue

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this post, we assume that they’re related.

For our use case, these sales and marketing CSV files are maintained by your organization’s Marketing team, which uploads the updated full CSV file to Amazon Simple Storage Service (Amazon S3) every month. The aggregated output data is created through a series of data preparation steps, and the business team uses the output data to create business intelligence (BI) reports.

Architecture overview

To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration, DataBrew for data preparation, Athena for data analysis with standard SQL, and QuickSight for business reporting. In addition, we use Amazon SNS for sending notifications to users, and EventBridge is integrated to schedule running the Step Functions workflow.


The workflow includes the following steps:

  • Step 1 – The Marketing team uploads the full CSV file to an S3 input bucket every month.
  • Step 2 – An EventBridge rule, scheduled to run every month, triggers the Step Functions state machine.
  • Steps 3 and 4 – We receive two separate datasets (sales data and marketing data), so Step Functions triggers two parallel DataBrew jobs, which create additional year, month, and day columns from the existing date field and use those three columns for partitioning. The jobs write the final output to our S3 output bucket.
  • Steps 5, 6, 7, and 8 – After the output data is written, we can create external tables on top of it with Athena CREATE EXTERNAL TABLE statements and then load partitions with MSCK REPAIR TABLE commands. After the AWS Glue Data Catalog tables are created for sales and marketing, we run an additional query through Athena, which merges these two tables by year and month to create another table with aggregated output.
  • Steps 9 and 10 – As the last step of the Step Functions workflow, we send a notification to end-users through Amazon SNS to notify them that the data refresh job completed successfully.
  • Steps 11, 12, 13 – After the aggregated table data is refreshed, business users can use QuickSight for BI reporting, which fetches data through Athena. Data analysts can also use Athena to analyze the complete refreshed dataset.

Prerequisites

Before beginning this tutorial, make sure you have the permissions required to create the resources used in this solution.

Additionally, create the S3 input and output buckets with the required subfolders to capture the sales and marketing data, and upload the input data into their respective folders.

Creating a DataBrew project

To create a DataBrew project for the marketing data, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Project name, enter a name (for this post, marketing-data-etl).
  4. For Select a dataset, select New dataset.


  5. For Enter your source from S3, enter the S3 path of the marketing input CSV.


  6. Under Permissions, for Role name, choose an AWS Identity and Access Management (IAM) role that allows DataBrew to read from your Amazon S3 input location.

You can choose a role if you already created one, or create a new one. For steps to create the IAM role, see the AWS Glue DataBrew documentation.

  7. After the dataset is loaded, on the Functions menu, choose Date functions.
  8. Choose YEAR.


  9. Apply the year function on the date column to create a new column called year.

  10. Repeat these steps to create month and day columns.


For our use case, we created a few new columns that we plan to use for partitioning, but you can integrate additional transformations as needed.

  11. After you have finished applying all your transformations, choose Publish on the recipe.
  12. Provide a description of the recipe version and choose Publish.

Creating a DataBrew job

Now that our recipe is ready, we can create a job for it, which gets invoked through our Step Functions state machine.

  1. On the DataBrew console, choose Jobs.
  2. Choose Create a job.
  3. For Job name, enter a name (for example, marketing-data-etl).

Your recipe is already linked to the job.

  4. Under Job output settings, for File type, choose your final storage format (for this post, we choose PARQUET).
  5. For S3 location, enter your final S3 output bucket path.
  6. For Compression, choose the compression type you want to apply (for this post, we choose Snappy).
  7. Under Additional configurations, for Custom partition by column values, choose year, month, and day.
  8. For File output storage, select Replace output files for each job run.

We choose this option because our use case is to do a full refresh.


  9. Under Permissions, for Role name, choose your IAM role.
  10. Choose Create job.

We choose Create job (rather than running the job immediately) because we plan to invoke it through Step Functions.


  11. When your marketing job is ready, repeat the same steps for your sales data, using the sales data output file location as needed.

Creating a Step Functions state machine

We’re now ready to create a Step Functions state machine for the complete flow.

  1. On the Step Functions console, choose Create state machine.
  2. For Define state machine, select Author with code snippets.
  3. For Type, choose Standard.


In the Definition section, Step Functions provides a list of service actions that you can use to automatically generate a code snippet for your state machine’s state. The following screenshot shows that we have options for Athena and DataBrew, among others.

  4. For Generate code snippet, choose AWS Glue DataBrew: Start a job run.


  5. For Job name, choose Select job name from a list and choose your DataBrew job.

The JSON snippet appears in the Preview pane.

  6. Select Wait for DataBrew job runs to complete.
  7. Choose Copy to clipboard.


  8. Integrate the code into the final state machine JSON code:
    {
       "Comment":"Monthly Refresh of Sales Marketing Data",
       "StartAt":"Refresh Sales Marketing Data",
       "States":{
          "Refresh Sales Marketing Data":{
             "Type":"Parallel",
             "Branches":[
                {
                   "StartAt":"Sales DataBrew ETL Job",
                   "States":{
                      "Sales DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"sales-data"
                         },
                         "Next":"Drop Old Sales Table"
                      },
                      "Drop Old Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Sales Table"
                      },
                      "Create Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `sales_data_output`(`date` string, `salesperson` string, `lead_name` string, `segment` string, `region` string, `target_close` string, `forecasted_monthly_revenue` int,   `opportunity_stage` string, `weighted_revenue` int, `closed_opportunity` boolean, `active_opportunity` boolean, `latest_status_entry` boolean) PARTITIONED BY (`year` string,`month` string, `day` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Sales Table Partitions"
                      },
                      "Load Sales Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                },
                {
                   "StartAt":"Marketing DataBrew ETL Job",
                   "States":{
                      "Marketing DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"marketing-data-etl"
                         },
                         "Next":"Drop Old Marketing Table"
                      },
                      "Drop Old Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Marketing Table"
                      },
                      "Create Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `marketing_data_output`(`date` string, `new_visitors_seo` int, `new_visitors_cpc` int, `new_visitors_social_media` int, `return_visitors` int, `twitter_mentions` int,   `twitter_follower_adds` int, `twitter_followers_cumulative` int, `mailing_list_adds_` int,   `mailing_list_cumulative` int, `website_pageviews` int, `website_visits` int, `website_unique_visits` int,   `mobile_uniques` int, `tablet_uniques` int, `desktop_uniques` int, `free_sign_up` int, `paid_conversion` int, `events` string) PARTITIONED BY (`year` string, `month` string, `day` string) ROW FORMAT SERDE   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Marketing Table Partitions"
                      },
                      "Load Marketing Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                }
             ],
             "Next":"Drop Old Summerized Table"
          },
          "Drop Old Summerized Table":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"DROP TABLE default.sales_marketing_revenue",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Create Summerized Output"
          },
          "Create Summerized Output":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT sales.year, sales.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) sales INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) marketing on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Notify Users"
          },
          "Notify Users":{
             "Type":"Task",
             "Resource":"arn:aws:states:::sns:publish",
             "Parameters":{
                "Message":{
                   "Input":"Monthly sales marketing data refreshed successfully!"
                },
                "TopicArn":"arn:aws:sns:us-east-1:<account-id>:<sns-topic-name>"
             },
             "End":true
          }
       }
    }

The following diagram is the visual representation of the state machine flow. With the Step Functions parallel task type, we created two parallel job runs for the sales and marketing data. When both flows are complete, they join to create an aggregated table in Athena and send an SNS notification to the end-users.


Creating an EventBridge scheduling rule

Now let’s integrate EventBridge to schedule the invocation of our Step Functions state machine on the first day of every month.

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose Create a rule.
  3. For Name, enter a name (for example, trigger-step-function-rule).
  4. Under Define pattern, select Schedule.
  5. Select Cron expression.
  6. Enter the cron expression 0 0 1 * ? * to specify that the job runs on the first day of every month at midnight (UTC).

  7. In the Select targets section, for Target, choose Step Functions state machine.
  8. For State machine, choose your state machine.


Now when the state machine is invoked, its run flow looks like the following screenshot, where blue represents the DataBrew jobs currently running.

When the job is complete, all the steps should be green.


You also receive the notification “Monthly sales marketing data refreshed successfully!”

Running an Athena query

Let’s validate the aggregated table output in Athena by running a simple SELECT query. The following screenshot shows the output.
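
For example, a query along the following lines is enough for this check. This is an illustrative sketch; the table and column names come from the CTAS state that creates default.sales_marketing_revenue in the state machine above.

    SELECT year,
           month,
           total_paid_conversion,
           total_weighted_revenue
    FROM default.sales_marketing_revenue
    ORDER BY year DESC, month DESC
    LIMIT 12;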


Creating reports in QuickSight

Now let’s do our final step of the architecture, which is creating BI reports through QuickSight by connecting to the Athena aggregated table.

  1. On the QuickSight console, choose Athena as your data source.


  2. Select the database and table name you have in Athena.


Now you can create a quick report to visualize your output, as shown in the following screenshot.


 

If QuickSight is using SPICE storage, you need to refresh the dataset in QuickSight after you receive the notification that the data refresh is complete. If the QuickSight report runs an Athena query for every request, you might see a "table not found" error while the data refresh is in progress. We recommend using SPICE storage to get better performance.

Conclusion

This post explains how to integrate a DataBrew job and Athena queries with Step Functions to implement a simple ETL pipeline that refreshes aggregated sales and marketing data for BI reporting.

I hope this gives you a great starting point for using this solution with your datasets and applying business rules to build a complete serverless data analytics pipeline.


About the Author

Sakti Mishra

Sakti Mishra is a Senior Data Lab Solution Architect at AWS, where he helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places.

Accessing and visualizing data from multiple data sources with Amazon Athena and Amazon QuickSight

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/accessing-and-visualizing-data-from-multiple-data-sources-with-amazon-athena-and-amazon-quicksight/

Amazon Athena now supports federated query, a feature that allows you to query data in sources other than Amazon Simple Storage Service (Amazon S3). You can use federated queries in Athena to query the data in place or build pipelines that extract data from multiple data sources and store them in Amazon S3. With Athena Federated Query, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources. Athena queries, including federated queries, can be run from the Athena console, a JDBC or ODBC connection, the Athena API, the Athena CLI, the AWS SDK, or AWS Tools for Windows PowerShell.

The goal for this post is to discuss how we can use different connectors to run federated queries with complex joins across different data sources with Athena and visualize the data with Amazon QuickSight.

Athena Federated Query

Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of the Athena query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon Redshift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy data source connectors, the connector is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.
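
For example, a single federated query can join a table in the AWS Glue Data Catalog with a table exposed through a connector catalog. The following sketch is illustrative only; the dynamo catalog and the table and column names are hypothetical placeholders for whatever you register in your account:

    SELECT o.order_id,
           o.order_total,
           c.customer_name
    FROM default.orders o
    JOIN dynamo.default.customers c
        ON o.customer_id = c.customer_id;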

When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source in the cloud or on premises that is accessible from Lambda.

Prerequisites

Before creating your development environment, you must have the following prerequisites:

Configuring your data source connectors

After you deploy your CloudFormation stack, follow the instructions in the post Extracting and joining data from multiple data sources with Athena Federated Query to configure various Athena data source connectors for HBase on Amazon EMR, DynamoDB, ElastiCache for Redis, and Amazon Aurora MySQL.

You can run Athena federated queries in the AmazonAthenaPreviewFunctionality workgroup created as part of the CloudFormation stack or you could run them in the primary workgroup or other workgroups as long as you’re running with Athena engine version 2. As of this writing, Athena Federated Query is generally available in the Asia Pacific (Mumbai), Asia Pacific (Tokyo), Europe (Ireland), US East (N. Virginia), US East (Ohio), US West (N. California), and US West (Oregon) Regions. If you’re running in other Regions, use the AmazonAthenaPreviewFunctionality workgroup.

For information about changing your workgroup to Athena engine version 2, see Changing Athena Engine Versions.

Configuring QuickSight

The next step is to configure QuickSight to use these connectors to query data and visualize with QuickSight.

  1. On the AWS Management Console, navigate to QuickSight.
  2. If you’re not signed up for QuickSight, you’re prompted with the option to sign up. Follow the steps to sign up to use QuickSight.
  3. After you log in to QuickSight, choose Manage QuickSight under your account.


  4. In the navigation pane, choose Security & permissions.
  5. Under QuickSight access to AWS services, choose Add or remove.


A page appears for enabling QuickSight access to AWS services.

  6. Choose Athena.


  7. In the pop-up window, choose Next.


  8. On the S3 tab, select the necessary S3 buckets. For this post, I select the athena-federation-workshop-<account_id> bucket and another one that stores my Athena query results.
  9. For each bucket, also select Write permission for Athena Workgroup.


  10. On the Lambda tab, select the Lambda functions corresponding to the Athena federated connectors that Athena federated queries use. If you followed the post Extracting and joining data from multiple data sources with Athena Federated Query when configuring your Athena federated connectors, you can select dynamo, hbase, mysql, and redis.

For information about registering a data source in Athena, see the appendix in this post.

  11. Choose Finish.


  12. Choose Update.
  13. On the QuickSight console, choose New analysis.
  14. Choose New dataset.
  15. For Datasets, choose Athena.
  16. For Data source name, enter Athena-federation.
  17. For Athena workgroup, choose primary.
  18. Choose Create data source.

As stated earlier, you can use the AmazonAthenaPreviewFunctionality workgroup or another workgroup as long as you’re running Athena engine version 2 in a supported Region.


  19. For Catalog, choose the catalog that you created for your Athena federated connector.

For information about creating and registering a data source in Athena, see the appendix in this post.


  20. For this post, I choose the dynamo catalog, which does a federation to the Athena DynamoDB connector.


I can now see the database and tables listed in QuickSight.

  21. Choose Edit/Preview data to see the data.
  22. Choose Save & Visualize to start using this data for creating visualizations in QuickSight.

22. Choose Save & Visualize to start using this data for creating visualizations in QuickSight.

  23. To do a join with another Athena data source, choose Add data and select the catalog and table.
  24. Choose the join link between the two datasets and choose the appropriate join configuration.
  25. Choose Apply.


You should be able to see the joined data.


Running a query in QuickSight

Now we use the custom SQL option in QuickSight to run a complex query with multiple Athena federated data sources.

  1. On the QuickSight console, choose New analysis.
  2. Choose New dataset.
  3. For Datasets, choose Athena.
  4. For Data source name, enter Athena-federation.
  5. For the workgroup, choose primary.
  6. Choose Create data source.
  7. Choose Use custom SQL.
  8. Enter the query for ProfitBySupplierNation.
  9. Choose Edit/Preview data.


Under Query mode, you have the option to view your query in either SPICE or direct query. SPICE is the QuickSight Super-fast, Parallel, In-memory Calculation Engine. It’s engineered to rapidly perform advanced calculations and serve data. Using SPICE can save time and money because your analytical queries process faster, you don’t need to wait for a direct query to process, and you can reuse data stored in SPICE multiple times without incurring additional costs. You also can refresh data in SPICE on a recurring basis as needed or on demand. For more information about refresh options, see Refreshing Data.

With direct query, QuickSight doesn’t use SPICE data and sends the query every time to Athena to get the data.

  10. Select SPICE.
  11. Choose Apply.
  12. Choose Save & visualize.

Choose Save & visualize.

  13. On the Visualize page, under Fields list, choose nation and sum_profit.

QuickSight automatically chooses the best visualization type based on the selected fields. You can change the visual type based on your requirement. The following screenshot shows a pie chart for Sum_profit grouped by Nation.


You can add more datasets using Athena federated queries and create dashboards. The following screenshot is an example of a visual analysis over various datasets that were added as part of this post.


When your analysis is ready, you can choose Share to create a dashboard and share it within your organization.

Summary

QuickSight is a powerful visualization tool, and with Athena federated queries, you can run analysis and build dashboards on various data sources like DynamoDB, HBase on Amazon EMR, and many more. You can also easily join relational, non-relational, and custom object stores in Athena queries and use them with QuickSight to create visualizations and dashboards.

For more information about Athena Federated Query, see Using Amazon Athena Federated Query and Query any data source with Amazon Athena’s new federated query.


Appendix

To register a data source in Athena, complete the following steps:

  1. On the Athena console, choose Data sources.


  2. Choose Connect data source.


  3. Select Query a data source.
  4. For Choose a data source, select a data source (for this post, I select Redis).
  5. Choose Next.


  6. For Lambda function, choose your function.

For this post, I use the redis Lambda function, which I configured as part of configuring the Athena federated connector in the post Extracting and joining data from multiple data sources with Athena Federated Query.

  7. For Catalog name, enter a name (for example, redis).

The catalog name you specify here is the one that is displayed in QuickSight when selecting Lambda functions for access.

  8. Choose Connect.


When the data source is registered, it’s available in the Data source drop-down list on the Athena console.



About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. In his free time, he likes to watch movies and spend time with his family.

 

 

 

Dream11’s journey to building their Data Highway on AWS

Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”


Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously, they used third-party services to collect, analyze, and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

  • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
  • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
  • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular web and mobile analytics services they were using rely on sampling to deal with high data volumes. Although sampling is a well-regarded technique and provides reasonable accuracy in many cases, Dream11 wanted statistics to be as accurate as possible.
  • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is very critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
  • Their approach led to high costs for custom analytics on Dream11’s user interaction data, which consists of hundreds of event types. Serverless query engines typically charge by the amount of data scanned, so queries can get very expensive if events data isn’t organized into separate tables in a data lake to enable selective access.


All these concerns and needs led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

Design goals

Dream11 had the following design goals for the project:

  • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
  • The cost should be low and should be pay-as-you-go.
  • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
  • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
  • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
  • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
  • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
  • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
  • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
  • The system should be able to build 360-degree user profiles.
  • The system should provide alerting on important changes to key business metrics.
  • Last but not least, the system should be secure and reliable with a six-nines availability guarantee.

Data Highway architecture

In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.


For this project, they used the following components:

The rest of this post explains the various design choices and trade-offs made by Dream11’s data engineers.

Event ingestion, segregation, and organization

Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.


When the front end receives an event, it saves the common attribute fields into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

  • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
  • Apache Druid – Provides near real-time dimensional analytics
  • Amazon Redshift – Provides session analytics

The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are then picked up by Apache NiFi, which forwards them to their respective topics. Apache NiFi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). Apache NiFi reads the event name and posts the message to the Kafka topic with the matching name. If Kafka doesn’t have a topic with that name, it creates a new topic with that name. You can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.

The following diagram illustrates the Amazon S3 sink connector per Kafka topic.


The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

 


Storage, cataloging, ETL, and scheduling

In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

Updating the AWS Glue Data Catalog with metadata for the target table

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

Registering tables with the AWS Glue Data Catalog is easy. You can use an AWS Glue crawler. It can infer schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors. Therefore, they developed two Python-based crawlers.

The first Python-based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 connector sink to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with the AWS Glue Data Catalog so that users can query the JSON data directly, if needed.

The second Python-based crawler runs once a day and, for each new table created that day, registers a corresponding table to hold the flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON, Gzip data to Parquet, Snappy, and registers them with Apache Airflow to run every 24 hours.

ETL with Amazon EMR Presto

Dream11 has a multi node, long-running, multi-purpose EMR cluster. They decided to run scheduled ETL jobs on it for the Data Highway project.

ETL for an event table involves a simple SELECT FROM -> INSERT INTO statement to convert JSON (Gzip) to Parquet (Snappy). Converted data takes up to 70% less space and results in a 10-times improvement in Athena query performance. ETL happens once a day. Tables are partitioned by day.
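
A daily ETL statement for one event might look like the following sketch, run on Presto on the EMR cluster. The table and column names are hypothetical; the point is the SELECT FROM -> INSERT INTO pattern that writes into the day-partitioned Parquet table:

    -- Hypothetical tables: event_login_json (raw JSON, Gzip) and
    -- event_login_parquet (Parquet, Snappy, partitioned by day)
    INSERT INTO event_login_parquet
    SELECT user_id,
           app_version,
           event_time,
           '2020-11-01' AS day   -- partition column goes last in the SELECT
    FROM event_login_json
    WHERE substr(event_time, 1, 10) = '2020-11-01';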

Data received on the Kafka_AllEvents_CommonAttributes topic is loaded into Amazon Redshift. ETL involves SELECT FROM -> INSERT INTO to convert JSON (Gzip) to CSV, followed by an Amazon Redshift COPY.
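
A sketch of that load step follows; the bucket, table, and IAM role names are hypothetical:

    COPY events_common_staging
    FROM 's3://example-data-highway/common-attributes/csv/2020-11-01/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/ExampleRedshiftCopyRole'
    FORMAT AS CSV;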

Apache Airflow for schedule management

Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define direct acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that run independently. The DAG keeps track of the relationships and dependencies between tasks.

Dream11 uses Apache Airflow to schedule Python scripts and a few hundred ETL jobs on Amazon EMR Presto that convert JSON (Gzip) data for a few hundred event types to Parquet (Snappy) format, and convert the JSON data containing common attributes for all events to CSV before loading it into Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.


The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.


Real-time and near-real-time analytics

In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

Concurrency analytics with Apache Druid

Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.

Finding active users with Amazon EMR HBase

Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

With the help of a Java consumer, they push all events from the Kafka_AllEvents_CommonAttributes topic to HBase on an EMR cluster with just the required user dimensions. They can query the data in HBase with SQL syntax supported by the Apache Phoenix interface.
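
For instance, a Phoenix query that counts distinct active users in a 5-minute window might look like the following sketch; the table and column names are hypothetical:

    SELECT COUNT(DISTINCT user_id) AS active_users
    FROM user_events
    WHERE event_time >= TO_TIME('2020-11-01 19:55:00')
      AND event_time <  TO_TIME('2020-11-01 20:00:00');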

Session analytics with Amazon Redshift

Dream11 maintains their transactional data warehouse on a multi-node Amazon Redshift cluster. Amazon Redshift allows them to run complex SQL queries efficiently, and would have been a natural choice for event analytics across hundreds of event types. However, in Dream11’s case, events data grows very rapidly, which would mean a lot of data in Amazon Redshift. Also, this data loses its value rapidly as time passes (relatively speaking) compared with transactional data. Therefore, they decided to do only session analytics in Amazon Redshift to benefit from its complex SQL query capabilities, and to do analytics for individual events with the help of Athena (which we discuss in the next section).

Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated Kafka connector sink. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format. The data is loaded into Amazon Redshift with the COPY command. The data is loaded first into a staging table, and data in the staging table is aggregated to get sessions data. Amazon Redshift already has transactional data from other tables that, combined now with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interactions data and transactions data, and then run campaigns for those users with the help of messaging platforms.
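
The aggregation from the staging table into a sessions table can be sketched as follows, assuming each staged event already carries a session identifier; the table and column names are hypothetical:

    INSERT INTO user_sessions
    SELECT user_id,
           session_id,
           MIN(event_time) AS session_start,
           MAX(event_time) AS session_end,
           COUNT(*)        AS event_count
    FROM events_common_staging
    GROUP BY user_id, session_id;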

Event analytics with Athena

Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for over hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.
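
As an illustration, filtering on the day partition of one of the Parquet event tables limits the scan to a single partition instead of the whole table; the table and column names below are hypothetical:

    SELECT app_version,
           COUNT(*) AS event_count
    FROM event_login_parquet
    WHERE day = '2020-11-01'   -- partition predicate limits the data scanned
    GROUP BY app_version
    ORDER BY event_count DESC;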

As discussed before, Dream11 has registered hundreds of tables for events data in JSON format, and a similar number of tables for events data in Parquet format, with the AWS Glue Data Catalog. They observed a 10-times performance gain after converting the data format to Parquet, and an 80% savings in space. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and their custom Java UI for the Data Aware project.

Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.


 The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.


 

Conclusion

This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available under millisecond latency, and the system costs 40% less than the previous system. Analytics is performed with all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11’s AWS account, improving data security and privacy.

As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security; it has also enabled new possibilities. Now Dream11 has a 360-degree view of their users. They can study their users’ progress across multiple platforms – web, Android, and iOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


About the Authors

Pradip Thoke is an AVP of Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim, and Nandeesh Bijoor.

 

Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

Boosting your data lake insights using the Amazon Athena Query Federation SDK

Post Syndicated from Adir Sharabi original https://aws.amazon.com/blogs/big-data/boosting-your-data-lake-insights-using-the-amazon-athena-query-federation-sdk/

Today’s modern applications use multiple purpose-built database engines, including relational, key-value, document, and in-memory databases. This purpose-built approach improves the way applications use data by providing better performance and reducing cost. However, the approach raises some challenges for data teams that need to provide a holistic view on top of these database engines, and especially when they need to merge the data with datasets in the organization’s data lake.

In this post, we show how to use the Amazon Athena Query Federation SDK to easily enrich your data in Amazon Simple Storage Service (Amazon S3) with data from external datastores, apply complex transformations, and get predictive insights by inferencing machine learning (ML) models.

We start by running a query to enrich an S3 backed table that holds features extracted from breast cancer images with fictional patient personal information stored in Amazon DynamoDB. We then use the Athena UDF functionality to decrypt sensitive information stored in the table. Next, we select these features and use Athena integration with Amazon SageMaker to pass them to a linear learner model to predict whether breast cancer is present. Lastly, we show an Amazon QuickSight dashboard to visualize the results.

For this post, we use the following resources:

  • A dataset of features computed from a digitized image of a fine needle aspirate of a breast mass. Such features include radius, texture, perimeter, area, and smoothness. The dataset is stored as a CSV file in Amazon S3.
  • Patient’s personal information (such as age range, email, and country) stored in DynamoDB.
  • A linear learner model deployed into a SageMaker endpoint to predict breast cancer. For more information, see Call an Amazon SageMaker model endpoint using Amazon API Gateway and AWS Lambda.

Prerequisites

Athena Query Federation is now generally available in US East (Ohio), US East (N. Virginia), and US West (Oregon). To use this feature, upgrade your engine version to Athena engine version 2 in your workgroup settings. To enable this feature in other Regions, you need to create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup. Workgroups allow us to:

  • Isolate users, teams, applications, or workloads into groups
  • Enforce costs constraints per query or workgroup
  • Track query-related metrics for all workgroup queries in Amazon CloudWatch

For more information, see Managing Workgroups.

Creating a DynamoDB table and SageMaker endpoint

For the post, we create a new DynamoDB table with synthetic patient data. For the inference requests, we create a new SageMaker endpoint.

  1. Deploy the following AWS CloudFormation stack in us-east-1:

The stack performs the following:

  • Creates a DynamoDB table
  • Creates and triggers an AWS Lambda function to load the table with data
  • Creates a SageMaker endpoint for inference requests

It can take up to 10 minutes for the CloudFormation stack to create the resources.

  2. After the resource creation is complete, navigate to the AWS CloudFormation console.
  3. Choose the boost-your-datalake-insights stack.
  4. On the Outputs tab, copy the values for DynamoTableName and SMEndpointName. You use these values later in the post.

Downloading the dataset

Under a new or existing S3 bucket, create a folder named breast_cancer_features. Download the breast_cancer_raw_data.csv file and upload it to the breast_cancer_features folder in your bucket. This file contains the Breast Cancer Wisconsin (Diagnostic) Data Set, available at https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Diagnostic%29.

The file holds 570 records with 30 columns of values computed for each cell nucleus. Such features include radius, texture, perimeter, area, smoothness, compactness, concavity, concave points, symmetry, and fractal dimension. The following screenshot displays a few records from the file.

Creating the features table in Athena

To query this dataset from Athena, you need to create an external table that points to that file. There are several ways to create a table in Athena. For this post, we use a Hive data definition language (DDL) statement.

  1. On the Athena console, if using a non-GA Region, switch to the AmazonAthenaPreviewFunctionality workgroup created earlier.
  2. Enter the following statement:
    CREATE EXTERNAL TABLE breast_cancer_features(
    id string, radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://<bucket>/breast_cancer_features/'
    TBLPROPERTIES (
      'classification'='csv', 
      'columnsOrdered'='true', 
      'compressionType'='none', 
      'delimiter'=',', 
      'skip.header.line.count'='1')

  1. Choose Run Query or press CTRL + Enter.

After the table is created successfully, you can run queries such as the following to profile your data and better understand how it's distributed:

SELECT variance(radius_mean) AS variance,
       stddev(radius_mean) AS stddev,
       min(radius_mean) AS min,
       max(radius_mean) AS max,
       avg(radius_mean) AS avg
FROM "default"."breast_cancer_features"

The following screenshot shows our results.

Joining the features table with data stored in DynamoDB

In this step, we enrich the features table by joining it with the personal information stored in DynamoDB. With Athena Query Federation, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources.

The following screenshot displays a few records from the DynamoDB table.

To join the features table stored in Amazon S3 with the personal data stored in DynamoDB, we need to create a data source connector that runs on Lambda to run the federated query. A data source connector is a piece of code that can translate between your target data source and Athena. You can think of a connector as an extension of the query engine in Athena.

Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, DynamoDB, Amazon DocumentDB (with MongoDB compatibility), Amazon Relational Database Service (Amazon RDS), and JDBC-compliant relational data sources such as MySQL and PostgreSQL. You can also use the Athena Query Federation SDK to write custom connectors.

Preparing to create federated queries is a two-part process: deploying a Lambda function data source connector, and connecting the Lambda function to a data source.

Deploying a Lambda function data source connector

To deploy the data source connector, complete the following steps:

  1. On the Athena console, choose Data sources.
  2. Choose Connect data source.
  3. Choose Query a data source.
  4. For Choose a data source, choose the data source that you want to query with Athena, such as Amazon DynamoDB.
  5. Choose Next.
  6. For Lambda function, choose Configure new AWS Lambda function.

The function page for the connector that you chose opens on the Lambda console. The page includes detailed information about the connector.

  1. Under Application settings, specify the required information. At a minimum, this includes:
    • AthenaCatalogName – A name for the Lambda function that indicates the data source that it targets, such as athena_dynamodb_connector.
    • SpillBucket – An S3 bucket in your account to store data that exceeds Lambda function response size limits.

For more information about the remaining configurable options, see Available Connectors on GitHub.

  1. Select I acknowledge that this app creates custom IAM roles.
  2. Choose Deploy.

The Resources section on the Lambda console shows the deployment status of the connector and informs you when the deployment is complete.

Connecting to a data source

After you deploy the data source connector to your account, you can connect it to a data source from Athena.

  1. On the Athena console, choose Connect data source.
  2. Choose Query a data source.
  3. Choose the data source for the connector that you just deployed, such as Amazon DynamoDB. If you used the Athena Query Federation SDK to create your own connector and have deployed it to your account, choose All other data sources.
  4. Choose Next.
  5. For Choose Lambda function, choose the function that you named previously (athena_dynamodb_connector).
  6. For Catalog name, enter a unique name to use for the data source in your SQL queries, such as dynamo_db.

The name can be up to 127 characters and must be unique within your account. It can’t be changed after creation. Valid characters are a–z, A–Z, 0–9, _, @, and -. The names awsdatacatalog, hive, jmx, and system are reserved by Athena and can’t be used for custom catalog names.

  1. Choose Connect.

The Data sources page shows your connector in the list of catalog names. You can now use the connector in your queries.

Querying the external data source

To query your external data source, complete the following steps:

  1. In the Athena Query editor, for Data source, choose dynamo_db.

The DynamoDB table appears in the Tables list.

  1. Choose the options menu (three dots) next to the table and choose Preview table.

Now we can run queries like the following to enrich and get more insights on our data by joining it with additional data that was stored in DynamoDB:

SELECT age, count(*)
FROM breast_cancer_features d
JOIN "dynamo_db"."default"."<DynamoTableName>" p
ON d.id = p.patient_id
GROUP BY age

The following screenshot shows our results.

Using custom UDFs to decrypt the patient’s email

When looking at our patients' personal information stored in the DynamoDB table, we can see that the patient's email is encrypted. We want to allow our users to retrieve the decrypted email when querying with Athena, without running a custom ETL process that would require storing the email in decrypted form.

User Defined Functions (UDFs) in Athena allow you to create custom functions to process records or groups of records. A UDF accepts parameters, performs the work, and returns a result. However, UDFs in Athena have the following limitations:

  • Scalar UDFs only – Athena only supports scalar UDFs, which process one row at a time and return a single column value. Athena passes a batch of rows to the UDF each time it invokes a Lambda function.
  • Java runtime only – As of this writing, Athena UDFs support only the Java 8 runtime for Lambda.

To use this feature in preview, you must create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup, as specified in the prerequisites section. For more information, see Querying with User Defined Functions.

Deploying a custom UDF

In this post, we use the decrypt method from the example UDF we published. The same encryption key that we used for the encryption should also be used for the decryption. We store that key in AWS Secrets Manager and use the Athena Query Federation SDK to retrieve it when the function is called.

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Select secret type, select Other type of secrets.
  3. Choose Plaintext.
  4. Remove all the JSON brackets and enter a base64-encoded string as the data key, such as AQIDBAUGBwgJAAECAwQFBg==.
  5. For Select the encryption key, choose DefaultEncryptionKey.
  6. Choose Next.

  1. Enter athena_encrypt_udf_key as the secret name.
  2. Choose Next.
  3. Choose Next again.
  4. Choose Store.
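If you prefer the AWS SDK over the console for this step, a minimal boto3 sketch follows; it stores the same sample data key under the same secret name (replace the key with your own).

import boto3

secretsmanager = boto3.client("secretsmanager")

# Store the base64-encoded data key as a plaintext secret; the UDF Lambda retrieves it at query time.
secretsmanager.create_secret(
    Name="athena_encrypt_udf_key",
    SecretString="AQIDBAUGBwgJAAECAwQFBg==",  # sample key from this post; use your own key
)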

Next, we deploy the Lambda function that runs the UDF.

  1. On the AWS Serverless Application Repository console, in the navigation pane, choose Available applications.
  2. Select Show apps that create custom IAM roles or resource policies.
  3. In the search box, enter AthenaUserDefinedFunctions.
  4. Choose the application from the result pane.

The Lambda function’s Application details page opens on the Lambda console.

  1. For SecretNameOrPrefix, enter the name or prefix of a set of names within Secrets Manager that this function should have access to, such as athena_encrypt_udf_key* (make sure to include an asterisk at the end).
  2. For LambdaFunctionName, enter the name of the Lambda function that runs your UDFs, such as athena_udf.
  3. Select I acknowledge that this app creates custom IAM roles.
  4. Choose Deploy.

The Resources section of the Lambda console shows the deployment status of the application and informs you when the deployment is complete.

Querying with UDFs

The USING FUNCTION clause specifies a UDF or multiple UDFs that can be referenced by a subsequent SELECT statement in the query. You need the method name for the UDF and the name of the Lambda function that hosts the UDF.

In our example, the decrypt method gets two parameters: encrypted_col and secretName. We define the function and run the following query:

USING FUNCTION decrypt(encrypted_col VARCHAR, secretName VARCHAR)
RETURNS VARCHAR
TYPE LAMBDA_INVOKE WITH (lambda_name ='athena_udf')

SELECT encrypted_email, 
    decrypt(encrypted_email,'athena_encrypt_udf_key') AS decrypted_email
FROM "dynamo_db"."default"."<DynamoTableName>"

The following screenshot shows the query results.

Using Athena ML to predict breast cancer

To get predictive insights from our data using ML models, we would normally need to write code to run inference against a deployed model. Data analysts are often skilled in using SQL, but may not have programming expertise. ML with Athena bridges this gap and lets you run inference on models deployed on SageMaker by writing SQL statements in Athena. This feature simplifies access to ML models for data analysts for use cases such as anomaly detection, customer cohort analysis, and sales prediction, and improves productivity by eliminating the need for complex programming or for offloading data and orchestrating jobs just to run inference.

SageMaker is a fully managed ML service, where data scientists and developers can quickly and easily build and train ML models. In addition, SageMaker allows you to deploy your model to get predictions in one of two ways:

  • SageMaker hosting services – Provide a persistent HTTPS endpoint for real-time inference; this is the option we use in this post
  • SageMaker batch transform – Runs inference on an entire dataset offline, without a persistent endpoint

Running a SQL statement with SageMaker inference to predict breast cancer

To perform inference from Athena, you need to train the model based on historical labeled data and deploy it into the SageMaker hosting service. In this post, we use a linear learner model to predict breast cancer from the features we used earlier. The model was already deployed as part of the CloudFormation stack, as you can see in the following screenshot from the SageMaker console.

To use ML with Athena, you define a function with the USING FUNCTION clause. The function points to the SageMaker model endpoint that you want to use and specifies the variable names and data types to pass to the model. Subsequent clauses in the query reference the function to pass values to the model. The model runs inference based on the values that the query passes and then returns inference results.

In our example, the model endpoint is SMEndpoint-odPIqmf9LjUh and the variables are the feature columns from the breast_cancer_features table. We define the function and run the following query:

USING FUNCTION predict_breast_cancer(radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) 
RETURNS DOUBLE 
TYPE SAGEMAKER_INVOKE_ENDPOINT WITH (sagemaker_endpoint = '<SMEndpointName>')

SELECT id, prediction, CASE WHEN round(prediction)=1 THEN 'B' ELSE 'M' END AS diagnosis
FROM( SELECT id, predict_breast_cancer(radius_mean, texture_mean, perimeter_mean, area_mean, smoothness_mean, compactness_mean, concavity_mean, concave_points_mean, symmetry_mean, fractal_dimension_mean, radius_se, texture_se, perimeter_se, area_se, smoothness_se, compactness_se, concavity_se, concave_points_se, symmetry_se, fractal_dimension_se, radius_worst, texture_worst, perimeter_worst, area_worst, smoothness_worst, compactness_worst, concavity_worst, concave_points_worst, symmetry_worst, fractal_dimension_worst) AS prediction FROM breast_cancer_features)a

In the following query results, you can see the prediction column returned by the model. The diagnosis column that derives from it is either B for benign or M for malignant.

Visualizing the data using QuickSight

Because many decision-makers aren't familiar with SQL syntax, they want to consume the data in a more graphical way, such as through charts and dashboards. Data scientists also need to visualize the data to identify trends and anomalies and to understand how it behaves.

Before we visualize our data with QuickSight, to get the best performance, we create a new dataset in Amazon S3 that holds the patients' personal information joined with the breast cancer diagnosis we got from the ML model inference. We use Athena's CREATE TABLE AS SELECT (CTAS) statement to create the table and populate it with the joined data:

CREATE TABLE breast_cancer_final
WITH (
format = 'PARQUET',
parquet_compression = 'SNAPPY',
external_location = 's3://<bucket>/breast_cancer_final/')
AS
USING FUNCTION predict_breast_cancer(radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) 
RETURNS DOUBLE 
TYPE SAGEMAKER_INVOKE_ENDPOINT WITH (sagemaker_endpoint = '<SMEndpointName>')

SELECT *, CASE WHEN round(prediction)=1 THEN 'B' ELSE 'M' END AS diagnosis
FROM( SELECT id, p.*, predict_breast_cancer(radius_mean, texture_mean, perimeter_mean, area_mean, smoothness_mean, compactness_mean, concavity_mean, concave_points_mean, symmetry_mean, fractal_dimension_mean, radius_se, texture_se, perimeter_se, area_se, smoothness_se, compactness_se, concavity_se, concave_points_se, symmetry_se, fractal_dimension_se, radius_worst, texture_worst, perimeter_worst, area_worst, smoothness_worst, compactness_worst, concavity_worst, concave_points_worst, symmetry_worst, fractal_dimension_worst) AS prediction FROM breast_cancer_features d JOIN "dynamo_db"."default"."<DynamoTableName>" p
ON d.id = p.patient_id)a

When the query successfully finishes, we can start building our dashboards using QuickSight.

First, we create a dataset in QuickSight for our table in Athena. For instructions, see Creating a Dataset Using Amazon Athena Data.

Next, we create QuickSight visuals.

The following dashboard shows the distribution of age range using a pie visual, a patient count by diagnostic per week, the top five countries, and patient distribution over time with forecast enabled.

Clean up

Now to the final step, cleaning up the resources.

To avoid unnecessary charges on your AWS account, do the following:

  1. Delete the CloudFormation stack that you deployed in the Creating a DynamoDB table and SageMaker endpoint section after you're done experimenting with it. Deleting the stack removes the DynamoDB table, the Lambda function, and the SageMaker endpoint it created. You can delete the stack on the AWS CloudFormation console.
  2. Manually delete the S3 bucket (or folders) containing the data you uploaded and the data generated with Athena.

Conclusion

This post demonstrated how the Athena Query Federation SDK allows you to implement serverless ETL to get more out of your data in your Amazon S3 data lake. We showed how simple Athena SQL syntax allows you to enrich your data with an external datastore, perform business logic using custom UDFs, and get insights by running ML inference. We also visualized our enriched dataset by creating a dashboard in QuickSight.

If you have feedback about this post, please share it in the comments. If you have questions about implementing the solution used in this post, comment or open a thread on the Developer Tools forum.


About the Authors

Adir Sharabi is a Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud. He is also passionate about Data and helping customers to get the most out of it.

 

 

Eitan Sela is a Solutions Architect with Amazon Web Services. He works with AWS customers to provide guidance and technical assistance, helping them improve the value of their solutions when using AWS. Eitan also helps customers build and operate machine learning solutions on AWS. In his spare time, Eitan enjoys jogging and reading the latest machine learning articles.

Keeping your data lake clean and compliant with Amazon Athena

Post Syndicated from David Roberts original https://aws.amazon.com/blogs/big-data/keeping-your-data-lake-clean-and-compliant-with-amazon-athena/

With the introduction of CTAS support for Amazon Athena (see Use CTAS statements with Amazon Athena to reduce cost and improve performance), you can not only query but also create tables using Athena with the associated data objects stored in Amazon Simple Storage Service (Amazon S3). These tables are often temporary in nature and used to filter or aggregate data that already exists in another Athena table. Although this offers great flexibility to perform exploratory analytics, when tables are dropped, the underlying Amazon S3 data remains indefinitely. Over time, the accumulation of these objects can increase Amazon S3 costs, become administratively challenging to manage, and may inadvertently preserve data that should have been deleted for privacy or compliance reasons. Furthermore, the AWS Glue table entry is purged so there is no convenient way to trace back which Amazon S3 path was mapped to a deleted table.

This post shows how you can automate deleting the Amazon S3 objects associated with a table after you drop it using Athena. The solution requires AWS Glue to be the metadata store for Athena.

Overview of solution

The solution requires that the AWS Glue table record (database, table, Amazon S3 path) history is preserved outside of AWS Glue, because it's removed immediately after a table is dropped. Without this record, you can't delete the associated Amazon S3 objects after the fact.

When Athena CTAS statements are issued, AWS Glue generates Amazon CloudWatch events that specify the database and table names. These events are available from Amazon EventBridge and can be used to trigger an AWS Lambda function (autoCleanS3) to fetch the new or updated Amazon S3 path from AWS Glue and write the database, table, and Amazon S3 path into an AWS Glue history table stored in Amazon DynamoDB (GlueHistoryDDB). When Athena drop table queries are detected, CloudWatch events are generated that trigger autoCleanS3 to look up the Amazon S3 path from GlueHistoryDDB and delete all related objects from Amazon S3.

Not all dropped tables should trigger Amazon S3 object deletion. For example, when you create a table using existing Amazon S3 data (not CTAS), it's not advisable to automatically delete the associated Amazon S3 objects, because other analysts may have other tables referring to the same source data. For this reason, you must include a user-defined comment (--dropstore) in the Athena drop table query to cause autoCleanS3 to purge the Amazon S3 objects.

Lastly, after objects are successfully deleted, the corresponding entry in GlueHistoryDDB is updated for historical and audit purposes. The overall workflow is described in the following diagram.

The workflow contains the following steps:

  1. A user creates a table either via Athena or the AWS Glue console or API.
  2. AWS Glue generates a CloudWatch event, and an EventBridge rule triggers the Lambda function.
  3. The function creates an entry in DynamoDB containing a copy of the AWS Glue record and Amazon S3 path.
  4. The user drops the table from Athena, including the special comment --dropstore.
  5. The Lambda function fetches the dropped table entry from DynamoDB, including the Amazon S3 path.
  6. The function deletes data from the Amazon S3 path, including manifest files, and marks the DynamoDB entry as purged.
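The packaged autoCleanS3 function implements all of this for you; purely to illustrate the core of steps 5 and 6, the following is a simplified sketch (not the actual function code) that looks up a dropped table's Amazon S3 path in GlueHistoryDDB and deletes the objects under it. The table and attribute names match this post; event parsing, the --dropstore check, and error handling are omitted.

import boto3

dynamodb = boto3.resource("dynamodb")
s3 = boto3.resource("s3")
history = dynamodb.Table("GlueHistoryDDB")

def purge_dropped_table(database, table_date):
    """Simplified illustration: delete the S3 objects for a dropped table and mark it purged."""
    item = history.get_item(Key={"database": database, "table_date": table_date})["Item"]
    bucket, _, prefix = item["s3_path"].replace("s3://", "").partition("/")

    # Delete every object under the table's S3 path, including manifest and metadata files.
    s3.Bucket(bucket).objects.filter(Prefix=prefix).delete()

    # Record that the objects were removed.
    history.update_item(
        Key={"database": database, "table_date": table_date},
        UpdateExpression="SET purged = :p",
        ExpressionAttributeValues={":p": True},
    )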

Walkthrough overview

To implement this solution, we complete the following steps:

  1. Create the required AWS Identity and Access Management (IAM) policy and role.
  2. Create the AWS Glue history DynamoDB table.
  3. Create the Lambda autoCleanS3 function.
  4. Create the EventBridge rules.
  5. Test the solution.

If you prefer to use a preconfigured CloudFormation template, launch one of the following stacks depending on your Region.

The template is available in the following Regions:

  • us-east-1 (N. Virginia)
  • us-west-2 (Oregon)
  • eu-west-1 (Ireland)

Prerequisites

Before implementing this solution, create an AWS Glue database and table with the data residing in Amazon S3. Be sure your user has the necessary permissions to access Athena and perform CTAS operations that write to a sample Amazon S3 location.

For more information about building a data lake, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

Creating an IAM policy and role

You need to first create the required IAM policy for the Lambda function role to use to query AWS Glue and write to DynamoDB.

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the following code (update the Region, account ID, and S3 bucket accordingly, and the table name GlueHistoryDDB if you choose to change it):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListObjectsV2",
                    "s3:DeleteObjectVersion",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<athena-query-results-s3-bucket>",
                    "arn:aws:s3:::<athena-query-results-s3-bucket>/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "dynamodb:PutItem",
                    "dynamodb:Scan",
                    "dynamodb:Query",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:<region>:<accountId>:table/GlueHistoryDDB"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<region>:<accountId>:catalog",
                    "arn:aws:glue:<region>:<accountId>:database/*",
                    "arn:aws:glue:<region>:<accountId>:table/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogGroup"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:log-group:/aws/lambda/autoCleanS3:*"
                ],
                "Effect": "Allow"
            }
        ]
    }

  1. Choose Review policy.
  2. For Name, enter autoCleanS3-LambdaPolicy.
  3. For Description, enter Policy used by Lambda role to purge S3 objects when an Amazon Athena table is dropped.
  4. Choose Create policy.

Next, you need to create an IAM role and attach this policy.

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. Choose AWS service.
  4. Choose Lambda.
  5. Choose Next: Permissions.

  1. For Filter policies, enter autoCleanS3-LambdaPolicy.
  2. Choose Next: Tags.
  3. Choose Next: Review.
  4. For Role name, enter autoCleanS3-LambdaRole.
  5. For Description, enter Role used by Lambda to purge S3 objects when an Amazon Athena table is dropped.
  6. Choose Create role.

Creating the AWS Glue history DynamoDB table

You use this DynamoDB table to hold the current and historical list of AWS Glue tables and their corresponding Amazon S3 path. Create the table as follows:

  1. On the DynamoDB console, choose Dashboard.
  2. Choose Create table.
  3. For Table name, enter GlueHistoryDDB.
  4. For Partition key, enter database (leave type as String).
  5. Select Add sort key.
  6. Enter table_date (leave type as String).
  7. For Table settings, select Use default settings.
  8. Choose Create.
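If you prefer the AWS SDK over the console, the following boto3 sketch creates an equivalent table (it uses on-demand billing rather than the console defaults).

import boto3

dynamodb = boto3.client("dynamodb")

# Create the AWS Glue history table with the same partition and sort keys as above.
dynamodb.create_table(
    TableName="GlueHistoryDDB",
    AttributeDefinitions=[
        {"AttributeName": "database", "AttributeType": "S"},
        {"AttributeName": "table_date", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "database", "KeyType": "HASH"},
        {"AttributeName": "table_date", "KeyType": "RANGE"},
    ],
    BillingMode="PAY_PER_REQUEST",
)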

The following table summarizes the GlueHistoryDDB table attributes that the Lambda function creates.

Column | Type | Description
database | partition key | The name of the AWS Glue database.
table_date | sort key | A composite attribute of the AWS Glue table name plus the date created. Because the same database and table name can be created again, the date must be used to ensure uniqueness.
created_by | attribute | The user or Amazon EC2 instance ARN from which the table was created.
owner | attribute | The owner of the table or account number.
purged | attribute | A Boolean indicating whether the Amazon S3 objects have been deleted (True/False).
s3_path | attribute | The Amazon S3 path containing objects associated with the table.
table | attribute | The AWS Glue table name.
update_time | attribute | The last time the table was updated (the Amazon S3 path changed or objects were purged).
view_sql | attribute | The view DDL if a view was created.

Creating the Lambda function autoCleanS3

A CloudWatch event triggers the Lambda function autoCleanS3 when a new table is created, updated, or dropped. If the --dropstore keyword is included in the Athena query comments, the associated Amazon S3 objects are also removed.

  1. On the Lambda console, choose Create function.
  2. Select Author from scratch.
  3. For Function name, enter autoCleanS3.
  4. For Runtime, choose Python 3.8.
  5. Under Permissions, for Execution role, select Use an existing role.
  6. Choose the role you created (service-role/autoCleanS3-LambdaRole).
  7. Choose Create function.
  8. Scroll down to the Function code section.
  9. If using Region us-west-2, on the Actions menu, choose Upload a file to Amazon S3.

  1. Enter the following:
    https://aws-bigdata-blog.s3.amazonaws.com/artifacts/aws-blog-keep-your-data-lake-clean-and-compliant-with-amazon-athena/autoCleanS3.zip

  2. Choose Save.

If using a Region other than us-west-2, download the Lambda .zip file locally. Then choose Upload a .zip file and choose the file from your computer to upload the Lambda function.

  1. In the Environment variables section, choose Edit.
  2. Choose Add environment variable.
  3. Enter the following key-values in the following table (customize as desired):
Key | Value | Purpose
Athena_SQL_Drop_Phrase | --dropstore | String to embed in Athena drop table queries to cause associated Amazon S3 objects to be removed
db_list | .* (comma-separated regex filter) | Allows you to limit which databases may contain tables that autoCleanS3 is allowed to purge
ddb_history_table | GlueHistoryDDB | The name of the AWS Glue history DynamoDB table
disable_s3_cleanup | False | If set to True, disables the Amazon S3 purge while still recording attempts in the history table
log_level | INFO | Set to DEBUG to troubleshoot if needed

You must use a standard regex expression, which can be a simple comma-separated list of the AWS Glue databases that you want autoCleanS3 to evaluate.

 The following table shows example patterns for db_list.

Example Regex Pattern | Result
.* | Default; includes all databases
clickstream_web, orders_web, default | Includes only clickstream_web, orders_web, and default
.*_web | Includes all databases with names ending in _web
.*stream.* | Includes all databases containing stream in their name

For a complete list of supported patterns, see https://docs.python.org/3/library/re.html#re.Pattern.match.

  1. Choose Save.

Creating EventBridge rules

You need to create EventBridge rules that invoke your Lambda function whenever Athena query events and AWS Glue CreateTable and UpdateTable events are generated.

Creating the Athena event rule

To create the Athena query event rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-AthenaQueryEvent.
  3. For Description, enter Amazon Athena event for any query to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"AWS API Call via CloudTrail"
    	],
    	"source": [
    		"aws.athena"
    	],
    	"detail": {
    		"eventName": [
    			"StartQueryExecution"
    		]
    	}
    }

  1. Choose Save.
  2. For Select targets, choose Lambda function.
  3. For Function, choose autoCleanS3.
  4. Choose Create.
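The same rule can also be scripted; the following boto3 sketch is one way to do it, with a placeholder Lambda function ARN. You still need to grant EventBridge permission to invoke the function (for example, with the Lambda add_permission API), which the console does for you automatically.

import json
import boto3

events = boto3.client("events")

# Rule that fires for every Athena StartQueryExecution call recorded by CloudTrail.
events.put_rule(
    Name="autoCleanS3-AthenaQueryEvent",
    Description="Amazon Athena event for any query to trigger autoCleanS3",
    EventPattern=json.dumps({
        "detail-type": ["AWS API Call via CloudTrail"],
        "source": ["aws.athena"],
        "detail": {"eventName": ["StartQueryExecution"]},
    }),
)

# Point the rule at the autoCleanS3 Lambda function (placeholder ARN).
events.put_targets(
    Rule="autoCleanS3-AthenaQueryEvent",
    Targets=[{"Id": "autoCleanS3", "Arn": "arn:aws:lambda:<region>:<accountId>:function:autoCleanS3"}],
)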

Creating the AWS Glue event rule

To create the AWS Glue table event  rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-GlueTableEvent.
  3. For Description, enter AWS Glue event for any table creation or update to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"Glue Data Catalog Database State Change"
    	],
    	"source": [
    		"aws.glue"
    	],
    	"detail": {
    		"typeOfChange": [
    			"CreateTable",
    			"UpdateTable"
    		]
    	}
    }

  1. Choose Save.
  2. For Select targets, choose Lambda function.
  3. For Function, choose autoCleanS3.
  4. Choose Create.

You’re finished!

Testing the solution

Make sure you already have a data lake with tables defined in your AWS Glue Data Catalog and permission to access Athena. For this post, we use NYC taxi ride data. For more information, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

  1. Create a new table using Athena CTAS.
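Any CTAS statement works for this test; the following boto3 sketch runs one with placeholder database, table, and bucket names that you should adjust to your own data lake.

import boto3

athena = boto3.client("athena")

# Create a small test table with CTAS; autoCleanS3 records its S3 path in GlueHistoryDDB.
athena.start_query_execution(
    QueryString="""
        CREATE TABLE default.yellow_trips_ctas
        WITH (format = 'PARQUET', external_location = 's3://<bucket>/yellow_trips_ctas/') AS
        SELECT * FROM default.yellow_trips LIMIT 1000
    """,
    ResultConfiguration={"OutputLocation": "s3://<athena-query-results-bucket>/"},
)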

Next, verify that the entry appears in the new GlueHistoryDDB table.

  1. On the DynamoDB console, open the GlueHistoryDDB table.
  2. Choose Items.
  3. Confirm the s3_path value for the table.

You can also view the Amazon S3 table path and objects associated with the table.

  1. On the Amazon S3 console, navigate to the s3_path found in GlueHistoryDDB.
  2. Confirm the table and path containing the data folder and associated manifest and metadata objects.

  1. Drop the table using the keyword --dropstore.
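Because --dropstore itself starts with the SQL comment marker, one way to include it is simply to append it to the drop statement; the following sketch (table name matching the CTAS sketch above) shows this.

import boto3

athena = boto3.client("athena")

# The --dropstore comment tells autoCleanS3 to also purge the table's S3 objects.
athena.start_query_execution(
    QueryString="DROP TABLE default.yellow_trips_ctas --dropstore",
    ResultConfiguration={"OutputLocation": "s3://<athena-query-results-bucket>/"},
)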

  1. Check the Amazon S3 path to verify both the table folder and associated manifest and metadata files have been removed.

You can also see the purged attribute for the entry in GlueHistoryDDB is now set to True, and update_time has been updated, which you can use if you ever need to look back and understand when a purge event occurred.

Considerations

The Lambda timeout may need to be increased for very large tables, because the object deletion operations may not complete in time.

To prevent accidental data deletion, it's recommended to carefully limit which databases may participate (via the db_list Lambda environment variable), to enable versioning on the Athena bucket path, and to set up Amazon S3 lifecycle policies to eventually remove older versions. For more information, see Deleting object versions.

Conclusion

In this post, we demonstrated how to automate the process of deleting Amazon S3 objects associated with dropped AWS Glue tables. Deleting Amazon S3 objects that are no longer associated with an AWS Glue table reduces ongoing storage expense, management overhead, and unnecessary exposure of potentially private data that is no longer needed within the organization, helping you meet regulatory requirements.

This serverless solution monitors Athena and AWS Glue table creation and drop events via CloudWatch, and triggers Lambda to perform Amazon S3 object deletion. We use DynamoDB to store the audit history of all AWS Glue tables that have been dropped over time. It’s strongly recommended to enable Amazon S3 bucket versioning to prevent accidental data deletion.

To restore the Amazon S3 objects for a deleted table, first identify the s3_path value for the relevant table entry in GlueHistoryDDB, and then either copy the objects back or remove the delete markers from the objects in that path. For more information, see How do I undelete a deleted S3 object?


About the Author

David Roberts is a Senior Solutions Architect at AWS. His passion is building efficient and effective solutions on the cloud, especially involving analytics and data lake governance. Besides spending time with his wife and two daughters, he likes drumming and watching movies, and is an avid video gamer.

Auditing, inspecting, and visualizing Amazon Athena usage and cost

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/auditing-inspecting-and-visualizing-amazon-athena-usage-and-cost/

Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon Simple Storage Service (Amazon S3) using standard SQL. It’s a serverless platform with no need to set up or manage infrastructure. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets and complex queries. You pay only for the queries that you run and the charges are based on the amount of data scanned by each query. You’re not charged for data definition language (DDL) statements like CREATE, ALTER, or DROP TABLE, statements for managing partitions, or failed queries. Cancelled queries are charged based on the amount of data scanned.

Typically, multiple users within an organization operate under different Athena workgroups and query various datasets in Amazon S3. In such cases, it’s beneficial to monitor Athena usage to ensure cost-effectiveness, avoid any performance bottlenecks, and adhere to security best practices. As a result, it’s desirable to have metrics that provide the following details:

  • Amount of data scanned by individual users
  • Amount of data scanned by different workgroups
  • Repetitive queries run by individual users
  • Slow-running queries
  • Most expensive queries

In this post, we provide a solution that collects detailed statistics of every query run in Athena and stores it in your data lake for auditing. We also demonstrate how to visualize audit data collected for a few key metrics (data scanned per workgroup and data scanned per user) using Amazon QuickSight.

Solution overview

The following diagram illustrates our solution architecture.

The solution consists of the following high-level steps:

  1. We use an Amazon CloudWatch Events rule with the following event pattern to trigger an AWS Lambda function for every Athena query run:
    {
    "detail-type": [
    "Athena Query State Change"
    ],
    "source": [
    "aws.athena"
    ],
    "detail": {
    "currentState": [
    "SUCCEEDED",
    "FAILED",
    "CANCELED"
    ]
    }
    }

  1. The Lambda function queries the Athena API to get details about the query and publishes them to Amazon Kinesis Data Firehose (a simplified sketch of this function appears after this list).
  2. Kinesis Data Firehose batches the data and writes it to Amazon S3.
  3. We use a second CloudWatch event rule with the following event pattern to trigger another Lambda function for every Athena query being run:
    {
    "detail-type": [
    "AWS API Call via CloudTrail"
    ],
    "source": [
    "aws.athena"
    ],
    "detail": {
    "eventName": [
    "StartQueryExecution"
    ]
    }
    }

  1. The Lambda function extracts AWS Identity and Access Management (IAM) user details from the query and publishes them to Kinesis Data Firehose.
  2. Kinesis Data Firehose batches the data and writes it to Amazon S3.
  3. We create and schedule an AWS Glue crawler to crawl the data written by Kinesis Data Firehose in the previous steps to create and update the Data Catalog tables. The following code shows the table schemas the crawler creates:
    CREATE EXTERNAL TABLE `athena_query_details`(
      `query_execution_id` string, 
      `query` string, 
      `workgroup` string, 
      `state` string, 
      `data_scanned_bytes` bigint, 
      `execution_time_millis` int, 
      `planning_time_millis` int, 
      `queryqueue_time_millis` int, 
      `service_processing_time_millis` int)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://<S3location>/athena-query-details/'
    TBLPROPERTIES (
      'parquet.compression'='SNAPPY')
    
    CREATE EXTERNAL TABLE `athena_user_access_details`(
      `query_execution_id` string, 
      `account` string, 
      `region` string, 
      `user_detail` string, 
      `source_ip` string, 
      `event_time` string)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://<S3location>/athena-user-access-details/'
    TBLPROPERTIES (
      'parquet.compression'='SNAPPY')

  1. Athena stores a detailed history of the queries run in the last 30 days.
  2. The solution deploys a Lambda function that queries AWS CloudTrail to get the list of Athena queries run in the last 30 days and publishes the details to the Kinesis Data Firehose delivery streams created in the previous steps. The historical event processor function only needs to run one time after you deploy the solution using the provided AWS CloudFormation template.
  3. We use the data available in the tables athena_query_details and athena_user_access_details in QuickSight to build insights and create visual dashboards.
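The following is a simplified sketch (not the deployed function code) of the query-details Lambda function referenced earlier: it calls the GetQueryExecution API and forwards a subset of the details to Kinesis Data Firehose. The delivery stream name is a placeholder, and several fields from the athena_query_details schema are omitted for brevity.

import json
import boto3

athena = boto3.client("athena")
firehose = boto3.client("firehose")

def handler(event, context):
    """Triggered by the 'Athena Query State Change' CloudWatch event."""
    query_id = event["detail"]["queryExecutionId"]
    execution = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]
    stats = execution["Statistics"]

    record = {
        "query_execution_id": query_id,
        "query": execution["Query"],
        "workgroup": execution["WorkGroup"],
        "state": execution["Status"]["State"],
        "data_scanned_bytes": stats.get("DataScannedInBytes", 0),
        "execution_time_millis": stats.get("EngineExecutionTimeInMillis", 0),
    }

    # Firehose batches these records and writes them to Amazon S3.
    firehose.put_record(
        DeliveryStreamName="<athena-query-details-stream>",  # placeholder delivery stream name
        Record={"Data": json.dumps(record) + "\n"},
    )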

Setting up your resources

Choose Launch Stack to deploy the solution in your account.

After you deploy the CloudFormation template, manually invoke the athena_historicalquery_events_processor Lambda function. This step processes the Athena queries that ran before deploying this solution; you only need to perform this step one time. 

Reporting using QuickSight dashboards

In this section, we cover the steps to create QuickSight dashboards based on the athena_query_details and athena_user_access_details Athena tables. The first step is to create a dataset with the athena_audit_db database, which the CloudFormation template created.

  1. On the QuickSight console, choose Datasets.
  2. Choose New dataset.

  1. For Create a Data Set, choose Athena.

 

  1. For Data source name, enter a name (for example, Athena_Audit).
  2. For Athena workgroup, keep it at its default [primary].
  3. Choose Create data source.

  1. In the Choose your table section, choose athena_audit_db.
  2. Choose Use custom SQL.

  1. Enter a name for your custom SQL (for example, Custom-SQL-Athena-Audit).
  2. Enter the following SQL query:
    SELECT a.query_execution_id, a.query, a.state, a.data_scanned_bytes, a.execution_time_millis, b.user_detail
    FROM "athena_audit_db"."athena_query_details" AS a FULL JOIN  "athena_audit_db"."athena_user_access_details" AS b
    ON a.query_execution_id=b.query_execution_id
    WHERE CAST(a.data_scanned_bytes AS DECIMAL) > 0

  1. Choose Confirm query.

This query does a full join between the athena_query_details and athena_user_access_details tables.

  1. Select Import to SPICE for quicker analytics.
  2. Choose Edit/Preview data.

  1. Confirm that the data_scanned_bytes and execution_time_millis column data type is set to Decimal.
  2. Choose Save & visualize.

We’re now ready to create visuals in QuickSight. For this post, we create the following visuals:

  • Data scanned per query
  • Data scanned per user 

Data scanned per query

To configure the chart for our first visual, complete the following steps:

  1. For Visual types, choose Horizontal bar chart.
  2. For Y axis, choose query_execution_id.
  3. For Value, choose data_scanned_bytes.
  4. For Group/Color, choose query.

If you’re interested in determining the total runtime of the queries, you can use execution_time_millis instead of data_scanned_bytes.

The following screenshot shows our visualization.

 

If you hover over one of the rows in the chart, the pop-out detail shows the Athena query execution ID, Athena query that ran, and the data scanned by the query in bytes.

Data scanned per user

To configure the chart for our second visual, complete the following steps:

  1. For Visual types, choose Horizontal bar chart.
  2. For Y axis, choose query_execution_id.
  3. For Value, choose data_scanned_bytes.
  4. For Group/Color, choose user_detail.

You can use execution_time_millis instead of data_scanned_bytes if you’re interested in determining the total runtime of the queries.

The following screenshot shows our visualization.

If you hover over one of the rows in the chart, the pop-out detail shows the Athena query execution ID, the IAM principal that ran the query, and the data scanned by the query in bytes. 

Conclusion

This solution queries CloudTrail for Athena query activity in the last 30 days and uses CloudWatch rules to capture statistics for future Athena queries. Furthermore, the solution uses the GetQueryExecution API to provide details about the amount of data scanned, which provides information about per-query costs and how many queries are run per user. This enables you to further understand how your organization is using Athena.

You can further improve upon this architecture by using a partitioning strategy. Instead of writing all query metrics as .csv files to one S3 bucket folder, you can partition the data using year, month, and day partition keys. This allows you to query data by year, month, or day. For more information about partitioning strategies, see Partitioning Data.
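As a minimal illustration of such a layout (the bucket name and prefix are placeholders), a writer could place each record under year/month/day prefixes so Athena can prune partitions:

from datetime import datetime, timezone
import json
import boto3

s3 = boto3.client("s3")

def write_partitioned(record, bucket="<metrics-bucket>"):
    """Write a query-metrics record under year=/month=/day= prefixes for partition pruning."""
    now = datetime.now(timezone.utc)
    key = (
        f"athena-query-details/year={now.year}/month={now.month:02d}/day={now.day:02d}/"
        f"{record['query_execution_id']}.json"
    )
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(record))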

For more information about improving Athena performance, see Top 10 Performance Tuning Tips for Amazon Athena.


About the Authors

Kalyan Janaki is Senior Technical Account Manager with AWS. Kalyan enjoys working with customers and helping them migrate their workloads to the cloud. In his spare time, he tries to keep up with his 2-year-old.

 

 

 

Kapil Shardha is an AWS Solutions Architect and supports enterprise customers with their AWS adoption. He has background in infrastructure automation and DevOps.

 

 

 

Aravind Singirikonda is a Solutions Architect at Amazon Web Services. He works with AWS Enterprise customers to provide guidance and technical assistance, helping them improve the value of their solutions when using AWS.

Managing COVID-19 exposure with crowd tracing

Post Syndicated from Aspire Ventures original https://aws.amazon.com/blogs/big-data/managing-covid-19-exposure-with-crowd-tracing/

This is a guest blog post by AWS partner Aspire Ventures

As we enter winter, with fewer options to be outdoors, our personal choices can impact our risk of contracting the COVID-19 virus even more. The New England Journal of Medicine publication showed real-world examples of the effectiveness of masks and social distancing in mitigating severity of COVID-19 infection and keeping people asymptomatic. CNN reported on a study that showed people who contracted COVID-19 were twice as likely to have visited a restaurant in the prior two weeks. What if we had actionable crowding, mask usage, and social distancing data that we could analyze to inform our daily decisions to keep us safe?

Aspire Ventures, an AWS partner, has developed the Clio GO pass system — a new venue-entry system that helps track COVID-19 exposure through kiosks and mobile phones in a completely privacy-preserving way. It uses a new technology called crowd tracing, which allows users to assess whether certain locations and venues meet their risk profile. Crowd tracing data is COVID-19 location-scouting data, which helps answer the question of how much risk may be associated with entering a particular crowd.

Today, Aspire Ventures is collaborating with AWS to open source anonymized crowd tracing data from the Clio GO pass system and make it available in the public AWS COVID-19 data lake. Aspire Ventures is a venture fund dedicated to fast-tracking precision medicine technologies and practices that leverage AI and IoT to deliver affordable, individualized solutions at a massive scale. The AWS COVID-19 data lake is a public repository of up-to-date and curated datasets on or related to COVID-19 to help experts track, contain, and neutralize the virus causing the illness. With the Clio GO pass system and the open-source crowd tracing dataset in the AWS COVID-19 data lake, we believe the global community can come together and develop techniques to better fight the COVID-19 pandemic.

The Clio GO app functions like an airline mobile boarding pass system. Prior to arrival, you check in via the app by answering a few questions and receive a mobile entry GO Pass in either QR-code or NFC ticket format. When you arrive at a venue, you validate your GO Pass by scanning it at a kiosk or smartphone. GO Pass is being used by thousands of venues who have tens of millions of annual visitors. These venues run the spectrum from schools and medical practices to office buildings and food manufacturing facilities. Further adoption of Aspire’s Clio GO app will generate more anonymized data that AWS will make available for advancing COVID-19 solutions.

Crowd tracing vs. contact tracing

As Dr. Fauci said, contact tracing is “not working.” As the primary technology used by public health authorities, it’s fraught with poor adoption, poor accuracy, high cost, and serious privacy concerns. To understand the issues with contact tracing, we analogize to a first-person video game to introduce the immunological concepts of viral dose, viral load, and undetectable asymptomatic carriers.

Imagine a video game scenario with attackers and shields to protect from attacks. Viral dose is analogous to incremental hits that weaken a player’s shield. Avoiding those hits prevents your shield from collapsing.

Viral load is analogous to how strongly any one attacker can hit a player’s shield. Certain infected individuals who are more progressed in their infection may hit you harder. Just as in the game, your shields may be destroyed by many weak hits from multiple attackers or one very strong hit from a single attacker.

Asymptomatic carriers are like players who, from a distance, look like they have no weapons. Undetectable asymptomatic carriers are like players whose weapons can’t even be detected when you search their belongings—the science indicates that infectious asymptomatic carriers may not be detectable with COVID-19 PCR swab tests. CDC blood test surveys show that between 6 and 25 times as many asymptomatic carriers are lurking out there for any single known case.

Clio GO uses crowd tracing and adaptive artificial intelligence (A2I) to progressively improve the estimates of each player’s shield strength, the intensity of hits you might encounter, and the likely hits from attackers whose weapons are completely undetectable.

In contrast, contact tracing requires that an attacker have a visible weapon, and if so, it assumes shields are obliterated immediately. However, if there is no visible weapon, the shields remain at 100%. In either case, contact tracing doesn’t decrease your shield level based on cumulative small hits (viral dose) or how intense the hits (viral load of others) are by taking into account the conditions at the time, such as mask usage, social distancing, and duration of contact.

The more people who have checked in to the same place, the lower each person’s shield is computed to be. Shield levels are further refined by reported mask usage and social distancing within the crowd. The venue never sees the person’s shield level. It sees only a green or red check mark indicating whether you’re entering with a valid pass that meets its entry requirements; no symptom data is shared with the venue.

After your visit, you can rate the venue’s use of masks and social distancing, and this report helps compute your own shield level and that of others. When using the Clio GO app to scout venues prior to visiting, the venue’s listing shows the aggregate mask and social distancing ratings by visitors.

As part of our collaboration with AWS, and to broaden the adoption of crowd tracing, Clio GO app users can now pre-screen their visitors at no cost for personal, non-profit, faith-based, educational, and amateur athletic event use.

How you can contribute

We welcome everyone to participate in this collaborative effort. Using the app improves your and your visitor’s safety while contributing anonymized crowd tracing data to the open-source public AWS COVID-19 data lake.

In just a few minutes, you can get a free Clio GO account and use the Clio GO app to pre-screen people attending personal events and private clubs—whether small dinner parties, soccer matches, or religious gatherings. You can purchase additional hardware for unmanned door screening or mobile kiosk functionality, as well as solutions for commercial enterprises.

AWS COVID-19 data lake and crowd tracing data

To make the data from the AWS COVID-19 data lake available in the Data Catalog in your AWS account, create a CloudFormation stack using the following template. This template creates a covid-19 database in your AWS Glue Data Catalog and tables that point to the public AWS COVID-19 data lake. This includes an aspirevc_crowd_tracing table, which points to up-to-date crowd tracing data, and an aspirevc_crowd_tracing_zipcode_3digits table, which points to a lookup that translates the 3-digit zip codes used in the aspirevc_crowd_tracing table to the respective states.

You can query these tables using Amazon Athena. Athena is a serverless interactive query service that makes it easy to analyze the data in the AWS COVID-19 data lake. Athena supports SQL, a common language that data analysts use for analyzing structured data. To query the data, complete the following steps:

  1. Sign in to the Athena console.
    1. If this is the first time you are using Athena, you must specify a query result location on Amazon S3.
  2. From the drop-down menu, choose the covid-19 database.
  3. Enter your query.

The following query returns statistics including the number of people marked as symptoms, diagnosed, contact, and near for the given scan date:

SELECT
  cast(from_iso8601_timestamp(scandate) as date) as date,
  COUNT_IF(symptoms) as symptoms_count,
  COUNT_IF(diagnosed) as diagnosed_count,
  COUNT_IF(contact) as contact_count,
  COUNT_IF(near) as near_count, COUNT(*) as total_count
FROM "covid-19"."aspirevc_crowd_tracing"
WHERE from_iso8601_timestamp(scandate)
BETWEEN parse_datetime('2020-10-01:00:00:00','yyyy-MM-dd:HH:mm:ss')
AND parse_datetime('2020-10-16:00:00:00','yyyy-MM-dd:HH:mm:ss')
GROUP BY 1
ORDER BY 1

symptoms: Past 2 weeks, have you had any of the following symptoms: shortness of breath, fever, loss of taste or smell, new cough?

diagnosed: Past 2 weeks, have you been diagnosed with COVID or are waiting for COVID test results?

contact: Past 2 weeks, have you been in contact with anyone who has been diagnosed with COVID or is waiting for COVID test results?

near: Past 2 weeks, have you been near anyone with the following symptoms: shortness of breath, fever, loss of taste or smell, new cough?

The following screenshot shows the results of this query:

To see more details, you can run the following query to retrieve the same statistics per state per risklevel for the given scan date:

SELECT
  cast(from_iso8601_timestamp(scandate) as date) as date,
  SUBSTR(scannerdevice_zipcode, 1, 3) as zip,
  state,
  risklevel,
  result,
  COUNT_IF(symptoms) as symptoms_count,
  COUNT_IF(diagnosed) as diagnosed_count,
  COUNT_IF(contact) as contact_count,
  COUNT_IF(near) as near_count, 
  COUNT(*) as total_count
FROM "covid-19"."aspirevc_crowd_tracing"
JOIN "covid-19"."aspirevc_crowd_tracing_zipcode_3digits" ON SUBSTR(scannerdevice_zipcode, 1, 3) = aspirevc_crowd_tracing_zipcode_3digits.zip
WHERE from_iso8601_timestamp(scandate)
BETWEEN parse_datetime('2020-10-15:00:00:00','yyyy-MM-dd:HH:mm:ss')
AND parse_datetime('2020-10-16:00:00:00','yyyy-MM-dd:HH:mm:ss')
AND scannerdevice_zipcode<>''
GROUP BY 1,2,3,4,5
ORDER BY 1,3

The following screenshot shows the results of this query:

You can see that there are a small number of people marked as symptoms, diagnosed, contact, and near per state per risklevel.

By open-sourcing the data, we see possibilities to combine it with other AWS COVID-19 data lake datasets, such as hospitalizations or COVIDcast data. This can enable a new game feature such as a radar that predicts regional hotspot emergence.

If you’re a data analyst, we encourage you to contribute to building better crowd tracing algorithms using any of the data provided in the public AWS Covid-19 data lake. Even if you’re building a different solution, you can use this dataset without license fees. The following section can help you quickly get started.

The data

Although our public AWS COVID-19 data lake has excellent data sources, such as hospitalization data down to regional levels, Clio GO provides data even at the zip-code level. The schema of the data made available in the data lake includes the scan date, the scanner device zip code, the risk level and scan result, and the boolean screening responses (symptoms, diagnosed, contact, and near) used in the earlier queries.

Strong data privacy and cryptographic pseudonymity via Powch

In contrast to the significant privacy challenges associated with contact tracing, crowd tracing and the Clio GO system don’t require disclosure of contact identities to reduce your risk for COVID-19 infection. Returning to the video game analogy, learning after the fact the identity of who hit your shields doesn’t matter. However, knowing that you’re entering a crowded map of strongly armed assailants might cause you to choose a different crowd. Therefore, the aggregate risk of the crowd becomes the only relevant concern for your risk of contracting COVID-19.

To protect your identity, the Clio GO app uses Powch, a powerful cryptographic technology that protects identity and data. Similar to Bitcoin, Powch enables pseudonymity—a way to log in without any linkage to your true identity. You don’t need to use any personally identifiable information for Clio Go registration. Instead, an unguessable and random secret ID is stored in a personal QR code, which only you have access to, and GO Pass has no knowledge about the owner of the secret ID. The secret ID is used during registration as the only form of identity in the Go Pass app.

After exposure to a possibly infected individual, contact tracing requires you to exactly identify all the individuals that you interacted with during the same period of time, and compromise their privacy as well as your own.

Although you may choose to be identified and share your name with the venue you visit, the Clio GO app never shows personal data, like actual temperature readings, to the venue owner. The app only tells the venue whether the GO Pass was accepted or denied based on the entry requirements of the venue.

Crowd-sourced solutions, open data, restrictions, and uses

We hope that free access to the crowd tracing data via the public AWS COVID-19 data lake encourages the development of new creative, low-cost COVID-19 mitigation solutions. You can use the data within commercial products under a creative commons license with the explicit requirement that algorithms developed from the public dataset are open and published.

We encourage using the crowd tracing data in the public AWS COVID-19 data lake in conjunction with other free data sources also in the lake. A commercial data feed with fewer restrictions and data limitations is being made available via AWS Data Exchange to commercial organizations who pass verification requirements.


About the authors and Aspire Ventures

Aspire Ventures has developed a novel artificial intelligence engine called A2i and joint ventures with mission-driven health systems. Aspire’s first joint venture with Penn Medical Lancaster General Health and Capital BlueCross established the Smart Health Innovation Lab, an entity that accelerates healthcare technologies that impact the quadruple aim. Aspire is partnering with Clalit, the majority health system of Israel, in a similar joint venture focused on Israeli start-ups to encourage innovation in healthcare.

Aspire Ventures was founded by Essam Abadir, SB MIT Mathematics, SB Sloan School of Management, and JD with Distinction from the University of Iowa School of Law. Essam founded Aspire Ventures as an impact investment AI firm in 2014 after selling an apps platform to Intel in 2013.

A2i is overseen by Victor Owuor, SB & MS Aeronautical and Astronautical Engineering MIT, SB & MS Electrical Engineering MIT, and JD Harvard School of Law. Prior to Aspire, Victor headed a significant cloud P&L at Oracle.

The Aspire Ventures CIO and Smart Health Innovation Lab CEO is Kim Ireland, MSIS Penn State University. Kim formerly managed health system EHR rollouts at Cerner and was CEO of startup MedStatix.

Scott Schell, PhD Immunology University of Chicago, MD University of Chicago, and MBA University of Michigan, heads Clio Health Go and is Chief Medical Officer for the Aspire portfolio. Scott was Founding Chair of Cleveland Clinic’s Department of Population Health and led development of two of healthcare’s largest platforms at Alere and UPMC.

Clio Health GO’s mission is to reinvent the healthcare experience via advanced telehealth, starting with COVID-19. GO Pass and the crowd tracing data and algorithm is a product of Medstatix. Powch provides patented cryptographic privacy and security technologies. Connexion Health provides the GO Pass kiosks. Clio Health GO, Medstatix, Connexion Health, and Powch are portfolio companies of Aspire.

Field Notes: Restricting Amazon WorkSpaces Users to Run Amazon Athena Queries

Post Syndicated from Somdeb Bhattacharjee original https://aws.amazon.com/blogs/architecture/field-notes-restricting-amazon-workspaces-users-to-run-amazon-athena-queries/

One of the use cases we hear from customers is that they want to provide very limited access to Amazon WorkSpaces users (for example, contractors or consultants) in an AWS account. At the same time, they want to allow those users to query Amazon Simple Storage Service (Amazon S3) data in another account using Amazon Athena over a JDBC connection.

For example, marketing companies might provide private access to first-party data to media agencies through this mechanism.

The restrictions they want to put in place are:

  • For security reasons, these Amazon WorkSpaces should not have internet connectivity, so access to Amazon Athena must be over AWS PrivateLink.
  • Public access to Athena is not allowed using the credentials used for the JDBC connection. This prevents users from leveraging the credentials to query the data from anywhere else.

In this post, we show how to use Amazon Virtual Private Cloud (Amazon VPC) endpoints for Athena, along with AWS Identity and Access Management (AWS IAM) policies. This provides private access to query the Amazon S3 data while preventing users from querying the data from outside their Amazon WorkSpaces or using the Athena public endpoint.

Let’s review the steps to achieve this:

  • Initial setup of two AWS accounts (AccountA and AccountB)
  • Set up Amazon S3 bucket with sample data in AccountA
  • Set up an IAM user with Amazon S3 and Athena access in AccountA
  • Create an Amazon VPC endpoint for Athena in AccountA
  • Set up Amazon WorkSpaces for a user in AccountB
  • Install a SQL client tool (we will use DbVisualizer Free) and Athena JDBC driver in Amazon WorkSpaces in AccountB
  • Use DbVisualizer to query the Amazon S3 data in AccountA using the Athena public endpoint
  • Update the IAM policy for the user in AccountA to restrict access to private connectivity only

Prerequisites

To follow the steps in this post, you need two AWS Accounts. The Amazon VPC and subnet requirements are specified in the detailed steps.

Note: The AWS CloudFormation template used in this blog post is for the US-EAST-1 (N. Virginia) Region, so ensure the Region setting for both accounts is set to US-EAST-1 (N. Virginia).

Walkthrough

The two AWS accounts are:

AccountA – Contains the Amazon S3 bucket where the data is stored. For AccountA you can create a new Amazon VPC or use the default Amazon VPC.

AccountB – Amazon WorkSpaces account. Use the following AWS CloudFormation template for AccountB:

  • The AWS CloudFormation template will create a new Amazon VPC in AccountB with CIDR 10.10.0.0/16 and set up one public subnet and two private subnets.
  • It will also create a NAT Gateway in the public subnet and create both public and private route tables.
  • Since we will be launching Amazon WorkSpaces in these private subnets and not all Availability Zones (AZ) are supported by Amazon WorkSpaces, it is important to choose the right AZ when creating them.

Review the documentation to learn which AWS Regions/AZ are supported.

We have provided two parameters in the AWS CloudFormation template:

  • AZName1
  • AZName2

Step 1

Before launching the CloudFormation stack:

  • Log in to AccountB
  • Search for AWS Resource Access Manager
  • On the right-hand side, you will notice the AZ ID to AZ Name mapping. Note down the AZ Names corresponding to AZ IDs use1-az2 and use1-az4.
  • Now launch the CloudFormation template and remember to choose the AZ names you noted down earlier:
    • https://athena-workspaces-blogpost.s3.amazonaws.com/vpc.yaml
  • Enter the CloudFormation stack name as 'AthenaWorkspaces' and leave everything else as the default.
  • Once the CloudFormation stack creation is complete, create a peering connection from AccountB to AccountA.
  • Update the associated route tables for the private subnets with the new peering connection.

For information on how to create a VPC peering connection, refer to AWS documentation on VPC Peering.
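If you prefer to script this step, the following AWS CLI sketch shows the same peering setup. The VPC, route table, peering connection, and account IDs are placeholders; the 10.10.0.0/16 CIDR matches the CloudFormation template for AccountB, and 172.31.0.0/16 stands in for AccountA's VPC CIDR, which you should adjust to match your VPC.

# Run in AccountB: request a peering connection to AccountA's VPC
aws ec2 create-vpc-peering-connection \
    --vpc-id vpc-0bbbb1111bbbb1111b \
    --peer-vpc-id vpc-0aaaa2222aaaa2222a \
    --peer-owner-id 111111111111

# Run in AccountA: accept the peering request
aws ec2 accept-vpc-peering-connection \
    --vpc-peering-connection-id pcx-0123456789abcdef0

# Run in AccountB: route traffic destined for AccountA's VPC CIDR over the peering connection
aws ec2 create-route \
    --route-table-id rtb-0bbbb3333bbbb3333b \
    --destination-cidr-block 172.31.0.0/16 \
    --vpc-peering-connection-id pcx-0123456789abcdef0

# Run in AccountA: add the return route for AccountB's VPC CIDR
aws ec2 create-route \
    --route-table-id rtb-0aaaa4444aaaa4444a \
    --destination-cidr-block 10.10.0.0/16 \
    --vpc-peering-connection-id pcx-0123456789abcdef0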

AccountB VPC Route Table

AccountA VPC Route Table

Step 2

  • Create a new Amazon S3 bucket in AccountA with a bucket name that starts with ‘athena-’.
  • Next, you can download a sample file and upload it to the Amazon S3 bucket you just created.
  • Use the following statements to create an AWS Glue database and an external table for the data in the Amazon S3 bucket so that you can query it from Athena.
  • Go to Athena console and define a new database:

CREATE DATABASE IF NOT EXISTS sampledb

Once the database is created, create a new table in sampledb (by selecting sampledb from the "Database" drop-down menu). Replace <<your bucket name>> with the name of the bucket you just created:

CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.amazon_reviews_tsv(
  marketplace string, 
  customer_id string, 
  review_id string, 
  product_id string, 
  product_parent string, 
  product_title string,
  product_category string,
  star_rating int, 
  helpful_votes int, 
  total_votes int, 
  vine string, 
  verified_purchase string, 
  review_headline string, 
  review_body string, 
  review_date string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  ESCAPED BY '\\'
  LINES TERMINATED BY '\n'
LOCATION
  's3://<<your bucket name>>/'
TBLPROPERTIES ("skip.header.line.count"="1")
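Optionally, you can confirm that the database and table are registered in the AWS Glue Data Catalog from the AWS CLI (using credentials for AccountA):

aws glue get-database --name sampledb

aws glue get-table --database-name sampledb --name amazon_reviews_tsv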

 

Step 3

  • In AccountA, create a new IAM user with programmatic access.
  • Save the access key and secret access key.
  • For the same user, add an inline policy that allows the following actions:

IAM summary

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowAthenaReadActions",
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups",
                "athena:ListDataCatalogs",
                "athena:GetExecutionEngine",
                "athena:GetExecutionEngines",
                "athena:GetNamespace",
                "athena:GetCatalogs",
                "athena:GetNamespaces",
                "athena:GetTables",
                "athena:GetTable"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowAthenaWorkgroupActions",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowGlueActionsViaVPCE",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowGlueActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowS3ActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::athena-*"
            ]
        }
    ]
}

 

Step 4

  • In this step, we create an Interface VPC endpoint (AWS PrivateLink) for Athena in AccountA. When you use an interface VPC endpoint, communication between your Amazon VPC and Athena is conducted entirely within the AWS network.
  • Each VPC endpoint is represented by one or more Elastic Network Interfaces (ENIs) with private IP addresses in your VPC subnets.
  • To create an interface VPC endpoint, follow the instructions and select Athena in the AWS services list. Do not select the checkbox for Enable Private DNS Name.
  • Ensure the security group that is attached to the Amazon VPC endpoint allows inbound traffic on ports 443 and 444 from the AccountB VPC CIDR 10.10.0.0/16. Port 444 is used by Athena to stream query results.
  • Once you create the VPC endpoint, you get a DNS endpoint name in the following format. We use this in the JDBC connection from the SQL client.

      VPC_Endpoint_ID.athena.Region.vpce.amazonaws.com
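If you want to create the endpoint from the AWS CLI instead of the console, a sketch looks like the following; the VPC, subnet, and security group IDs are placeholders, and private DNS is left disabled to match the console instructions above.

aws ec2 create-vpc-endpoint \
    --vpc-endpoint-type Interface \
    --vpc-id vpc-0aaaa2222aaaa2222a \
    --service-name com.amazonaws.us-east-1.athena \
    --subnet-ids subnet-0aaaa5555aaaa5555a \
    --security-group-ids sg-0aaaa6666aaaa6666a \
    --no-private-dns-enabled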

Step 5

  • In this step we set up Amazon WorkSpaces in AccountB.
  • Each Amazon WorkSpace is associated with the specific Amazon VPC and AWS Directory Service construct that you used to create it. All Directory Service constructs (Simple AD, AD Connector, and Microsoft AD) require two subnets to operate, each in different Availability Zones. This is why we created 2 private subnets at the beginning.
  • For this blog post I have used Simple AD as the directory service for the Amazon WorkSpaces.
  • By default, IAM users don't have permissions for Amazon WorkSpaces resources and operations.
  • To allow IAM users to manage Amazon WorkSpaces resources, you must create an IAM policy that explicitly grants them permissions, and then attach the policy to the IAM users or groups that require those permissions.
  • To start, go to the Amazon WorkSpaces console and select Advanced Setup.
    • Set up a new directory using the SimpleAD option.
    • Use the “small” directory size and choose the Amazon VPC and private subnets you created in Step 1 for AccountB.
    • Once you create the directory, register the directory with Amazon WorkSpaces by selecting “Register” from the “Action” menu.
    • Select private subnets you created in Step 1 for AccountB.

Directory info

  • Next, launch Amazon WorkSpaces by following the Launch WorkSpaces button.
  • Select the directory you created and create a new user.
  • For the bundle, choose Standard with Windows 10 (PCoIP).
  • After the WorkSpace is created, you can log in to it using the client software, which you can download from https://clients.amazonworkspaces.com/
  • Log in to your Amazon WorkSpace and install a SQL client of your choice. At this point, your Amazon WorkSpace still has internet access via the NAT Gateway.
  • I have used DbVisualizer (the free version) as the SQL client. Once you have that installed, install the JDBC driver for Athena following the instructions.
  • Now you can set up the JDBC connections to Athena using the access key and secret key you set up for the IAM user in AccountA.

Step 6

To test out both the Athena public endpoint and the Athena VPC endpoint, create two connections using the same credentials.

For the Athena public endpoint, you need to use the athena.us-east-1.amazonaws.com service endpoint. (jdbc:awsathena://athena.us-east-1.amazonaws.com:443;S3OutputLocation=s3://<athena-bucket-name>/)

Athena public

For the VPC endpoint connection, use the VPC endpoint you created in Step 4 (jdbc:awsathena://vpce-<>.athena.us-east-1.vpce.amazonaws.com:443;S3OutputLocation=s3://<athena-bucket-name>/)

Database connection Athena

Now run a simple query to select records from the amazon_reviews_tsv table using both the connections.

SELECT * FROM sampledb.amazon_reviews_tsv limit 10

You should be able to see results using both the connections. Since the private subnets are still connected to the internet via the NAT Gateway, you can query using the Athena public endpoint.

Run the following AWS Command Line Interface (AWS CLI) commands from your workstation, using the credentials used for the JDBC connection. You should be able to list the Amazon S3 bucket objects and the Athena query executions.

aws s3 ls s3://athena-workspaces-blogpost

aws athena list-query-executions

Step 7

Now we lock down the access as described at the beginning of this blog post by taking the following actions:

  • Update the route table for the private subnets by removing the route to the internet, so that access to the Athena public endpoint is blocked from the Amazon WorkSpaces. The only access allowed will be through the Athena VPC endpoint.
  • Add conditional checks to the IAM user access policy that restrict access to the Amazon S3 buckets and Athena only if:
    • The request came in through the VPC endpoint. For this we use the "aws:SourceVpce" condition key and provide the VPC endpoint ID value.
    • The request for Amazon S3 data is made through Athena. For this we use the "aws:CalledVia" condition key and provide a value of "athena.amazonaws.com".
  • In the IAM access policy below, replace <<your vpce id>> with your VPC endpoint ID and use it to update the inline policy that was added to the IAM user in Step 3.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowAthenaReadActions",
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups",
                "athena:ListDataCatalogs",
                "athena:GetExecutionEngine",
                "athena:GetExecutionEngines",
                "athena:GetNamespace",
                "athena:GetCatalogs",
                "athena:GetNamespaces",
                "athena:GetTables",
                "athena:GetTable"
            ],
            "Resource": "*",
            "Condition":{
               "StringEquals":{
                  "aws:SourceVpce":[
                     "<<your vpce id>>"
                  ]
               }
            }
        },
        {
            "Sid": "AllowAthenaWorkgroupActions",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup"
            ],
            "Resource": "*",
            "Condition":{
               "StringEquals":{
                  "aws:SourceVpce":[
                     "<<your vpce id>>"
                  ]
               }
            }
        },
        {
            "Sid": "AllowGlueActionsViaVPCE",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*",
            "Condition":{
               "StringEquals":{
                  "aws:SourceVpce":[
                     "<<your vpce id>>"
                  ]
               }
            }
        },
        {
            "Sid": "AllowGlueActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*",
            "Condition":{
               "ForAnyValue:StringEquals":{
                  "aws:CalledVia":[
                     "athena.amazonaws.com"
                  ]
               }
            }
        },
        {
            "Sid": "AllowS3ActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::athena-*"
            ],
            "Condition":{
               "ForAnyValue:StringEquals":{
                  "aws:CalledVia":[
                     "athena.amazonaws.com"
                  ]
               }
            }
        }
    ]
}

Once you have applied the changes, try to reconnect using both the Athena VPC endpoint and the Athena public endpoint connections. The Athena VPC endpoint connection should work, but the public endpoint connection will time out. Also try the same Amazon S3 and Athena AWS CLI commands; you should get access denied for both operations.

Clean Up

To avoid incurring costs, remember to delete the resources that you created.

For AWS AccountA:

  • Delete the S3 buckets
  • Delete the database you created in AWS Glue
  • Delete the Amazon VPC endpoint you created for Amazon Athena

For AccountB:

  • Delete the Amazon WorkSpaces you created, along with the Simple AD directory. You can review more information on how to delete your WorkSpaces.

Conclusion

In this blog post, I showed how to leverage Amazon VPC endpoints and IAM policies to privately connect to Amazon Athena from Amazon WorkSpaces that don't have internet connectivity.

Give this solution a try and share your feedback in the comments!

Field Notes provides hands-on technical guidance from AWS Solutions Architects, consultants, and technical account managers, based on their experiences in the field solving real-world business problems for customers.

Redacting sensitive information with user-defined functions in Amazon Athena

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/redacting-sensitive-information-with-user-defined-functions-in-amazon-athena/

Amazon Athena now supports user-defined functions (in Preview), a feature that enables you to write custom scalar functions and invoke them in SQL queries. Although Athena provides built-in functions, UDFs enable you to perform custom processing such as compressing and decompressing data, redacting sensitive data, or applying customized decryption. You can write your UDFs in Java using the Athena Query Federation SDK. When a UDF is used in a SQL query submitted to Athena, it’s invoked and run on AWS Lambda. You can use UDFs in both SELECT and FILTER clauses of a SQL query, and invoke multiple UDFs in the same query. Athena UDF functionality is available in Preview mode in the US East (N. Virginia) Region.

This blog post covers the basic functionality of Athena UDFs: writing your own UDF, building and publishing it to the AWS Serverless Application Repository, configuring the UDF connector application, and using the UDF in Athena queries. You can implement use cases involving sensitive data, such as personally identifiable information (PII), credit cards, or Social Security numbers (SSN). In this post, we deploy a Redact UDF and use it to mask sensitive information.

Prerequisites

Before creating your development environment, you must have the following prerequisites:

To set up the development environment and address these prerequisites, deploy the CloudFormation template in the first part of this series, Extracting and joining data from multiple data sources with Athena Federated Query. The post provides instructions on building the required test environment and resources using the CloudFormation template.

Creating your IDE

After you deploy the CloudFormation template, you need to create the required AWS resources. To create the development environment to build and deploy the UDF, we use an AWS Cloud9 IDE. On the AWS Cloud9 console, locate your environment and choose Open IDE.

AWS Cloud9 Resize

The AWS Cloud9 IDE comes with a default 10 GB disk space, which can fill quickly when setting up the development environment, so you should resize it.

  1. Run the following command in the AWS Cloud9 IDE terminal to get the resize script:
    curl https://aws-data-analytics-workshops.s3.amazonaws.com/athena-workshop/scripts/cloud9_resize.sh > cloud9_resize.sh

  2. Run the script by issuing the following command on the terminal to resize the disk to 20 GB:
    sh cloud9_resize.sh 20

  3. Check the free space on the disk with the following code:
    df -h

You should see something like the following screenshot.

Setting up the IDE

Next, you clone the SDK and prepare your IDE.

  1. Make sure that Git is installed on your system by entering the following code:
    sudo yum install git -y

  2. To install the Athena Query Federation SDK, enter the following command at the command line to clone the SDK repository. This repository includes the SDK, examples, and a suite of data source connectors.
    git clone https://github.com/awslabs/aws-athena-query-federation.git

If you're working on a development machine that already has Apache Maven, the AWS CLI, and the AWS Serverless Application Model build tool installed, you can skip this step.

  3. From the root of the aws-athena-query-federation directory that you created when you cloned the repository, run the prepare_dev_env.sh script that prepares your development environment:
    cd aws-athena-query-federation

    sudo chown ec2-user:ec2-user ~/.profile

    ./tools/prepare_dev_env.sh

This script requires manual inputs to run (choosing Enter as needed during the setup steps when prompted). You can edit this script to remove the manual inputs if you want to automate the setup entirely.

  4. Update your shell to source new variables created by the installation process or restart your terminal session:
    source ~/.profile

  5. Run the following code from the athena-federation-sdk directory within the GitHub project you checked out earlier:

Adding the UDF code and publishing the connector

In this section, you add your UDF function, build the JAR file, and deploy the connector.

  1. In the AWS Cloud9 IDE, expand the aws-athena-query-federation project and navigate to the AthenaUDFHandler.java file.
  2. Choose the file (double-click) to open it for editing.

Now we add the UDF code for a String Redact function, which redacts a string to show only the last four characters. You can use this UDF function to mask sensitive information.

  3. Enter the following code:
    /**
     * Redact a string to show only the last 4 characters
     *
     * @param input the string to redact
     * @return redacted string
     */
    public String redact(String input)
    {
        String redactedString = new StringBuilder(input)
            .replace(0, input.length() - 4, new String(new char[input.length() - 4]).replace("\0", "x"))
            .toString();
        return redactedString;
    }

You can also copy the modified code with the following command (which must be run from the aws-athena-query-federation directory):

curl https://aws-data-analytics-workshops.s3.amazonaws.com/athena-workshop/scripts/AthenaUDFHandler.java > athena-udfs/src/main/java/com/amazonaws/athena/connectors/udfs/AthenaUDFHandler.java

After copying the file, you can open it in the AWS Cloud9 IDE to see its contents.

  4. To build the JAR file, save the file and run mvn clean install:
    cd ~/environment/aws-athena-query-federation/athena-udfs/
    
    mvn clean install

After it successfully builds, a JAR file is created in the target folder of your project named artifactId-version.jar, where artifactId is the name you provided in the Maven project, for example, athena-udfs.

  5. From the athena-udfs directory, run the following code to publish the connector to your private AWS Serverless Application Repository. The S3_BUCKET_NAME in the command is the Amazon Simple Storage Service (Amazon S3) location where a copy of the connector's code is stored for the AWS Serverless Application Repository to retrieve it.
../tools/publish.sh S3_BUCKET_NAME athena-udfs

This allows users with relevant permission levels to deploy instances of the connector via a one-click form.

When the connector is published successfully, it looks like the following screenshot.

To see AthenaUserDefinedFunctions, choose the link shown in the terminal after the publish is successful or navigate to the AWS Serverless Application Repository by choosing Available Applications, Private applications.

Setting up the UDF connector

Now that the UDF connector code is published, we can install the UDF connector to use with Athena.

  1. Choose the AthenaUserDefinedFunctions application listed on the Private applications section in the AWS Serverless Application Repository.
  2. For Application name, leave it as the default name AthenaUserDefinedFunctions.
  3. For SecretNameorPrefix, enter a secret name if you have already saved it in AWS Secrets Manager; otherwise, enter database-*.
  4. For LambdaFunctionName, enter customudf.
  5. Leave the remaining fields as default.
  6. Select I acknowledge that this app creates custom IAM roles.
  7. Choose Deploy.

Querying with UDF in Athena

Now that the UDF connector code is deployed, we can run Athena queries that use the UDF.

If you ran the CloudFormation template from Part 1 of this blog series, the AmazonAthenaPreviewFunctionality workgroup was already created. If not, choose Create workgroup on the Athena console, create a workgroup named AmazonAthenaPreviewFunctionality, and set up your query result location in Amazon S3.
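If you prefer the AWS CLI, you can create the workgroup with a command like the following; the query results bucket is a placeholder.

aws athena create-work-group \
    --name AmazonAthenaPreviewFunctionality \
    --configuration "ResultConfiguration={OutputLocation=s3://<your-query-results-bucket>/athena-results/}"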

To proceed, make sure you are in the workgroup AmazonAthenaPreviewFunctionality. If not, choose the workgroup AmazonAthenaPreviewFunctionality and choose Switch workgroup.

You can now run a query that uses the Redact UDF to mask sensitive information in PII columns. To show the comparison, we have included the PII columns and the masked data as part of the query results. If you ran the CloudFormation template from Part 1 of this series, you can navigate to Saved Queries on the Athena console and choose RedactUdfCustomerAddress.

The following screenshot shows your query.

After the query runs, you should see results like the following screenshot. The redact_name, redact_phone, and redact_address columns only show the last four characters.
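The same style of query can also be submitted from the AWS CLI. The sketch below assumes the Lambda function name customudf chosen earlier; the database, table, and column names are placeholders rather than the workshop's saved query.

aws athena start-query-execution \
    --work-group AmazonAthenaPreviewFunctionality \
    --query-string "USING EXTERNAL FUNCTION redact(value VARCHAR) RETURNS VARCHAR LAMBDA 'customudf'
SELECT c_name, redact(c_name) AS redact_name
FROM <your_database>.<your_table>
LIMIT 10"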

Cleaning up

To clean up the resources created as part of your CloudFormation template, complete the following steps:

  1. On the Amazon S3 console, empty and delete the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code (make sure you’re running this command on the correct bucket):
     aws s3 rm s3://athena-federation-workshop-<account-id> --recursive

  3. Use the AWS CloudFormation console or AWS CLI to delete the stacks Athena-Federation-Workshop and serverlessrepo-AthenaUserDefinedFunctions.

Summary

In this post, you learned about Athena user-defined functions, how to create your own UDF, and how to deploy it to a private AWS Serverless Application Repository. You also learned how to configure the UDF and use it in your Athena queries. In the next post of this series, we discuss and demonstrate how to use a machine learning (ML) anomaly detection model developed on Amazon SageMaker and use that model in Athena queries to invoke an ML inference function to detect anomaly values in our orders dataset.


About the Authors


Saurabh Bhutyani is a Senior Big Data specialist solutions architect at Amazon Web Services. He is an early adopter of open source Big Data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Amir Basirat is a Big Data specialist solutions architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a Big Data specialist for different technology companies. He also has a PhD in computer science, where his research was primarily focused on large-scale distributed computing and neural networks.

New – Export Amazon DynamoDB Table Data to Your Data Lake in Amazon S3, No Code Writing Required

Post Syndicated from Alex Casalboni original https://aws.amazon.com/blogs/aws/new-export-amazon-dynamodb-table-data-to-data-lake-amazon-s3/

Hundreds of thousands of AWS customers have chosen Amazon DynamoDB for mission-critical workloads since its launch in 2012. DynamoDB is a nonrelational managed database that allows you to store a virtually infinite amount of data and retrieve it with single-digit-millisecond performance at any scale. To get the most value out of this data, customers had […]
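As a quick sketch of the feature this announcement describes (assuming point-in-time recovery is enabled on the table), the export can be triggered from the AWS CLI; the table ARN, bucket, and prefix are placeholders.

aws dynamodb export-table-to-point-in-time \
    --table-arn arn:aws:dynamodb:us-east-1:111111111111:table/<your-table> \
    --s3-bucket <your-data-lake-bucket> \
    --s3-prefix exports/<your-table>/ \
    --export-format DYNAMODB_JSON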

Handling data erasure requests in your data lake with Amazon S3 Find and Forget

Post Syndicated from Chris Deigan original https://aws.amazon.com/blogs/big-data/handling-data-erasure-requests-in-your-data-lake-with-amazon-s3-find-and-forget/

Data lakes are a popular choice for organizations to store data about their business activities. Best-practice data lake design treats data as immutable once stored, but new regulations such as the European General Data Protection Regulation (GDPR), the California Consumer Privacy Act (CCPA), and others have created new obligations: operators now need to be able to erase private data from their data lakes when requested.

When asked to erase an individual’s private data, as a data lake operator you have to find all the objects in your Amazon Simple Storage Service (Amazon S3) buckets that contain data relating to that individual. This can be complex because data lakes contain many S3 objects (each of which may contain multiple rows), as shown in the following diagram. You often can’t predict which objects contain data relating to an individual, so you need to check each object. For example, if the user mary34 asks to be removed, you need to check each object to determine if it contains data relating to mary34. This is the first challenge operators face: identifying which objects contain data of interest.

After you identify objects containing data of interest, you face a second challenge: you need to retrieve the object from the S3 bucket, remove relevant rows from the file, put a new version of the object into S3, and make sure you delete any older versions.

Locating and removing data manually can be time-consuming and prone to mistakes, considering the large number of objects typically in data lakes.

Amazon S3 Find and Forget solves these challenges with ready-to-use automations. It allows you to remove records from data lakes of any size that are cataloged in the AWS Glue Data Catalog. The solution includes a web user interface and an API that you can use to integrate with your own applications.

Solution overview

Amazon S3 Find and Forget enables you to find and delete records automatically in data lakes on Amazon S3. Using the solution, you can:

  • Define which tables from your AWS Glue Data Catalog contain data you want to erase
  • Manage a queue of identifiers (such as unique customer identifiers) to erase
  • Erase rows from your data lake matching the queued record identifiers
  • Access a log of all actions taken by the solution

You can use Amazon S3 Find and Forget to work with data lakes stored on Amazon S3 in a supported file format.

The solution is developed and distributed as open-source software that you deploy and run inside your own AWS account. When deploying this solution, you only pay for the AWS services consumed to run it. We recommend reviewing the Cost Estimate guide and creating Amazon CloudWatch Billing Alarms to monitor charges before deploying the solution in your own account.
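As a sketch of that recommendation, a simple billing alarm can be created with the AWS CLI. Billing metrics are published in us-east-1 and require billing alerts to be enabled in your account settings; the threshold and SNS topic ARN below are placeholders you would adjust.

aws cloudwatch put-metric-alarm \
    --region us-east-1 \
    --alarm-name s3-find-and-forget-estimated-charges \
    --namespace AWS/Billing \
    --metric-name EstimatedCharges \
    --dimensions Name=Currency,Value=USD \
    --statistic Maximum \
    --period 21600 \
    --evaluation-periods 1 \
    --threshold 50 \
    --comparison-operator GreaterThanThreshold \
    --alarm-actions arn:aws:sns:us-east-1:111111111111:billing-alerts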

When you handle requests to remove data, you add the identifiers through the web interface or API to a Deletion Queue. The identifiers remain in the queue until you start a Deletion Job. The Deletion Job processes the queue and removes matching rows from objects in your data lake.

Where your requirements allow it, batching deletions can provide significant cost savings by minimizing the number of times the data lake needs to be re-scanned and processed. For example, you could start a Deletion Job once a week to process all requests received in the preceding week.

Solution demonstration

This section provides a demonstration of using Amazon S3 Find and Forget’s main features. To deploy the solution in your own account, refer to the User Guide.

For this demonstration, I have prepared in advance:

The first step is to deploy the solution using AWS CloudFormation by following the instructions in the User Guide. The CloudFormation stack can take 20-30 minutes to deploy depending on the options chosen when deploying.

Once deployed, I visit the web user interface by going to the address in the WebUIUrl CloudFormation stack output. Using a temporary password emailed to the address I provided in my CloudFormation parameters, I log in and set a password for future use. I then see a dashboard with some base metrics for my Amazon S3 Find and Forget deployment:

I now need to create a Data Mapper so that Amazon S3 Find and Forget can find my data lake. To do this, I select Data Mappers, then Create Data Mapper:

On this screen, I give my Data Mapper a name, choose the AWS Glue database and table in my account that I want to operate on, and the columns that I want my deletions to match. In this demonstration, I’m using a copy of the Amazon Customer Reviews Dataset that I copied to my own S3 bucket. I’ll be using the customer_id column to remove data. In the dataset, this field contains a unique identifier for each customer who has created a product review.

I then specify the IAM role to be used when modifying the objects in S3. I also choose whether I want the old S3 object versions to be deleted for me. I can turn this off if I want to implement my own strategy to manage deleting old object versions, such as by using S3 lifecycle policies.

After I choose Create Data Mapper, the Data Mapper is created, and I am prompted to grant permissions for S3 Find and Forget to operate in my bucket. In the Data Mapper list, I select my new Data Mapper, then choose Generate Access Policies. The interface displays a sample bucket policy that I copy and paste into the bucket policy for my S3 bucket in the AWS Management Console.

With the Data Mapper set up, I’m now able to add the customers who have requested to have their data deleted to the Deletion Queue. Using their Customer IDs, I go to the Deletion Queue section and select Add Match to the Deletion Queue.

I've chosen to delete from all the available Data Mappers, but I can also choose specific ones. Once I've added my matches, I can see a list of them on the Deletion Queue page:

I can now run a deletion job that will cause the matches to be deleted from the data lake. To do this, I select Deletion Jobs then Start a Deletion Job.

After a few minutes the Deletion Job completes, and I can see metrics collected during the job including that the job took just over two-and-a-half minutes:

There is an Export to JSON option that includes all the metrics shown, more granular information about the Deletion Job, and which S3 objects were modified.

At this point the Deletion Queue is empty, and ready for me to use for future requests.

Solution design

This section includes a brief introduction to how the solution works. More comprehensive design documentation is available in the Amazon S3 Find and Forget GitHub repository.

The following diagram illustrates the architecture of this solution.

Amazon S3 Find and Forget uses AWS serverless services to optimize for cost and scalability. The user interface and API are built using Amazon S3, Amazon Cognito, AWS Lambda, Amazon DynamoDB, and Amazon API Gateway, which automatically scale down when not in use so that there is no expensive baseline cost just for having the solution installed. These AWS services are always available, scale in concert with how much the solution is used, and follow a pay-for-what-you-use price model.

The Deletion Job workflow is coordinated using AWS Step Functions, Lambda, and Amazon Simple Queue Service (Amazon SQS). The solution uses Step Functions for high-level coordination and state tracking in the workflow, Lambda functions for discrete computation tasks, and Amazon SQS to store queues of repetitive work.

A deletion job has two phases: Find and Forget. In the Find phase, the solution uses Amazon Athena to scan the data lake for objects containing rows matching the identifiers in the deletion queue. For this to work at scale, we built a query planner Lambda function that uses the partition list in the AWS Glue Data Catalog for each data mapper to run an Athena query on each partition, returning the path to S3 objects that contain matches with the identifiers in the Deletion Queue. The object keys are then added to an SQS queue that we refer to as the Object Deletion Queue.

In the Forget phase, deletion workers are started as a service running on AWS Fargate. These workers process each object in the Object Deletion Queue by downloading the objects from the S3 bucket into memory, deleting the rows that contain matched identifiers, then putting a new version of the object to the S3 bucket using the same key. By default, older versions of the object are then deleted from the S3 bucket to make the deletion irreversible. You can alternatively disable this feature to implement your own strategy for deleting old object versions, such as by using an S3 Lifecycle policy.
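If you disable that default and manage old versions yourself, one possible approach is an S3 Lifecycle rule that expires noncurrent object versions after a retention window; the bucket name and the seven-day window below are placeholders.

aws s3api put-bucket-lifecycle-configuration \
    --bucket <your-data-lake-bucket> \
    --lifecycle-configuration '{
        "Rules": [{
            "ID": "expire-noncurrent-versions",
            "Status": "Enabled",
            "Filter": {"Prefix": ""},
            "NoncurrentVersionExpiration": {"NoncurrentDays": 7}
        }]
    }'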

Note that during the Forget phase, affected S3 objects are replaced at the time they are processed and are subject to the Amazon S3 data consistency model. We recommend that you avoid running a Deletion Job in parallel to a workload that reads from the data lake unless it has been designed to handle temporary inconsistencies between objects.

When the object deletion queue is empty, the Forget phase is complete and a final status is determined for the Deletion Job based on whether any errors occurred (for example, due to missing permissions for S3 objects).

Logs are generated for all actions throughout the Deletion Job, which you can use for reporting or troubleshooting. These are stored in DynamoDB, along with other persistent data including the Data Mappers and Deletion Queue.

Conclusion

In this post, we introduced the Amazon S3 Find and Forget solution, which assists data lake operators to handle data erasure requests they may receive pursuant to regulations such as GDPR, CCPA, and others. We then described features of the solution and how to use it for a basic use case.

You can get started today by deploying the solution from the GitHub repository, where you can also find more documentation of how the solution works, its features, and limits. We are continuing to develop the solution and welcome you to send feedback, feature requests, or questions through GitHub Issues.


About the Authors

Chris Deigan is an AWS Solution Engineer in London, UK. Chris works with AWS Solution Architects to create standardized tools, code samples, demonstrations, and quick starts.


Matteo Figus is an AWS Solution Engineer based in the UK. Matteo works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. He is passionate about open-source software and in his spare time he likes to cook and play the piano.


Nick Lee is an AWS Solution Engineer based in the UK. Nick works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. In his spare time he enjoys playing football and squash, and binge-watching TV shows.


Adir Sharabi is a Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud. He is also passionate about Data and helping customers to get the most out of it.


Cristina Fuia is a Specialist Solutions Architect for Analytics at AWS. She works with customers across EMEA helping them to solve complex problems, design and build data architectures so that they can get business value from analyzing their data.


Extracting and joining data from multiple data sources with Athena Federated Query

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/extracting-and-joining-data-from-multiple-data-sources-with-athena-federated-query/

With modern day architectures, it’s common to have data sitting in various data sources. We need proper tools and technologies across those sources to create meaningful insights from stored data. Amazon Athena is primarily used as an interactive query service that makes it easy to analyze unstructured, semi-structured, and structured data stored in Amazon Simple Storage Service (Amazon S3) using standard SQL. With the federated query functionality in Athena, you can now run SQL queries across data stored in relational, non-relational, object, and custom data sources and store the results back in Amazon S3 for further analysis.

The goals for this series of posts are to discuss how we can configure different connectors to run federated queries with complex joins across different data sources, how to configure a user-defined function for redacting sensitive information when running Athena queries, and how we can use machine learning (ML) inference to detect anomalous values in datasets, to help developers, big data architects, data engineers, and business analysts in their daily operational routines.

Athena Federated Query

Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of Athena's query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB, Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon Redshift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy data source connectors, each connector is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.

When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source on the cloud or on premises that is accessible from Lambda.

The first post of this series discusses how to configure Athena Federated Query connectors and use them to run federated queries for data residing in HBase on Amazon EMR, Amazon Aurora MySQL, DynamoDB, and ElastiCache for Redis databases.

Test data

To demonstrate Athena federation capabilities, we use the TPCH sample dataset. TPCH is a decision support benchmark and has broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, run queries with a high degree of complexity, and give answers to critical business questions. For our use case, imagine a hypothetical ecommerce company with the following architecture:

  • Lineitems processing records stored in HBase on Amazon EMR to meet requirements for a write-optimized data store with high transaction rate and long-term durability
  • ElastiCache for Redis stores Nations and ActiveOrders tables so that the processing engine can get fast access to them
  • An Aurora cluster with the MySQL engine is used for Orders, Customer, and Supplier account data, such as email addresses and shipping addresses
  • DynamoDB hosts Part and Partsupp data, because DynamoDB offers high flexibility and high performance

The following diagram shows a schematic view of the TPCH tables and their associated data stores.

Building a test environment using AWS CloudFormation

Before following along with this post, you need to create the required AWS resources in your account. To do this, we have provided you with an AWS CloudFormation template to create a stack that contains the required resources: the sample TPCH database on Amazon Relational Database Service (Amazon RDS), HBase on Amazon EMR, Amazon ElastiCache for Redis, and DynamoDB.

The template also creates the AWS Glue database and tables, S3 bucket, Amazon S3 VPC endpoint, AWS Glue VPC endpoint, Athena named queries, AWS Cloud9 IDE, an Amazon SageMaker notebook instance, and other AWS Identity and Access Management (IAM) resources that we use to implement the federated query, user-defined functions (UDFs), and ML inference functions.

This template is designed only to show how you can use Athena Federated Query, UDFs, and ML inference. This setup isn’t intended for production use without modification. Additionally, the template is created for use in the us-east-1 Region, and doesn’t work in other Regions.

Before launching the stack, you must have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI), and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation

To create your resources, complete the following steps:

  1. Choose Launch Stack:
  2. Select I acknowledge that this template may create IAM resources.

This template creates resources that incur costs while they remain in use. Follow the cleanup steps at the end of this post to delete and clean up the resources to avoid any unnecessary charges.

  3. When the CloudFormation template is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console.

The CloudFormation stack takes approximately 20–30 minutes to complete. Check the AWS CloudFormation console and wait for the status CREATE_COMPLETE.

When stack creation is complete, your AWS account has all the required resources to implement this solution.

  4. On the Outputs tab of the Athena-Federation-Workshop stack, capture the following:
    1. S3Bucket
    2. Subnets
    3. WorkshopSecurityGroup
    4. EMRSecurityGroup
    5. HbaseConnectionString
    6. RDSConnectionString

You need all this information when setting up connectors.
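You can also retrieve the same outputs from the AWS CLI with describe-stacks, using the stack name shown above:

aws cloudformation describe-stacks \
    --stack-name Athena-Federation-Workshop \
    --query "Stacks[0].Outputs" \
    --output table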

  5. When the stacks are complete, check the status of the Amazon EMR steps on the Amazon EMR console.

It can take up to 15 minutes for this step to complete.

Deploying connectors and connecting to data sources

Preparing to create federated queries is a two-part process: deploying a Lambda function data source connector, and connecting the Lambda function to a data source. In the first part, you give the Lambda function a name that you can later choose on the Athena console. In the second part, you give the connector a name that you can reference in your SQL queries.

We want to query different data sources, so in the following sections we set up Lambda connectors for HBase on Amazon EMR, Aurora MySQL, DynamoDB, and Redis before we start creating complex joins across data sources using Athena federated queries. The following diagram shows the architecture of our environment.

Installing the Athena JDBC connector for Aurora MySQL

The Athena JDBC connector supports the following databases:

  • MySQL
  • PostgreSQL
  • Amazon Redshift

To install the Athena JDBC connector for Aurora MySQL, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaJdbcConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name, AthenaJdbcConnector.
    2. SecretNamePrefix – Enter AthenaJdbcFederation.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. DefaultConnectionString – Enter the RDSConnectionString value from the AWS CloudFormation outputs.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaFunctionName – Enter mysql.
    7. LambdaMemory – Leave it as the default value 3008.
    8. LambdaTimeout – Leave it as the default value 900.
    9. SecurityGroupIds – Enter the WorkshopSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Change the default value to athena-spill/jdbc.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena JDBC connector for Aurora MySQL; you can refer to this Lambda function in your queries as lambda:mysql.

For more information about the Athena JDBC connector, see the GitHub repo.

Installing the Athena DynamoDB connector

To install Athena DynamoDB Connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaDynamoDBConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaDynamoDBConnector.
    2. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    3. AthenaCatalogName – Enter dynamo.
    4. DisableSpillEncryption – Leave it as the default value false.
    5. LambdaMemory – Leave it as the default value 3008.
    6. LambdaTimeout – Leave it as the default value 900.
    7. SpillPrefix – Enter athena-spill-dynamo.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys Athena DynamoDB connector; you can refer to this Lambda function in your queries as lambda:dynamo.

For more information about the Athena DynamoDB connector, see the GitHub repo.

Installing the Athena HBase connector

To install the Athena HBase connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaHBaseConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaHBaseConnector
    2. SecretNamePrefix – Enter hbase-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter hbase.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. DefaultConnectionString – Enter the HbaseConnectionString value from the AWS CloudFormation outputs.
    7. LambdaMemory – Leave it as the default value of 3008.
    8. LambdaTimeout – Leave it as the default value of 900.
    9. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Enter athena-spill-hbase.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena HBase connector; you can refer to this Lambda function in your queries as lambda:hbase.

For more information about the Athena HBase connector, see the GitHub repo.

Installing the Athena Redis connector

To install Athena Redis Connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaRedisConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaRedisConnector.
    2. SecretNameOrPrefix – Enter redis-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter redis.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaMemory – Leave it as the default value 3008.
    7. LambdaTimeout – Leave it as the default value 900.
    8. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    9. SpillPrefix – Enter athena-spill-redis.
    10. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena Redis connector; you can refer to this Lambda function in your queries as lambda:redis.

For more information about the Athena Redis connector, see the GitHub repo.

Redis database and tables with the AWS Glue Data Catalog

Because Redis doesn’t have a schema of its own, the Redis connector can’t infer the columns or data type from Redis. The Redis connector needs an AWS Glue database and tables to be set up so it can associate the data to the schema. The CloudFormation template creates the necessary Redis database and tables in the Data Catalog. You can confirm this on the AWS Glue console.
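You can also confirm it from the AWS CLI. The database name created by the template isn't shown here, so list the databases first and then the tables in the Redis database you find; the name below is a placeholder.

aws glue get-databases --query "DatabaseList[].Name"

aws glue get-tables --database-name <redis-database-name> --query "TableList[].Name"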

Running federated queries

Now that the connectors are deployed, we can run Athena queries that use those connectors.

  1. On the Athena console, choose Get Started.
  2. Make sure you’re in the workgroup AmazonAthenaPreviewFunctionality. If not, choose Workgroups, select AmazonAthenaPreviewFunctionality, and choose Switch Workgroup.

On the Saved Queries tab, you can see a list of pre-populated queries to test.

The Sources saved query tests your Athena connector functionality for each data source, and you can make sure that you can extract data from each data source before running more complex queries involving different data sources.

  1. Highlight the first query up to the semicolon and choose Run query.

After successfully testing connections to each data source, you can proceed with running more complex queries, such as:

  • FetchActiveOrderInfo
  • ProfitBySupplierNationByYr
  • OrdersRevenueDateAndShipPrio
  • ShippedLineitemsPricingReport
  • SuppliersWhoKeptOrdersWaiting

If you see an error on the HBase query like the following, try rerunning it and it should resolve the issue.

GENERIC_USER_ERROR: Encountered an exception[java.lang.RuntimeException] from your LambdaFunction[hbase] executed in context[retrieving meta-data] with message[org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location for replica 0]

As an example of the advanced queries, the SuppliersWhoKeptOrdersWaiting query identifies suppliers whose product was part of a multi-supplier order (with a current status of F) and who didn’t ship the required parts on time. This query uses multiple data sources: Aurora MySQL and HBase on Amazon EMR. As shown in the following screenshot, the query extracts data from the supplier table on Aurora MySQL, the lineitem table on HBase, and the orders table on Aurora MySQL. The results are returned in 7.13 seconds.
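
The following is a simplified sketch of how such a cross-source join is expressed, not the exact saved query; the catalog, database, table, and column names are assumptions based on the TPC-H-style schema described above.

-- Simplified sketch of a federated join across Aurora MySQL and HBase on Amazon EMR;
-- catalog, database, table, and column names are assumptions.
SELECT s.s_name,
       count(*) AS numwait
FROM "lambda:mysql".sales.supplier s
JOIN "lambda:hbase".default.lineitem l
  ON s.s_suppkey = l.l_suppkey
JOIN "lambda:mysql".sales.orders o
  ON o.o_orderkey = l.l_orderkey
WHERE o.o_orderstatus = 'F'
  AND l.l_receiptdate > l.l_commitdate
GROUP BY s.s_name
ORDER BY numwait DESC;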

Cleaning up

To clean up the resources created as part of our CloudFormation template, complete the following steps:

  1. On the Amazon S3 console, empty the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code (make sure you’re running this command on the correct bucket):
    aws s3 rm s3://athena-federation-workshop-<account-id> --recursive

  3. On the AWS CloudFormation console, delete all the connectors so they’re no longer attached to the elastic network interface (ENI) of the VPC. Alternatively, go to each connector and deselect the VPC so it’s no longer attached to the VPC created by AWS CloudFormation.
  4. On the Amazon SageMaker console, delete any endpoints you created as part of the ML inference.
  5. On the Athena console, delete the AmazonAthenaPreviewFunctionality workgroup.
  6. On the AWS CloudFormation console or the AWS CLI, delete the stack Athena-Federation-Workshop.

Summary

In this post, we demonstrated the functionality of Athena federated queries by creating multiple different connectors and running federated queries against multiple data sources. In the next post, we show you how you can use the Athena Federation SDK to deploy your UDF and invoke it to redact sensitive information in your Athena queries.


About the Authors

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Amir Basirat is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a big data specialist for different technology companies. He also has a PhD in computer science, where his research primarily focused on large-scale distributed computing and neural networks.

Mercado Libre: How to Block Malicious Traffic in a Dynamic Environment

Post Syndicated from Gaston Ansaldo original https://aws.amazon.com/blogs/architecture/mercado-libre-how-to-block-malicious-traffic-in-a-dynamic-environment/

Blog post contributors: Pablo Garbossa and Federico Alliani of Mercado Libre

Introduction

Mercado Libre (MELI) is the leading e-commerce and FinTech company in Latin America. We have a presence in 18 countries across Latin America, and our mission is to democratize commerce and payments to impact the development of the region.

We manage an ecosystem of more than 8,000 custom-built applications that process an average of 2.2 million requests per second. To support the demand, we run between 50,000 and 80,000 Amazon Elastic Compute Cloud (Amazon EC2) instances, and our infrastructure scales in and out according to the time of day, thanks to the elasticity of the AWS Cloud and its auto scaling features.

Mercado Libre

As a company, we expect our developers to devote their time and energy to building the apps and features that our customers demand, without having to worry about the underlying infrastructure that the apps are built upon. To achieve this separation of concerns, we built Fury, our platform as a service (PaaS) that provides an abstraction layer between our developers and the infrastructure. Each time a developer deploys a brand new application or a new version of an existing one, Fury takes care of creating all the required components, such as an Amazon Virtual Private Cloud (Amazon VPC), Elastic Load Balancing (ELB), an Amazon EC2 Auto Scaling group (ASG), and Amazon EC2 instances. Fury also manages a per-application Git repository and a CI/CD pipeline with different deployment strategies, such as blue/green and rolling upgrades, and provides transparent collection of application logs and metrics.

Fury- MELI PaaS

For those of us on the Cloud Security team, Fury represents an opportunity to enforce critical security controls across our stack in a way that’s transparent to our developers. For instance, we can dictate what Amazon Machine Images (AMIs) are vetted for use in production (such as those that align with the Center for Internet Security benchmarks). If needed, we can apply security patches across all of our fleet from a centralized location in a very scalable fashion.

But there are also other attack vectors that every organization with a presence on the public internet is exposed to. The recent AWS Threat Landscape Report shows a 23% year-over-year increase in the total number of Denial of Service (DoS) events. It’s evident that organizations need to be prepared to react quickly under these circumstances.

The variety and the number of attacks are increasing, testing the resilience of all types of organizations. This is why we started working on a solution that allows us to contain application DoS attacks and complements our perimeter security strategy, which is based on services such as AWS Shield and AWS WAF. In this article, we walk you through the solution we built to automatically detect and block these events.

The strategy we implemented for our solution, Network Behavior Anomaly Detection (NBAD), consists of four stages that we repeatedly execute:

  1. Analyze the execution context of our applications, like CPU and memory usage
  2. Learn their behavior
  3. Detect anomalies, gather relevant information and process it
  4. Respond automatically

Step 1: Establish a baseline for each application

End user traffic enters through different Amazon CloudFront distributions that route to multiple Elastic Load Balancers (ELBs). Behind the ELBs, we operate a fleet of NGINX servers from which we connect to the many applications that our developers create via Fury.


Step 1: MELI Architecture – Anomaly detection project

We collect logs and metrics for each application and ship them to Amazon Simple Storage Service (Amazon S3) and Datadog. We then partition these logs using AWS Glue to make them available for consumption via Amazon Athena. On average, we send 3 terabytes (TB) of log files in Parquet format to Amazon S3.

Based on this information, we developed processes that we complement with commercial solutions, such as Datadog’s Anomaly Detection, which allows us to learn the normal behavior or baseline of our applications and project expected adaptive growth thresholds for each one of them.

Anomaly detection

Step 2: Anomaly detection

When any of our apps receives a number of requests that fall outside the limits set by our anomaly detection algorithms, an Amazon Simple Notification Service (SNS) event is emitted, which triggers a workflow in the Anomaly Analyzer, a custom-built component of this solution.

Upon receiving such an event, the Anomaly Analyzer starts composing the so-called event context. In parallel, the Data Extractor retrieves vital insights via Athena from the log files stored in S3.
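
As an illustration, the kind of Athena query the Data Extractor might run could aggregate request counts by source IP for the affected application during the anomaly window; the table and column names below are hypothetical.

-- Hypothetical enrichment query; table and column names are assumptions.
SELECT client_ip,
       count(*)                   AS requests,
       count(DISTINCT user_agent) AS distinct_user_agents
FROM access_logs
WHERE application = 'checkout-api'
  AND request_time BETWEEN timestamp '2020-06-01 13:00:00'
                       AND timestamp '2020-06-01 13:15:00'
GROUP BY client_ip
ORDER BY requests DESC
LIMIT 100;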

The output of this process is used as the input for the data enrichment process. This is responsible for consulting different threat intelligence sources that are used to further augment the analysis and determine if the event is an actual incident or not.

At this point, we build the context that not only gives us greater certainty when calculating the score, but also helps us validate the event and act more quickly. This context includes:

  • Application’s owner
  • Affected business metrics
  • Error handling statistics of our applications
  • Reputation of IP addresses and associated users
  • Use of unexpected URL parameters
  • Distribution by origin of the traffic that generated the event (cloud providers, geolocation, etc.)
  • Known behavior patterns of vulnerability discovery or exploitation

Step 2: MELI Architecture – Anomaly detection project

Step 3: Incident response

Once we reconstruct the context of the event, we calculate a score for each “suspicious actor” involved.


Step 3: MELI Architecture – Anomaly detection project

Based on these analysis results, we carry out a series of verifications to rule out false positives. Finally, we execute different actions based on the following criteria:

Manual review

If the outcome of the automatic analysis is a medium risk score, we activate a manual review process:

  1. We send a report to the application’s owners with a summary of the context. Based on their understanding of the business, they can activate the Incident Response Team (IRT) on-call and/or provide feedback that allows us to improve our automatic rules.
  2. In parallel, our threat analysis team receives and processes the event. They are equipped with tools that allow them to add IP addresses, user agents, referrers, or regular expressions to AWS WAF to carry out temporary blocking of “bad actors” while an attack is in progress.

Automatic response

If the analysis results in a high risk score, an automatic containment process is triggered. The event is sent to our block API, which is responsible for adding a temporary rule designed to mitigate the attack in progress. Behind the scenes, our block API uses AWS WAF to create IP sets. We reference these IP sets from custom rule groups in our web ACLs to block the IP addresses that source the malicious traffic. We found many benefits in the new release of AWS WAF, such as support for AWS Managed Rules, larger capacity per web ACL, and an easier-to-use API.

Conclusion

By leveraging the AWS platform and its powerful APIs, together with the AWS WAF service team and solutions architects, we were able to build an automated incident response solution that identifies and blocks malicious actors with minimal operator intervention. Since launching the solution, we have reduced year-over-year application downtime by over 92%, even though the time under attack increased more than tenfold. This has had a positive impact on our users and, therefore, on our business.

Not only was our downtime drastically reduced, but we also cut the number of manual interventions during this type of incident by 65%.

We plan to iterate over this solution to further reduce false positives in our detection mechanisms as well as the time to respond to external threats.

About the authors

Pablo Garbossa is an Information Security Manager at Mercado Libre. His main duties include ensuring security in the software development life cycle and managing security in MELI’s cloud environment. Pablo is also an active member of the Open Web Application Security Project® (OWASP) Buenos Aires chapter, a nonprofit foundation that works to improve the security of software.

Federico Alliani is a Security Engineer on the Mercado Libre Monitoring team. Federico and his team are in charge of protecting the site against different types of attacks. He loves to dive deep into big architectures to drive performance, scale operational efficiency, and increase the speed of detection and response to security events.

Architecting a Data Lake for Higher Education Student Analytics

Post Syndicated from Craig Jordan original https://aws.amazon.com/blogs/architecture/architecting-data-lake-for-higher-education-student-analytics/

One of the keys to identifying timely and impactful actions is having enough raw material to work with. However, this up-to-date information typically lives in the databases that sit behind several different applications. One of the first steps to finding data-driven insights is gathering that information into a single store that an analyst can use without interfering with those applications.

For years, reporting environments have relied on a data warehouse stored in a single, separate relational database management system (RDBMS). But now, due to the growing use of Software as a service (SaaS) applications and NoSQL database options, data may be stored outside the data center and in formats other than tables of rows and columns. It’s increasingly difficult to access the data these applications maintain, and a data warehouse may not be flexible enough to house the gathered information.

For these reasons, reporting teams are building data lakes, and those responsible for using data analytics at universities and colleges are no different. However, it can be challenging to know exactly how to start building this expanded data repository so it can be ready to use quickly and still expandable as future requirements are uncovered. Helping higher education institutions address these challenges is the topic of this post.

About Maryville University

Maryville University is a nationally recognized private institution located in St. Louis, Missouri, and was recently named the second fastest growing private university by The Chronicle of Higher Education. Even with its enrollment growth, the university is committed to a highly personalized education for each student, which requires reliable data that is readily available to multiple departments. University leaders want to offer the right help at the right time to students who may be having difficulty completing the first semester of their course of study. To get started, the data experts in the Office of Strategic Information and members of the IT Department needed to create a data environment to identify students needing assistance.

Critical data sources

Like most universities, Maryville’s student-related data centers around two significant sources: the student information system (SIS), which houses student profiles, course completion, and financial aid information; and the learning management system (LMS) in which students review course materials, complete assignments, and engage in online discussions with faculty and fellow students.

The first of these, the SIS, stores its data in an on-premises relational database, and for several years, a significant subset of its contents had been incorporated into the university’s data warehouse. The LMS, however, contains data that the team had not tried to bring into their data warehouse. Moreover, that data is managed by a SaaS application from Instructure, called “Canvas,” and is not directly accessible for traditional extract, transform, and load (ETL) processing. The team recognized they needed a new approach and began down the path of creating a data lake in AWS to support their analysis goals.

Getting started on the data lake

The first step the team took in building their data lake made use of an open source solution that Harvard’s IT department developed. The solution, comprised of AWS Lambda functions and Amazon Simple Storage Service (S3) buckets, is deployed using AWS CloudFormation. It enables any university that uses Canvas for their LMS to implement a solution that moves LMS data into an S3 data lake on a daily basis. The following diagram illustrates this portion of Maryville’s data lake architecture:


Diagram 1: The data lake for the Learning Management System data

The AWS Lambda functions invoke the LMS REST API on a daily schedule, resulting in Maryville’s data, which Canvas has previously unloaded and compressed, being securely stored in S3 objects. AWS Glue tables are defined to provide access to these S3 objects. Amazon Simple Notification Service (SNS) informs stakeholders of the status of the data loads.

Expanding the data lake

The next step was deciding how to copy the SIS data into S3. The team decided to use the AWS Database Migration Service (DMS) to create daily snapshots of more than 2,500 tables from this database. DMS uses a source endpoint for secure access to the on-premises database instance over VPN. A target endpoint determines the specific S3 bucket into which the data should be written. A migration task defines which tables to copy from the source database along with other migration options. Finally, a replication instance, a fully managed virtual machine, runs the migration task to copy the data. With this configuration in place, the data lake architecture for SIS data looks like this:


Diagram 2: Migrating data from the Student Information System

Handling sensitive data

In building a data lake you have several options for handling sensitive data including:

  • Leaving it behind in the source system and avoiding copying it through the data replication process
  • Copying it into the data lake, but taking precautions to ensure that access to it is limited to authorized staff
  • Copying it into the data lake, but applying processes to eliminate, mask, or otherwise obfuscate the data before it is made accessible to analysts and data scientists

The Maryville team decided to take the first of these approaches. Building the data lake gave them a natural opportunity to assess where this data was stored in the source system and then make changes to the source database itself to limit the number of highly sensitive data fields.

Validating the data lake

With these steps completed, the team turned to the final task, which was to validate the data lake. For this process they chose to make use of Amazon Athena, AWS Glue, and Amazon Redshift. AWS Glue provided multiple capabilities including metadata extraction, ETL, and data orchestration. Metadata extraction, completed by Glue crawlers, quickly converted the information that DMS wrote to S3 into metadata defined in the Glue data catalog. This enabled the data in S3 to be accessed using standard SQL statements interactively in Athena. Without the added cost and complexity of a database, Maryville’s data analyst was able to confirm that the data loads were completing successfully. He was also able to resolve specific issues encountered on particular tables. The SQL queries, written in Athena, could later be converted to ETL jobs in AWS Glue, where they could be triggered on a schedule to create additional data in S3. Athena and Glue enabled the ETL that was needed to transform the raw data delivered to S3 into prepared datasets necessary for existing dashboards.
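
For example, a quick row-count check on one of the replicated tables might look like the following sketch; the database, table, and column names are assumptions.

-- Illustrative validation query; database, table, and column names are assumptions.
SELECT count(*)          AS row_count,
       max(last_updated) AS latest_record
FROM sis_raw.student_enrollment;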

Once curated datasets were created and stored in S3, the data was loaded into an Amazon Redshift data warehouse, which supported direct access by tools outside of AWS using ODBC/JDBC drivers. This capability enabled Maryville’s team to further validate the data by attaching the data in Redshift to existing dashboards that were running in Maryville’s own data center. Redshift’s stored procedure language allowed the team to port some key ETL logic so that the engineering of these datasets could follow a process similar to approaches used in Maryville’s on-premises data warehouse environment.
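
As a rough sketch of that approach, a ported piece of ETL logic could take the shape of an Amazon Redshift stored procedure similar to the following; the schema, table, and column names are assumptions.

-- Minimal sketch of an Amazon Redshift stored procedure; all object names are assumptions.
CREATE OR REPLACE PROCEDURE reporting.refresh_retention_snapshot()
AS $$
BEGIN
  -- Rebuild a reporting table from the curated dataset loaded from S3.
  TRUNCATE TABLE reporting.retention_snapshot;
  INSERT INTO reporting.retention_snapshot
  SELECT student_id, term, credits_attempted, credits_completed
  FROM curated.course_completion;
END;
$$ LANGUAGE plpgsql;

Such a procedure could then be invoked on a schedule or on demand with CALL reporting.refresh_retention_snapshot();.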

Conclusion

The overall data lake/data warehouse architecture that the Maryville team constructed currently looks like this:


Diagram 3: The complete architecture

Through this approach, Maryville’s two-person team has moved key data into position for use in a variety of workloads. The data in S3 is now readily accessible for ad hoc interactive SQL workloads in Athena, ETL jobs in AWS Glue, and ultimately for machine learning workloads running in Amazon EC2, Lambda, or Amazon SageMaker. In addition, the S3 storage layer is easy to expand without interrupting prior workloads. At the time of this writing, the Maryville team is beginning to use this environment for the machine learning models described earlier, as well as adding other data sources into the S3 layer.

Acknowledgements

The solution described in this post resulted from the collaborative effort of Christine McQuie, Data Engineer, and Josh Tepen, Cloud Engineer, at Maryville University, with guidance from Travis Berkley and Craig Jordan, AWS Solutions Architects.

Field Notes: Gaining Insights into Labeling Jobs for Machine Learning

Post Syndicated from Michael Graumann original https://aws.amazon.com/blogs/architecture/field-notes-gaining-insights-into-labeling-jobs-for-machine-learning/

In an era where more and more data is generated, it becomes critical for businesses to derive value from it. With the help of supervised learning, it is possible to generate models to automatically make predictions or decisions by leveraging historical data. For example, image recognition for self-driving cars, predicting anomalies on X-rays, fraud detection in finance and more. With supervised learning, these models learn from labeled data. The success of those models is highly dependent on readily available, high quality labeled data.

However, you might encounter cases where a high percentage of your pre-existing data is unlabeled. In these situations, providing correct labeling to previously unlabeled data points would directly translate to higher model accuracy.

Amazon SageMaker Ground Truth helps you with exactly that. It lets you build highly accurate training datasets for machine learning quickly. SageMaker Ground Truth provides your labelers with built-in workflows and interfaces for common labeling tasks. This process could take several hours or more depending on the size of your unlabeled dataset, and you might have a need to track the progress easily, preferably in the form of a dashboard.

In this blog post, we show how to gain deep insights into the progress of labeling and the performance of the workers by using Amazon Athena and Amazon QuickSight. We first use Athena to set up several views with specific insights into the labeling progress. Finally, we reference these views in Amazon QuickSight to visualize the data in a dashboard.

This approach also works for combining multiple AWS services in general. AWS provides many building blocks that you can mix and match to create a unique, integrated solution with cohesive insights. In this blog post, we use data produced by one service (Ground Truth), prepare it with another (Athena), and visualize it with a third (QuickSight). The following diagram shows this architecture.

Solution Architecture

ML Solution Architecture

Mapping a JSON structure to a table structure

Ground Truth creates several directories in your Amazon S3 output path. These directories contain the results of your labeling job and other artifacts of the job. The top-level directory for a labeling job has the same name as your labeling job, while the output directories are placed inside it. We will create all insights from what SageMaker Ground Truth calls worker responses.

All respective JSON files reside in the path s3://bucket/<job-name>/annotations/worker-response/.

To analyze the labeling data with Amazon Athena we need to understand the structure of the underlying JSON files. Let’s review the example below. For each item that was labeled, we see the label itself, followed by the submission time and a workerId pointing to an identity. This identity lives in Amazon Cognito, a fully managed service that provides the user directory for our labelers.

{
    "answers": [
        {
            "answerContent": {
                "crowd-classifier": {
                    "label": "Compute"
                }
            },
            "submissionTime": "2020-03-27T10:31:04.210Z",
            "workerId": "private.eu-west-1.1111111111111111",
            "workerMetadata": {
                "identityData": {
                    "identityProviderType": "Cognito",
                    "issuer": "https://cognito-idp.eu-west-1.amazonaws.com/eu-west-1_111111111",
                    "sub": "11111111-1111-1111-1111-111111111111"
                }
            }
        },
        ...
    ]
}

Although the data is stored in Amazon S3 object storage, we are able to use SQL to access it by using Amazon Athena. Now that we understand the JSON structure shown in the preceding code, we use Athena to define how to interpret the data that is relevant to us. We do so by first creating a database using the Athena Query Editor:

CREATE DATABASE analyze_labels_db;

Once inside the database, we add the table schema. The actual files remain on Amazon S3, but using the metadata catalog, Athena knows where the data lies and how to interpret it. The AWS Glue Data Catalog is a central repository to store structural and operational metadata for all your data assets. For a given dataset, you can store its table definition and physical location, add business-relevant attributes, and track how the data has changed over time. Besides Athena, the AWS Glue Data Catalog also provides out-of-the-box integration with Amazon EMR and Amazon Redshift Spectrum. Once you add your table definitions to the Glue Data Catalog, they are available for ETL and readily available for querying in Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum, so that you can have a common view of your data between these services.

When going from JSON to SQL, we cross format boundaries. To make the JSON-formatted data easier to read, we use SerDe properties to map the hyphenated crowd-classifier field to an underscored column name due to DDL constraints. Finally, we point the location to our Amazon S3 bucket containing the individual worker responses. Notice in the following script that we translate the nested structure of the JSON file into a hierarchical, nested data structure in the schema definition. Also, we could leave out workerMetadata because we don’t need it at this time; the data would still stay in the files on Amazon S3, so we could later add the workerMetadata STRUCT to the table definition for our analysis.

CREATE EXTERNAL TABLE annotations_raw (
  answers array<
    struct<answercontent: 
      struct<crowd_classifier: 
        struct<label: string>
      >,
      submissionTime: string,
      workerId: string,
      workerMetadata: 
        struct<identityData: 
          struct<identityProviderType: string, issuer: string, sub: string>
        >
    >
  >
) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  "mapping.crowd_classifier"="crowd-classifier" 
) 
LOCATION 's3://<YOUR_BUCKET>/<JOB_NAME>/annotations/worker-response/'

Creating Views in Athena

Now, we have nested data in our annotations_raw table. For many use cases, especially for analytical uses, representing data in a tabular fashion—as rows—is more natural. This is also the standard way when using SQL and business intelligence tools. To unnest the hierarchical data into flattened rows, we create the following view which will serve as foundation for the other views we create. For an in-depth look into unnesting data with Amazon Athena, read this blog post.

Some of the information we’re interested in might not be part of the document, but is encoded in the path. We use a trick in Athena by using the $path variable from the Presto Hive Connector. This determines which Amazon S3 file contains data that is returned by a specific row in an Athena table. This way we can find out which data object an annotation belongs to. Since Athena is built on top of Presto, we are able to use Presto’s built-in regexp_extract function to find out the iteration as well as the data object id per labeling result. We also cast the submission time in date format to later determine the labeling progress per day.

CREATE OR REPLACE VIEW annotations_view AS
SELECT 
  regexp_extract("$path", 'iteration-[0-9]*') as iteration,
  regexp_extract("$path", '(iteration-[0-9]*\/([0-9]*))',2) as dataRecord,
  answer.answercontent.crowd_classifier.label,
  cast(from_iso8601_timestamp(answer.submissionTime) as timestamp) as submissionTime,
  cast(from_iso8601_timestamp(answer.submissionTime) as date) as submissionDay,
  answer.workerId,
  answer.workerMetadata.identityData.identityProviderType,
  answer.workerMetadata.identityData.issuer,
  answer.workerMetadata.identityData.sub,
  "$path" path
FROM 
  annotations_raw
CROSS JOIN UNNEST(answers) AS t(answer)

This view, annotations_view, will be the starting point for the other views we create later in this post.

Visualizing with QuickSight

In this section, we explore a way to visualize the views we build in Athena by pointing Amazon QuickSight to the respective view. Amazon QuickSight lets you create and publish interactive dashboards that include ML Insights. Dashboards can then be accessed from any device, and embedded into your applications, portals, and websites.

Thanks to the tight integration between Athena and QuickSight, we are able to map each QuickSight dataset to one Athena view. To further optimize dashboard performance, we can optionally import the datasets into SPICE, the in-memory optimized calculation engine for Amazon QuickSight. With the datasets in place, we can now create an analysis to interact with the visuals we’re going to add. You can think of an analysis as a container for a set of related visuals. You can use multiple datasets in an analysis, although any given visual can only use one of those datasets. After you create an analysis and an initial visual, you can expand the analysis, for example by adding datasets and visuals.

Let’s start with our first insight.

Annotations per worker

We’d like to gain insights not only into the total number of labeled items, but also into the level of contribution of each individual worker. This could give us an indication of whether the labels were created by a diverse crowd of labelers or by a few productive ones. A largely disproportionate amount of contributions from a handful of workers could introduce their individual biases into the dataset.

SageMaker Ground Truth calls a labeled data object an annotation, which is the result of a single worker’s labeling task.

Luckily we encapsulated all the heavy lifting of format conversion in the annotations_view, so that it is now easy to create a view for the annotations per user:

CREATE OR REPLACE VIEW annotations_per_user AS
SELECT COUNT(sub) AS LabeledItems,
sub AS User
FROM annotations_view
GROUP BY sub
ORDER BY LabeledItems DESC

Next we visualize this view in QuickSight. We add a visual to our analysis, select the respective dataset for the view and use the AutoGraph feature, which chooses the most appropriate visual type. Since we already arranged our view in Athena by the number of labeled items in descending order, there is no need now to sort the data in QuickSight. In the following screenshot, worker c4ef78e4... contributed more labels compared to their peers.

Annotations per worker

This view gives you an indicator to check for a bias that the leading worker might have brought along.

Annotations per label

One thing we want to be aware of is potential imbalance between classes in our dataset. This is especially problematic for simple machine learning models, which may learn to frequently predict a label that is massively overrepresented in the dataset. If we can identify an imbalance, we can apply mitigation actions such as upsampling data of underrepresented classes. With the following view, we list the total number of annotations per label.

CREATE OR REPLACE VIEW annotations_per_label AS
SELECT Count(dataRecord) AS TotalLabels, label As Label 
FROM annotations_view
GROUP BY label
ORDER BY TotalLabels DESC, Label;

As before, we create a dataset in QuickSight pointing to the annotations_per_label view, open the analysis, add a new visual and leverage the AutoGraph functionality. The result is the following visual representation:

Annotations per label

One can clearly see that the Analytics & AI/ML class is massively underrepresented. At this point, you might want to try getting more data or think about upsampling data for that class.

Annotations per day

Seeing the total number of annotations per label and per worker is good, but we are also interested in how the labeling progress changes over time. This way, we might see spikes related to labeler activations. We can also estimate how long it will take to reach a certain annotation goal at the current pace. For this purpose, we create the following view aggregating the total annotations per day.

CREATE OR REPLACE VIEW annotations_per_day AS
SELECT COUNT(datarecord) AS LabeledItems,
submissionDay
FROM annotations_view
GROUP BY submissionDay
ORDER BY submissionDay, LabeledItems DESC

This time the QuickSight AutoGraph provides us with the following line chart. You might have noticed that the axis labels do not match the column names in Athena. That is because we renamed them in QuickSight for better readability.

Total annotations per day

In the preceding chart, we see that there is no consistent labeling pace, which makes it hard to predict when a certain amount of labeled data will be reached. In this example, after a strong start, progress immediately went down. Knowing this, we might want to take action to motivate our workers to contribute more, and validate the effectiveness of these actions with the help of this chart. The spikes indicate an effective short-term action.

Distribution of total annotations by user

We already have insights into annotations per worker, per label, and per day. Let us now see what insights we can get from aggregating some of this information.

The bigger your labeling workforce gets, the harder it can become to see the whole picture. For that reason, we now create a histogram consisting of five buckets. Each bucket represents an interval of total annotations (for example, 0-25 annotations) mapped to the number of users whose total number of annotations lies in that interval. This allows us to get a sense of what kind of bias might be introduced by the majority of annotations being contributed by a small number of workers.

To do that, we use the Presto function width_bucket, which assigns each worker’s total number of labeled data objects to one of the five buckets we defined, each with a size of 25. We define these buckets by creating an array whose elements specify the bucket boundaries.

CREATE OR REPLACE VIEW users_per_bucket_annotations AS
SELECT 
bucket,numberOfUsers,
CASE
   WHEN bucket=5 THEN 'B' || cast(bucket AS VARCHAR(10)) || ': ' || cast(((bucket-1) * 25) AS VARCHAR(10)) || '+'
   ELSE 'B' || cast(bucket AS VARCHAR(10)) || ': ' || cast(((bucket-1) * 25) AS VARCHAR(10)) || '-' || cast((bucket * 25) AS VARCHAR(10))
END AS NumberOfAnnotations
FROM
(SELECT width_bucket(labeleditems,ARRAY[0,25,50,75,100]) AS bucket,
 count(user) AS numberOfUsers
FROM annotations_per_user
GROUP BY 1
ORDER BY bucket)

A SELECT * FROM users_per_bucket_annotations produces the following result:

A SELECT FROM users_per_bucket_annotations

Let’s now investigate the same data via QuickSight:

Annotations per User in buckets of Size 25

Now that we can look at the data visually it becomes clear that we have a bimodal distribution, with many labelers having done very little, and many labelers doing quite a lot. This may warrant interviewing some labelers to find out if there is something holding back users from progressing, or if we can keep engagement high over time.

Putting it all together in QuickSight

Since we created all the previous visuals in one analysis, we can now use it as a central place to consume our insights in a user-friendly way. Moreover, we can share our insights with others as a read-only snapshot, which QuickSight calls a dashboard. Users who are dashboard viewers can view and filter the dashboard data, as shown below:

Groundtruth dashboard

Furthermore, you can generate a report and let QuickSight send it to your peers, either once or on a schedule (daily, weekly, or monthly). This way, users do not have to sign in, and they get reminders to check the progress of the labeling job. Lastly, sending out those reports is an opportunity to stay in touch with the labelers and keep engagement high.

Conclusion

In this blog post, we showed one example of combining multiple AWS services to build a solution tailored to your needs. We took the Amazon S3 output generated by SageMaker Ground Truth and showed how it can be further processed and analyzed with Athena. Finally, we created a central place to consume our insights in a user-friendly way with QuickSight. By putting it all together in a dashboard, we were able to share our insights with our peers.

You can take the same pattern and apply it to other situations: take some of the many building blocks AWS provides and mix-and-match them to create a unique, integrated solution with cohesive insights just as we did with Ground Truth, Athena, and QuickSight.

Field Notes provides hands-on technical guidance from AWS Solutions Architects, consultants, and technical account managers, based on their experiences in the field solving real-world business problems for customers.

Automating bucketing of streaming data using Amazon Athena and AWS Lambda

Post Syndicated from Ahmed Saef Zamzam original https://aws.amazon.com/blogs/big-data/automating-bucketing-of-streaming-data-using-amazon-athena-and-aws-lambda/

In today’s world, data plays a vital role in helping businesses understand and improve their processes and services to reduce cost. You can use several tools to gain insights from your data, such as Amazon Kinesis Data Analytics or open-source frameworks like Structured Streaming and Apache Flink to analyze the data in real time. Alternatively, you can batch analyze the data by ingesting it into a centralized storage known as a data lake. Data lakes allow you to import any amount of data that can come in real time or batch. With Amazon Simple Storage Service (Amazon S3), you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% (11 9s) of durability.

After the data lands in your data lake, you can start processing this data using any Big Data processing tool of your choice. Amazon Athena is a fully managed interactive query service that enables you to analyze data stored in an Amazon S3-based data lake using standard SQL. You can also integrate Athena with Amazon QuickSight for easy visualization of the data.

When working with Athena, you can employ a few best practices to reduce cost and improve performance. Converting to columnar formats, partitioning, and bucketing your data are some of the best practices outlined in Top 10 Performance Tuning Tips for Amazon Athena. Bucketing is a technique that groups data based on specific columns together within a single partition. These columns are known as bucket keys. By grouping related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thus improving query performance and reducing cost. For example, imagine collecting and storing clickstream data. If you frequently filter or aggregate by user ID, then within a single partition it’s better to store all rows for the same user together. If user data isn’t stored together, then Athena has to scan multiple files to retrieve the user’s records. This leads to more files being scanned, and therefore, an increase in query runtime and cost.

Like partitioning, columns that are frequently used to filter the data are good candidates for bucketing. However, unlike partitioning, with bucketing it’s better to use columns with high cardinality as a bucketing key. For example, Year and Month columns are good candidates for partition keys, whereas userID and sensorID are good examples of bucket keys. By doing this, you make sure that all buckets have a similar number of rows. For more information, see Bucketing vs Partitioning.
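
To make the distinction concrete, the following CTAS sketch combines both techniques: the low-cardinality year and month columns serve as partition keys, and the high-cardinality userid column serves as the bucket key. The table names, columns, and bucket count are illustrative only; the solution in this post uses its own CTAS statement, shown later.

-- Illustrative only; table names, columns, and bucket count are assumptions.
-- In an Athena CTAS statement, the partition columns must come last in the SELECT list.
CREATE TABLE clickstream_curated
WITH (
  format = 'PARQUET',
  external_location = 's3://<your-bucket>/clickstream/curated/',
  partitioned_by = ARRAY['year', 'month'],
  bucketed_by = ARRAY['userid'],
  bucket_count = 16)
AS SELECT userid, page, event_time, year, month
FROM clickstream_raw;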

For real-time data (such as data coming from sensors or clickstream data), streaming tools like Amazon Kinesis Data Firehose can convert the data to columnar formats and partition it while writing to Amazon S3. With Kafka, you can do the same thing with connectors. But what about bucketing? This post shows how to continuously bucket streaming data using AWS Lambda and Athena.

Overview of solution

The following diagram shows the high-level architecture of the solution.

The architecture includes the following steps:

  1. We use the Amazon Kinesis Data Generator (KDG) to simulate streaming data. Data is then written into Kinesis Data Firehose, a fully managed service that enables you to load streaming data to an Amazon S3-based data lake.
  2. Kinesis Data Firehose partitions the data by hour and writes new JSON files into the current partition under a /raw prefix. Each new partition looks like /raw/dt=<YYYY-MM-dd-HH>. Every hour, a new partition is created.
  3. Two Lambda functions are triggered on an hourly basis based on Amazon CloudWatch Events.
    • Function 1 (LoadPartition) runs every hour to load new /raw partitions to Athena SourceTable, which points to the /raw prefix.
    • Function 2 (Bucketing) runs the Athena CREATE TABLE AS SELECT (CTAS) query.
  4. The CTAS query copies the previous hour’s data from /raw to /curated and buckets the data while doing so. It loads the new data as a new partition to TargetTable, which points to the /curated prefix.

Overview of walkthrough

In this post, we cover the following high-level steps:

  1. Install and configure the KDG.
  2. Create a Kinesis Data Firehose delivery stream.
  3. Create the database and tables in Athena.
  4. Create the Lambda functions and schedule them.
  5. Test the solution.
  6. Create a view that combines data from both tables.
  7. Clean up.

Installing and configuring the KDG

First, we need to install and configure the KDG in our AWS account. To do this, we use the following AWS CloudFormation template.

For more information about installing the KDG, see the KDG Guide in GitHub.

To configure the KDG, complete the following steps:

  1. On the AWS CloudFormation console, locate the stack you just created.
  2. On the Outputs tab, record the value for KinesisDataGeneratorUrl.
  3. Log in to the KDG main page using the credentials created when you deployed the CloudFormation template.
  4. In the Record template section, enter the following template. Each record has three fields: sensorID, currentTemperature, and status.
    {
        "sensorId": {{random.number(4000)}},
        "currentTemperature": {{random.number(
            {
                "min":10,
                "max":50
            }
        )}},
        "status": "{{random.arrayElement(
            ["OK","FAIL","WARN"]
        )}}"
    }
    

  5. Choose Test template.

The result should look like the following screenshot.

We don’t start sending data now; we do this after creating all other resources.

Creating a Kinesis Data Firehose delivery stream

Next, we create the Kinesis Data Firehose delivery stream that is used to load the data to the S3 bucket.

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose.
  2. Choose Create delivery stream.
  3. For Delivery stream name, enter a name, such as AutoBucketingKDF.
  4. For Source, select Direct PUT or other sources.
  5. Leave all other settings at their default and choose Next.
  6. On Process Records page, leave everything at its default and choose Next.
  7. Choose Amazon S3 as the destination and choose your S3 bucket from the drop-down menu (or create a new one). For this post, I already have a bucket created.
  8. For S3 Prefix, enter the following prefix:
    raw/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}-!{timestamp:HH}/

We use custom prefixes to tell Kinesis Data Firehose to create a new partition every hour. Each partition looks like this: dt=YYYY-MM-dd-HH. This partition-naming convention conforms to the Hive partition-naming convention, <PartitionKey>=<PartitionValue>. In this case, <PartitionKey> is dt and <PartitionValue> is YYYY-MM-dd-HH. By doing this, we implement a flat partitioning model instead of hierarchical (year=YYYY/month=MM/day=dd/hour=HH) partitions. This model can be much simpler for end-users to work with, and you can use a single column (dt) to filter the data. For more information on flat vs. hierarchical partitions, see Data Lake Storage Foundation on GitHub.
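
For example, once the Athena tables are created later in this walkthrough, filtering one day of data needs only a predicate on the single dt column, along the lines of the following sketch.

-- Example filter on the flat partition column; values follow the dt=YYYY-MM-dd-HH layout.
SELECT count(*)
FROM SourceTable
WHERE dt BETWEEN '2020-06-01-00' AND '2020-06-01-23';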

  1. For S3 error prefix, enter the following code:
    myFirehoseFailures/!{firehose:error-output-type}/

  2. On the Settings page, leave everything at its default.
  3. Choose Create delivery stream.

Creating an Athena database and tables

In this solution, the Athena database has two tables: SourceTable and TargetTable. Both tables have identical schemas and will have the same data eventually. However, each table points to a different S3 location. Moreover, because data is stored in different formats, Athena uses a different SerDe for each table to parse the data. SourceTable uses JSON SerDe and TargetTable uses Parquet SerDe. One other difference is that SourceTable’s data isn’t bucketed, whereas TargetTable’s data is bucketed.

In this step, we create both tables and the database that groups them.

  1. On the Athena console, create a new database by running the following statement:
    CREATE DATABASE mydatabase

  2. Choose the database that was created and run the following query to create SourceTable. Replace <s3_bucket_name> with the bucket name you used when creating the Kinesis Data Firehose delivery stream.
    CREATE EXTERNAL TABLE mydatabase.SourceTable(
      sensorid string, 
      currenttemperature int, 
      status string)
    PARTITIONED BY ( 
      dt string)
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://<s3_bucket_name>/raw/'
    

  3. Run the following CTAS statement to create TargetTable:
    CREATE TABLE TargetTable
    WITH (
          format = 'PARQUET', 
          external_location = 's3://<s3_bucket_name>/curated/', 
          partitioned_by = ARRAY['dt'], 
          bucketed_by = ARRAY['sensorID'], 
          bucket_count = 3) 
    AS SELECT *
    FROM SourceTable

SourceTable doesn’t have any data yet. However, the preceding query creates the table definition in the Data Catalog. We configured this data to be bucketed by sensorID (the bucketing key) with a bucket count of 3. Ideally, choose the number of buckets so that the resulting files are of an optimal size.

Creating Lambda functions

The solution has two Lambda functions: LoadPartition and Bucketing. We use an AWS Serverless Application Model (AWS SAM) template to create, deploy, and schedule both functions.

Follow the instructions in the GitHub repo to deploy the template. When deploying the template, it asks you for some parameters. You can use the default parameters, but you have to change S3BucketName and AthenaResultLocation. For more information, see Parameter Details in the GitHub repo.

LoadPartition function

The LoadPartition function is scheduled to run in the first minute of every hour. Every time Kinesis Data Firehose creates a new partition in the /raw folder, this function loads the new partition into SourceTable. This is crucial because the second function (Bucketing) reads this partition the following hour to copy the data to /curated.
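
Under the hood, loading a new partition comes down to an ALTER TABLE statement along the lines of the following sketch; the actual statement is generated by the function at runtime, and the partition value shown here is only an example.

-- Sketch of the statement the LoadPartition function issues each hour;
-- the partition value and bucket name are placeholders.
ALTER TABLE SourceTable
ADD IF NOT EXISTS PARTITION (dt = '2020-06-01-13')
LOCATION 's3://<s3_bucket_name>/raw/dt=2020-06-01-13/';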

Bucketing function

The Bucketing function is scheduled to run the first minute of every hour. It copies the last hour’s data from SourceTable to TargetTable. It does so by creating a tempTable using a CTAS query. This tempTable points to the new date-hour folder under /curated; this folder is then added as a single partition to TargetTable.

To implement this, the function runs three queries sequentially. The queries use two parameters:

  • <s3_bucket_name> – Defined by an AWS SAM parameter and should be the same bucket used throughout this solution
  • <last_hour_partition> – Is calculated by the function depending on which hour it’s running

The function first creates TempTable as the result of a SELECT statement from SourceTable. It stores the results in a new folder under /curated. The results are bucketed and stored in Parquet format. See the following code:

CREATE TABLE TempTable
    WITH (
      format = 'PARQUET', 
      external_location = 's3://<s3_bucket_name>/curated/dt=<last_hour_partition>/', 
      bucketed_by = ARRAY['sensorID'], 
      bucket_count = 3) 
    AS SELECT *
    FROM SourceTable
    WHERE dt='<last_hour_partition>';

We create a new subfolder in /curated, which is a new partition for TargetTable. So, after the TempTable creation is complete, we load the new partition to TargetTable:

ALTER TABLE TargetTable
                ADD IF NOT EXISTS
                PARTITION (dt = '<last_hour_partition>');

Finally, we delete tempTable from the Data Catalog:

DROP TABLE TempTable

Testing the solution

Now that we have created all resources, it’s time to test the solution. We start by generating data from the KDG and waiting for an hour to start querying data in TargetTable (the bucketed table).

  1. Log in to the KDG. You should find the template you created earlier. For the configuration, choose the following:
    1. The Region used.
    2. For the delivery stream, choose the Kinesis Data Firehose you created earlier.
    3. For records/sec, enter 3000.
  2. Choose Send data.

The KDG starts sending simulated data to Kinesis Data Firehose. After 1 minute, a new partition should be created in Amazon S3.

The Lambda function that loads the partition to SourceTable runs on the first minute of the hour. If you started sending data after the first minute, this partition is missed because the next run loads the next hour’s partition, not this one. To mitigate this, run MSCK REPAIR TABLE SourceTable only for the first hour.

  1. To benchmark the performance between both tables, wait for an hour so that the data is available for querying in TargetTable.
  2. When the data is available, choose one sensorID and run the following query on SourceTable and TargetTable.
    SELECT sensorID, avg(currenttemperature) as AverageTempreture 
    FROM <TableName>
    WHERE dt='<YYYY-MM-dd-HH>' AND sensorID ='<sensorID_selected>'
    GROUP BY 1

The following screenshot shows the query results for SourceTable. It shows the runtime in seconds and amount of data scanned.

The following screenshot shows the query results for TargetTable.

If you look at these results, you don’t see a huge difference in runtime for this specific query and dataset; for other datasets, this difference should be more significant. However, from a data scanning perspective, after bucketing the data, we reduced the data scanned by approximately 98%. Therefore, for this specific use case, bucketing the data led to a 98% reduction in Athena costs, because you’re charged based on the amount of data scanned by each query.

Querying the current hour’s data

Data for the current hour isn’t available immediately in TargetTable. It’s available for querying after the first minute of the following hour. To query this data immediately, we have to create a view that UNIONS the previous hour’s data from TargetTable with the current hour’s data from SourceTable. If data is required for analysis after an hour of its arrival, then you don’t need to create this view.

To create this view, run the following query in Athena:

CREATE OR REPLACE VIEW combined AS

SELECT *, "$path" AS file
FROM SourceTable
WHERE dt >= date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

UNION ALL 

SELECT *, "$path" AS file
FROM TargetTable
WHERE dt < date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

Cleaning up

Delete the resources you created if you no longer need them.

  1. Delete the Kinesis Data Firehose delivery stream.
  2. In Athena, run the following statements
    1. DROP DATABASE mydatabase
    2. DROP TABLE SourceTable
    3. DROP TABLE TargetTable
  3. Delete the AWS SAM template to delete the Lambda functions.
  4. Delete the CloudFormation stack for the KDG. For more information, see Deleting a stack on the AWS CloudFormation console.

Conclusion

Bucketing is a powerful technique and can significantly improve performance and reduce Athena costs. In this post, we saw how to continuously bucket streaming data using Lambda and Athena. We used a simulated dataset generated by Kinesis Data Generator. The same solution can apply to any production data, with the following changes:

  • DDL statements – Adjust the table definitions to match your data’s schema.
  • Lambda functions – The functions provided work with data that is partitioned by hour with the partition key ‘dt’ and partition value <YYYY-MM-dd-HH>. If your data is partitioned in a different way, edit the Lambda functions accordingly.
  • Frequency of Lambda triggers – Adjust the trigger schedule to how often new partitions are created.

About the Author

Ahmed Zamzam is a Solutions Architect with Amazon Web Services. He supports SMB customers in the UK in their digital transformation and their cloud journey to AWS, and specializes in Data Analytics. Outside of work, he loves traveling, hiking, and cycling.

Configure and optimize performance of Amazon Athena federation with Amazon Redshift

Post Syndicated from Harsha Tadiparthi original https://aws.amazon.com/blogs/big-data/configure-and-optimize-performance-of-amazon-athena-federation-with-amazon-redshift/

This post provides guidance on how to configure Amazon Athena federation with AWS Lambda and Amazon Redshift, while addressing performance considerations to ensure proper use.

If you use a data lake in Amazon Simple Storage Service (Amazon S3) and Amazon Redshift as your data warehouse, you may want to integrate the two for a lake house approach, which integrates the data lake and the data warehouse seamlessly. When you need to query your data lake from your Amazon Redshift data warehouse, you can use Amazon Redshift Spectrum, which works great in unifying your data lake and data warehouse. However, when you use Athena in the data lake and need to access data in Amazon Redshift, as in the following two commonly seen scenarios, there is no easy approach:

  • Team A has a data lake in Amazon S3 and uses Athena. They need access to the data in an Amazon Redshift cluster owned by Team B.
  • Analysts using Athena to query their data lake for analytics need agility and flexibility to access data in an Amazon Redshift data warehouse without moving the data to the Amazon S3 data lake.

In these scenarios, Athena federation with Amazon Redshift allows you to seamlessly access the data in your Amazon Redshift data warehouse without having to wait to unload the data to the Amazon S3 data lake, which removes the overhead in managing such jobs.

In this post, you walk through a step-by-step configuration to set up Athena federation using Lambda to access data in Amazon Redshift. You also see a performance benchmark analysis of interactive and ad hoc TPC-DS queries, and learn some key performance considerations and best practices when using federation.

Solution overview

Data federation is the capability to integrate data in another data store using a single interface. The following diagram depicts how Athena federation works by using Lambda to integrate with a federated data source.

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL.

Lambda lets you run code without provisioning or managing servers. You can run code for virtually any type of application with zero administration, and you pay only for the time your code runs.

Amazon Redshift is a petabyte-scale data warehouse designed from the ground up, natively for the cloud. Amazon Redshift is the most popular and fastest cloud data warehouse. It’s integrated with your data lake, offers performance up to three times faster than any other data warehouse, and costs up to 75% less than any other cloud data warehouse.

The following diagram depicts all the data source connectors available as of this writing in the AWS Serverless Application Repository.

The AWS Serverless Application Repository is a managed repository for serverless applications. It enables you to store and share reusable applications, and easily assemble and deploy serverless architectures in powerful new ways.

You can also create a custom connector for sources that aren’t in the AWS Serverless Application Repository.

Prerequisites

Before you get started, create a secret for the Amazon Redshift login ID and password using AWS Secrets Manager. You can follow the console steps below or use the scripted sketch after them.

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. Choose credentials for your Amazon Redshift cluster, and set your user name and password.
  4. Choose the cluster you want to use.
  5. For Secret name, enter a name for your secret. Use the prefix AthenaJDBCFederation so it’s easy to find.
  6. Leave the remaining fields at their defaults and choose Next.
  7. Complete your secret creation.
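If you prefer to script this step, the following is a minimal boto3 sketch. It assumes the secret stores a JSON document with username and password keys; the secret name and credentials shown are placeholders, not values required by the connector.

    import json
    import boto3

    secrets = boto3.client("secretsmanager")

    # Hypothetical secret name using the AthenaJDBCFederation prefix suggested above,
    # with placeholder credentials for the Amazon Redshift cluster.
    secrets.create_secret(
        Name="AthenaJDBCFederation/redshift/demo",
        SecretString=json.dumps({"username": "awsuser", "password": "EXAMPLE-password"}),
    )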

Setting up your S3 bucket

On the Amazon S3 console, create a new S3 bucket and subfolder for Lambda to use. For this post, use the name myworkspace0009/athenafederation.

Configuring Athena federation with Amazon Redshift

To configure Athena federation with Amazon Redshift, complete the following steps:

  1. On the AWS Serverless Application Repository, choose Available applications.
  2. In the search field, enter athena federation.

  1. Choose the Athena Redshift connector application from the search results.
  2. In the Application settings section, provide the following details:
    • Application name – AthenaRedshiftConnector
    • SecretNamePrefix – AthenaJdbcFederation
    • SpillBucket – myworkspace0009/athenafederation
    • JDBCConnectorConfig – redshift://jdbc:redshift://<YourAmazonRedshiftHostname>:5439/<DBName>?user=sample2&password=sample2
    • DisableSpillEncryption – False
    • LambdaFunctionName – rstpcds30
    • SecurityGroupID – Security group ID where Amazon Redshift is deployed
    • SpillPrefix – Leave default
    • SubnetIds – Use the subnets where Amazon Redshift is running, comma separated
  3. Select the I acknowledge check box.
  4. Choose Deploy.

In the next steps, you configure an Amazon Virtual Private Cloud (Amazon VPC) endpoint for Amazon S3 to allow Lambda to write federated query results to Amazon S3. You can follow the console steps below or use the scripted sketch after them.

  1. On the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. Choose the VPC for your endpoint.

  1. Make any necessary security changes as per your security requirements.

  1. Choose Create endpoint.
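If you prefer to create the gateway endpoint programmatically, the following is a minimal boto3 sketch; the VPC ID and route table ID are placeholders for your environment, and the Region in the service name should match where you deployed the connector.

    import boto3

    ec2 = boto3.client("ec2")

    # Gateway endpoint for Amazon S3 so the connector Lambda function can write
    # spill data and query results without leaving the AWS network.
    ec2.create_vpc_endpoint(
        VpcEndpointType="Gateway",
        VpcId="vpc-0123456789abcdef0",
        ServiceName="com.amazonaws.us-east-1.s3",
        RouteTableIds=["rtb-0123456789abcdef0"],
    )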

Running federated queries with Athena

To start running federated queries, complete the following steps:

  1. On the Athena console, choose Workgroups.
  2. If you don’t see a workgroup called AmazonAthenaPreviewFunctionality, create one.

When this feature becomes generally available, you won’t need to use this workgroup name.

  1. Run your queries, using lambda:rstpcds30 to reference tables in Amazon Redshift, as shown in the sketch that follows.
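The following is a minimal boto3 sketch of submitting such a federated query. The table and column names follow the TPC-DS schema and, along with the output location, are illustrative assumptions rather than exact values from this walkthrough.

    import boto3

    athena = boto3.client("athena")

    # Join the store_sales fact table in Amazon Redshift (through the
    # lambda:rstpcds30 connector) with a dimension table in the S3 data lake.
    sql = """
    SELECT d.d_year, SUM(ss.ss_net_paid) AS total_net_paid
    FROM "lambda:rstpcds30".public.store_sales ss
    JOIN date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
    GROUP BY d.d_year
    """

    response = athena.start_query_execution(
        QueryString=sql,
        WorkGroup="AmazonAthenaPreviewFunctionality",
        ResultConfiguration={"OutputLocation": "s3://myworkspace0009/athenafederation/results/"},
    )
    print(response["QueryExecutionId"])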

Athena query performance comparison

Several customers have asked us for performance insights and prescriptive guidance on how queries in Athena compare against federated queries and how to use them. In this section, we use a TPC-DS 3 TB standard dataset and a select few queries that fall in the category of ad hoc and interactive. The comparison of their performance should give you an idea of what to expect when running federated queries against Amazon Redshift.

For the following tests, we used a 3 TB TPC-DS dataset in the Amazon S3 data lake, stored as compressed, partitioned Parquet and served by Athena, and the same 3 TB TPC-DS dataset on an Amazon Redshift cluster running four RA3.4xlarge nodes.

The following table summarizes the dataset sizes:

Dataset                  Table Size (Records)
store_sales              8.6 billion
customer                 30 million
customer_address         15 million
customer_demographics    1.92 million
item                     360,000
date_dim                 73,000
store                    1,350

We ran the following four tests:

  • T1 – Queries ran in Athena without federation. All table data is in Amazon S3.
  • T2 – Queries ran in Athena with federation to Amazon Redshift. All table data is in Amazon S3, except the store_sales fact table in Amazon Redshift.
  • T3 – Queries ran in Athena with federation to Amazon Redshift. All tables and data are in Redshift.
  • T4 – Queries ran in Amazon Redshift without federation. All tables and data are in Redshift.

The following graph represents the performance of some of the ad hoc and interactive TPC-DS queries.

In the preceding graph, all T3 queries timed out at 900 seconds, depicted by the pink reference line, due to the Lambda 900-second timeout limit. This is due to overhead from store_sales fact data that needed to be transferred back to Athena.

The following graph removes T3 from the visualization, which gives better visibility when comparing the other tests.

Notice that the T1 and T2 queries completed in almost the same time, while the T4 queries ran significantly faster.

Amazon Redshift provides extremely low latency that Athena can't match, and it should be the tool of choice if you need very low SLAs for your analytics queries.

The following graph shows the data scanned in Amazon S3 for T1 and T2, which helps explain why there isn't much difference in query performance between the two tests.

For the T2 federated queries, only a small amount of dimension data is filtered in Amazon Redshift and brought back to Athena, instead of scanning the entire dimension tables. This pattern is typical for many ad hoc and interactive queries.

The performance of these TPC-DS queries between T1 and T2 is comparable because very little data is transferred back to Athena. You can see similar behavior in many ad hoc and interactive query use cases, because they use limited dimensions and scan only a small subset of dimension data. Because the Lambda instances that connect to Amazon Redshift have a 900-second timeout, it's advisable to minimize the amount of data the query brings back. Although Athena uses multiple Lambda instances in parallel to run your federated query, it's also important to make sure the Amazon Redshift WLM queue has enough slots to process the requests without introducing queue wait time. For example, in some of the preceding queries, 20 Lambda executions connected to Amazon Redshift concurrently.

Key performance best practice considerations

When considering Athena federation with Amazon Redshift, take into account the following best practices:

  • Athena federation works great for queries with predicate filtering because the predicates are pushed down to Amazon Redshift. Use filter and limited-range scans in your queries to avoid full table scans.
  • If your SQL query requires returning a large volume of data from Amazon Redshift to Athena (which could lead to query timeouts or slow performance), unload the large tables in your query from Amazon Redshift to your Amazon S3 data lake (see the sketch after this list).
  • Star schema is a commonly used data model in Amazon Redshift. In the star schema model, unload your large fact tables into your data lake and leave the dimension tables in Amazon Redshift. If large dimension tables are contributing to slow performance or query timeouts, unload those tables to your data lake.
  • When you run federated queries, Athena spins up multiple Lambda functions, which causes a spike in database connections. It's important to monitor the Amazon Redshift WLM queue slots to ensure there is no queuing. Additionally, you can use concurrency scaling on your Amazon Redshift cluster to absorb bursts of concurrent connections.
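As a sketch of the unload recommendation above, you could run an UNLOAD statement that writes a large fact table to your data lake in Parquet format. The example below uses the Amazon Redshift Data API, which is only one way to submit the statement; the cluster identifier, database, secret ARN, IAM role, and target path are placeholders.

    import boto3

    redshift_data = boto3.client("redshift-data")

    # Unload the store_sales fact table to the S3 data lake as Parquet so Athena
    # can scan it directly instead of pulling it through the federation connector.
    unload_sql = """
    UNLOAD ('SELECT * FROM store_sales')
    TO 's3://my-data-lake-bucket/tpcds/store_sales/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftUnloadRole'
    FORMAT AS PARQUET
    """

    redshift_data.execute_statement(
        ClusterIdentifier="my-redshift-cluster",
        Database="dev",
        SecretArn="arn:aws:secretsmanager:us-east-1:123456789012:secret:redshift-demo",
        Sql=unload_sql,
    )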

Conclusion

In this post, you learned how to configure and use Athena federation with Amazon Redshift using Lambda. Now you don’t need to wait for all the data in your Amazon Redshift data warehouse to be unloaded to Amazon S3 and maintained on a day-to-day basis to run your queries. You can use the best practice considerations outlined in the post to minimize the data transferred from Amazon Redshift for better performance. When queries are well written for federation, the performance penalties are negligible, as observed in the TPC-DS benchmark queries in this post. Happy query federating!

 


About the Author

Harsha Tadiparthi is a Specialist Sr. Solutions Architect, AWS Analytics. He enjoys solving complex customer problems in Databases and Analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.

Enhancing customer safety by leveraging the scalable, secure, and cost-optimized Toyota Connected Data Lake

Post Syndicated from Sandeep Kulkarni original https://aws.amazon.com/blogs/big-data/enhancing-customer-safety-by-leveraging-the-scalable-secure-and-cost-optimized-toyota-connected-data-lake/

Toyota Motor Corporation (TMC), a global automotive manufacturer, has made “connected cars” a core priority as part of its broader transformation from an auto company to a mobility company. In recent years, TMC and its affiliate technology and big data company, Toyota Connected, have developed an array of new technologies to provide connected services that enhance customer safety and the vehicle ownership experience. Today, Toyota’s connected cars come standard with an on-board Data Communication Module (DCM) that links to a Controller Area Network (CAN). By using this hardware, Toyota provides various connected services to its customers.

Some of the connected services help drivers to safely enjoy their cars. Telemetry data is available from the car 24×7, and Toyota makes the data available to its dealers (when their customers opt-in for data sharing). For instance, a vehicle’s auxiliary battery voltage declines over time. With this data, dealership staff can proactively contact customers to recommend a charge prior to experiencing any issues. This automotive telemetry can also help fleet management companies monitor vehicle diagnostics, perform preventive maintenance and help avoid breakdowns.

There are other services such as usage-based auto insurance that leverage driving behavior data that can help safe drivers receive discounts on their car insurance. Telemetry plays a vital role in understanding driver behavior. If drivers choose to opt-in, a safety score can be generated based on their driving data and drivers can use their smartphones to check their safe driving scores.

A vehicle generates data every second, which can be bundled into larger packets at one-minute intervals. With millions of connected cars sending data points every second, the scale required to capture and store that data is immense: there are billions of messages daily, generating petabytes of data. To make this vision a reality, Toyota Connected's Mobility Team embarked on building a real-time "Toyota Connected Data Lake." Given the scale, we leveraged AWS to build this platform. In this post, we show how we built the data lake and how we provide significant value to our customers.

Overview

The guiding principles for architecture and design that we used are as follows:

  • Serverless: We want to use cloud native technologies and spend minimal time on infrastructure maintenance.
  • Rapid speed to market: We work backwards from customer requirements and iterate frequently to develop minimally viable products (MVPs).
  • Cost-efficient at scale.
  • Low latency: near-real-time processing.

Our data lake needed to be able to:

  • Capture and store new data (relational and non-relational) at petabyte scale in real time.
  • Provide analytics that go beyond batch reporting and incorporate real time and predictive capabilities.
  • Democratize access to data in a secure and governed way, allowing our team to unleash their creative energy and deliver innovative solutions.

The following diagram shows the high-level architecture.

Walkthrough

We built the serverless data lake with Amazon S3 as the primary data store, given the scalability and high availability of S3. The entire process is automated, which reduces the likelihood of human error, increases efficiency, and ensures consistent configurations over time, as well as reduces the cost of operations.

The key components of a data lake include Ingest, Decode, Transform, Analyze, and Consume:

  • Ingest – Connected vehicles send telemetry data once a minute, which includes speed, acceleration, turns, geo location, fuel level, and diagnostic error codes. This data is ingested into Amazon Kinesis Data Streams, processed through AWS Lambda to make it readable, and the "raw copy" is saved through Amazon Kinesis Data Firehose into an S3 bucket (a minimal sketch of this step follows the list).
  • Decode – Data arriving into the Kinesis data stream in the Decode pillar is decoded by a serverless Lambda function, which does most of the heavy lifting. Based upon a proprietary specification, this Lambda function does the bit-by-bit decoding of the input message to capture the particular sensor values. The small input payload of 35 KB with data from over 180 sensors is decoded and converted to a JSON message of 3 MB. This is then compressed and written to the Decoded S3 bucket.
  • Transform – The aggregation jobs leverage the massively parallel capability of Amazon EMR, decrypt the decoded messages, and convert the data to Apache Parquet. Apache Parquet is a columnar storage file format designed for querying large amounts of data, regardless of the data processing framework or programming language. Parquet allows for better compression, which reduces the amount of storage required. It also reduces I/O, since we can efficiently scan the data. The data sets are now available for analytics purposes, partitioned by masked identification numbers as well as by automotive models and dispatch type. A separate set of jobs transforms the data and stores it in Amazon DynamoDB to be consumed in real time from APIs.
  • Consume – Applications needing to consume the data make API calls through Amazon API Gateway. Authentication for the API calls is based on temporary tokens issued by Amazon Cognito.
  • Analyze – Data analytics can be performed directly on Amazon S3 by leveraging serverless Amazon Athena. Data access is democratized and made available to data science groups, who build and test various models that provide value to our customers.
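The following is a minimal sketch of the Ingest step described above. It assumes a Lambda function subscribed to the telemetry Kinesis data stream that forwards a compressed raw copy to Kinesis Data Firehose; the delivery stream name is a placeholder, and the real decoding logic is proprietary and not shown.

    import base64
    import gzip

    import boto3

    firehose = boto3.client("firehose")

    def handler(event, context):
        # For each Kinesis record, keep a compressed "raw copy" and forward it to
        # Kinesis Data Firehose, which batches the records into the raw S3 bucket.
        records = []
        for record in event["Records"]:
            payload = base64.b64decode(record["kinesis"]["data"])
            records.append({"Data": gzip.compress(payload)})
        if records:
            firehose.put_record_batch(
                DeliveryStreamName="raw-telemetry-delivery",  # placeholder name
                Records=records,
            )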

Additionally, comprehensive monitoring is set up by leveraging Amazon CloudWatch, Amazon ES, and AWS KMS for managing the keys securely.

Scalability

The scalability capabilities of the building blocks in our architecture that allow us to reach this massive scale are:

  • S3: S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. S3 partitions the index based on key name. To maximize performance of high-concurrency operations on S3, we introduced randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions (see the sketch after this list).
  • Lambda: We can run as many concurrent functions as needed and can raise limits as required with AWS support.
  • Kinesis Firehose: It scales elastically based on volume without requiring any human intervention. We batch requests up to 128 MiB or 15 minutes, whichever comes first, to avoid small files. Additional details are available in Srikanth Kodali's blog post.
  • Kinesis Data Streams: We developed an automated program that adjusts the shards based on incoming volume. This is based on the Kinesis Scaling Utility from AWS Labs, which allows us to scale in a way similar to EC2 Auto Scaling groups.
  • API Gateway: automatically scales to billions of requests and seamlessly handles our API traffic.
  • EMR cluster: We can programmatically scale out to hundreds of nodes based on our volume and scale in after processing is completed.
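As a sketch of the key randomization mentioned in the S3 bullet above, a short hash prefix can be added to each Parquet object key. The helper and prefix layout are illustrative assumptions, not the exact scheme used in the platform.

    import hashlib

    def randomized_key(base_prefix: str, file_name: str) -> str:
        # Prepend a short hash so object keys spread across many S3 index partitions.
        key_hash = hashlib.md5(file_name.encode("utf-8")).hexdigest()[:4]
        return f"{base_prefix}/{key_hash}/{file_name}"

    # Produces something like decoded/parquet/<hash>/vehicle-batch-0001.parquet
    print(randomized_key("decoded/parquet", "vehicle-batch-0001.parquet"))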

Our volumes have increased seven-fold since we migrated to AWS and we have only adjusted the number of shards in Kinesis Data Streams and the number of core nodes for EMR processing to scale with the volume.

Security in the AWS cloud

AWS provides a robust suite of security services, allowing us to have a higher level of security in the AWS cloud. Consistent with our security guidelines, data is encrypted both in transit and at rest. Additionally, we use VPC Endpoints, allowing us to keep traffic within the AWS network.

Data protection in transit:

Data protection at rest:

  • S3 server-side encryption handles all encryption, decryption, and key management transparently. All user data stored in DynamoDB is fully encrypted at rest, for which we use an AWS-owned customer master key at no additional charge. Server-side encryption for Kinesis Data Streams and Kinesis Data Firehose is also enabled to ensure that data is encrypted at rest.

Cost optimization

Given our very large data volumes, we were methodical about optimizing costs across all components of the infrastructure. The ultimate goal was to figure out the cost of the APIs we were exposing. We developed a robust cost model validated with performance testing at production volumes:

  • NAT gateway: When we started this project, one of the significant cost drivers was traffic flowing from Lambda to Kinesis Data Firehose that went over the NAT gateway, since Kinesis Data Firehose did not have a VPC endpoint. Traffic flowing through the NAT gateway costs $0.045/GB, whereas traffic flowing through the VPC endpoint costs $0.01/GB. Based on a product feature request from Toyota, AWS implemented this feature (VPC Endpoint for Firehose) early this year. We implemented this feature, which resulted in a four-and-a-half-fold reduction in our costs for data transfer.
  • Kinesis Data Firehose: Since Kinesis Data Firehose did not support encryption of data at rest initially, we had to use client-side encryption using KMS; this was the second significant cost driver. We requested a feature for native server-side encryption in Kinesis Data Firehose. This was released earlier this year, and we enabled server-side encryption on the Kinesis Data Firehose stream. This removed the Key Management Service (KMS) costs associated with client-side encryption, resulting in another 10% reduction in our total costs.

Since Kinesis Data Firehose charges based on the amount of data ingested ($0.029/GB), our Lambda function compresses the data before writing to Kinesis Data Firehose, which saves on the ingestion cost.

  • S3 – We use lifecycle policies to move data from S3 (which costs $0.023/GB) to Amazon S3 Glacier (which costs $0.004/GB) after a specified duration. Glacier provides a six-fold cost reduction over S3. We further plan to move the data from Glacier to Amazon S3 Glacier Deep Archive (which costs $0.00099/GB), which will provide us a four-fold reduction over Glacier costs. Additionally, we have set up automated deletes of certain data sets at periodic intervals.
  • EMR – We were planning to use AWS Glue and keep the architecture serverless, but made the decision to leverage EMR from a cost perspective. We leveraged spot instances for transformation jobs in EMR, which can provide up to 60% savings. The hourly jobs complete successfully with spot instances; however, the nightly aggregation jobs leveraging r5.4xlarge instances failed frequently because sufficient spot capacity was not available. We decided to move to on-demand instances while we finalize our strategy for reserved instances to reduce costs.
  • DynamoDB: Time to Live (TTL) for DynamoDB lets us define when items in a table expire so that they can be automatically deleted from the database. We enabled TTL to expire objects that are not needed after a certain duration. We plan to use reserved capacity for read and write capacity units to reduce costs. We also use DynamoDB auto scaling, which helps us manage capacity efficiently and lower the cost of our workloads because they have a predictable traffic pattern. In Q2 of 2019, DynamoDB removed the associated costs of DynamoDB Streams used in replicating data globally, which translated to extra cost savings in global tables.
  • Amazon DynamoDB Accelerator (DAX): Our DynamoDB tables are front-ended by DAX, which improves the response time of our application by dramatically reducing read latency, as compared to using DynamoDB. Using DAX, we also lower the cost of DynamoDB by reducing the amount of provisioned read throughput needed for read-heavy applications.
  • Lambda: We ran benchmarks to arrive at the optimal memory configuration for Lambda functions. Memory allocation in Lambda determines CPU allocation, and for some of our Lambda functions we allocated higher memory, which results in faster execution and fewer GB-seconds per function execution, saving both time and cost. Combining Lambda with DAX provides an additional benefit: lower read latency results in shorter execution times, which means lower costs for Lambda.
  • Kinesis Data Streams: We scale our streams through an automated job, since our traffic patterns are fairly predictable. During peak hours we add additional shards and delete them during the off-peak hours, which allows us to reduce costs when shards are not in use (see the sketch after this list).
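A minimal sketch of that scheduled resharding follows; the stream name and shard count are illustrative assumptions.

    import boto3

    kinesis = boto3.client("kinesis")

    def scale_stream(stream_name: str, target_shards: int) -> None:
        # Called by a scheduled job before peak hours (scale out) and again once
        # off-peak hours begin (scale in).
        kinesis.update_shard_count(
            StreamName=stream_name,
            TargetShardCount=target_shards,
            ScalingType="UNIFORM_SCALING",
        )

    scale_stream("vehicle-telemetry", 64)  # example peak-hours target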

Enhancing customer safety

The Data Lake presents multiple opportunities to enhance customer safety. Early detection of market defects and pinpointing of target vehicles affected by those defects is made possible through the telemetry data ingested from the vehicles. This early detection leads to early resolution well before the customer is affected. On-board software in the automobiles can be constantly updated over-the-air (OTA), thereby saving time and costs. The automobile can generate a Health Check Report based on the driving style of its drivers, which can create the ideal maintenance plan for drivers for worry-free driving.

The driving data for an individual driver based on speed, sharp turns, rapid acceleration, and sudden braking can be converted into a “driver score” which ranges from 1 to 100 in value. The higher the driver-score, the safer the driver. Drivers can view their scores on mobile devices and monitor the specific locations of harsh driving on the journey map. They can then use this input to self-correct and modify their driving habits to improve their scores, which will not only result in a safer environment but drivers could also get lower insurance rates from insurance companies. This also gives parents an opportunity to monitor the scores for their teenage drivers and coach them appropriately on safe driving habits. Additionally, notifications can be generated if the teenage driver exceeds an agreed-upon speed or leaves a specific area.

Summary

The automated serverless data lake is a robust, scalable platform that allows us to analyze data as it becomes available in real time. From an operations perspective, our costs are down significantly. Several aggregation jobs that took 15+ hours to run now finish in 1/40th of the time. We are impressed with the reliability of the platform that we built. The architectural decision to go serverless has reduced operational burden and will also allow us to have a good handle on our costs going forward. Additionally, we can deploy this pipeline in other geographies with smaller volumes and only pay for what we consume.

Our team accomplished this ambitious development in a short span of six months. They worked in an agile, iterative fashion and continued to deliver robust MVPs to our business partners. Working with the service teams at AWS on product feature requests and seeing them come to fruition in a very short time frame has been a rewarding experience and we look forward to the continued partnership on additional requests.

 


About the Authors


Sandeep Kulkarni is an enterprise architect at AWS. His passion is to accelerate digital transformation for customers and build highly scalable and cost-effective solutions in the cloud. In his spare time, he loves to do yoga and gardening.

Shravanthi Denthumdas is the director of mobility services at Toyota Connected. Her team is responsible for building the Data Lake and delivering services that allow drivers to safely enjoy their cars. In her spare time, she likes to spend time with her family and children.

Optimize Python ETL by extending Pandas with AWS Data Wrangler

Post Syndicated from Satoshi Kuramitsu original https://aws.amazon.com/blogs/big-data/optimize-python-etl-by-extending-pandas-with-aws-data-wrangler/

Developing extract, transform, and load (ETL) data pipelines is one of the most time-consuming steps to keep data lakes, data warehouses, and databases up to date and ready to provide business insights. You can categorize these pipelines into distributed and non-distributed, and the choice of one or the other depends on the amount of data you need to process.

Apache Spark is widely used to build distributed pipelines, whereas Pandas is preferred for lightweight, non-distributed pipelines. With the second use case in mind, the AWS Professional Service team created AWS Data Wrangler, aiming to fill the integration gap between Pandas and several AWS services, such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, AWS Glue, Amazon Athena, Amazon Aurora, Amazon QuickSight, and Amazon CloudWatch Log Insights.

AWS Data Wrangler is an open-source Python library that enables you to focus on the transformation step of ETL by using familiar Pandas transformation commands and relying on abstracted functions to handle the extraction and load steps.

You can use AWS Data Wrangler in different environments on AWS and on premises (for more information, see Install). This post focuses on data preparation for a data science project on Jupyter. By the end of this walkthrough, you will be able to set up AWS Data Wrangler on your Amazon SageMaker notebook.

Use case overview

In the following walkthrough, you use data stored in the NOAA public S3 bucket. For more information, see NOAA Global Historical Climatology Network Daily. The objective is to convert 10 CSV files (approximately 240 MB total) to a partitioned Parquet dataset, store its related metadata into the AWS Glue Data Catalog, and query the data using Athena to create a data analysis.

Configuring Amazon S3

Your first step is to create an S3 bucket to store the Parquet dataset.

  1. On the Amazon S3 console, choose Create bucket.
  2. For Bucket name, enter a name for your bucket.

  1. Choose Create.

Creating a new database in the Data Catalog

The Data Catalog is an Apache Hive-compatible managed metadata storage that lets you store, annotate, and share metadata on AWS.

For this use case, you use it to store the metadata associated with your Parquet dataset. The Data Catalog is integrated with many analytics services, including Athena, Amazon Redshift Spectrum, and Amazon EMR (Apache Spark, Apache Hive, and Presto). You can create the database with the following console steps or with the scripted sketch after them.

  1. On the AWS Glue console, choose Databases.
  2. Choose Add database.
  3. For Database name, enter awswrangler_test.
  4. Choose Create.
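If you prefer to script this step, AWS Data Wrangler can create the database as well. The following is a minimal sketch, assuming your notebook role has AWS Glue permissions:

    import awswrangler as wr

    # Create the Data Catalog database used in this walkthrough,
    # skipping creation if it already exists.
    if "awswrangler_test" not in wr.catalog.databases().values:
        wr.catalog.create_database("awswrangler_test")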

Launching an Amazon SageMaker notebook

An Amazon SageMaker notebook is a managed instance running the Jupyter Notebook app. For this use case, you use it to write and run your code.

  1. On the Amazon SageMaker console, choose Notebook instance.
  2. Choose Create a notebook instance.
  3. For Notebook instance name, enter a name.
  4. For IAM role, choose an existing AWS Identity and Access Management (IAM) role or create a role that allows you to run Amazon SageMaker and grants access to Amazon S3, Athena, and AWS Glue for the related resources.

  1. Wait for the notebook status to show as InService.
  2. Choose Open Jupyter from the notebook instance you created.

Exploring the data

This section walks you through several notebook paragraphs to show how to install and use AWS Data Wrangler.

  1. On the Jupyter console, under New, choose conda_python3.
  2. To install AWS Data Wrangler, enter the following code:
    !pip install awswrangler

  3. To avoid dependency conflicts, restart the notebook kernel by choosing kernel -> Restart.
  4. Import the library given the usual alias wr:
    import awswrangler as wr

  5. List all files in the NOAA public bucket from the decade of 1880:
    wr.s3.list_objects("s3://noaa-ghcn-pds/csv/188")

The following screenshot shows the output.

  1. Load the whole decade (10 files) into a Pandas DataFrame using the Amazon S3 prefix s3://noaa-ghcn-pds/csv/188:
    col_names = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
    
    df = wr.s3.read_csv(
        path="s3://noaa-ghcn-pds/csv/188",
        names=col_names,
        parse_dates=["dt", "obs_time"]  # Hint to parse these columns as date instead of strings
    )

    The following screenshot shows the output.

  1. Create a new column extracting the year from the dt column (the new column is useful for creating partitions in the Parquet dataset):
    df["year"] = df["dt"].dt.year

The following screenshot shows the output.

  1. Store the Pandas DataFrame in the S3 bucket you created in the beginning of this post (replace the [BUCKET] placeholder in the code with your bucket name):
    wr.s3.to_parquet(
        df=df,
        path="s3://[BUCKET]/noaa/",
        dataset=True,
        database="awswrangler_test",
        table="noaa",
        partition_cols=["year"]
    );

The preceding code creates the table noaa in the awswrangler_test database in the Data Catalog.

  1. After processing completes, you can confirm that the Parquet files exist in Amazon S3 and that the table noaa is in the AWS Glue Data Catalog. See the following code:
    wr.s3.list_objects("s3://[BUCKET]/noaa/")
    wr.catalog.table(database="awswrangler_test", table="noaa")

The following screenshot shows the output.

  1. Run a SQL query from Athena that filters only the US maximum temperature measurements of the last 3 years (1887–1889) and receive the result as a Pandas DataFrame:
    sql = """
    SELECT
        dt,
        (value / 10.0) AS temperature  -- Converting tenths of degrees C to regular degrees C
    FROM noaa
    WHERE year BETWEEN 1887 AND 1889  -- Only last 3 years (PARTITION filter)
    AND substr(id, 1, 2)='US'  -- Only U.S. stations
    AND element='TMAX'  -- Only Maximum temperature elements
    AND q_flag is NULL  -- Only HIGH quality measurement
    """
    
    df = wr.athena.read_sql_query(sql, database="awswrangler_test")

The following screenshot shows the output.

The following two queries illustrate how you can visualize the data.

  1. To plot the average maximum temperature measured in the tracked station, enter the following code:
    %matplotlib inline
    df.groupby("dt").mean().plot();

The following screenshot shows the output.

  1. To plot a moving average of the previous metric with a 30-day window, enter the following code:
    %matplotlib inline
    df.groupby("dt").mean().rolling(window=30, center=True).mean().plot();

The following screenshot shows the output.

Cleaning up

To avoid incurring future charges, delete the resources from the following services:

  1. AWS Glue database
    • On the AWS Glue console, choose the database you created.
    • From the Actions drop-down menu, choose Delete database.
    • Choose Delete.
  2. Amazon SageMaker notebook
    • On the Amazon SageMaker console, choose the notebook instance you created.
    • From the Actions drop-down menu, choose Stop.
    • When the status shows as Stopped, from the Actions drop-down menu, choose Delete.
    • Choose Delete.
  3. S3 bucket
    • On the Amazon S3 console, choose Buckets.
    • Choose the bucket you created.
    • Choose Empty and enter your bucket name.
    • Choose Confirm.
    • Choose Delete and enter your bucket name.
    • Choose Delete bucket.
  4. IAM Role
    • On the IAM console, choose Roles.
    • Choose the role you attached to Amazon SageMaker.
    • Choose Delete role.
    • Choose Yes.

Conclusion

Installing AWS Data Wrangler is a breeze. With a single command, you can connect ETL tasks to multiple data sources and different data services. The library is a work in progress, with new features and enhancements added regularly. For more tutorials, see the GitHub repo.

 


About the Authors

Satoshi Kuramitsu is a Solutions Architect in AWS. His favorite AWS services are AWS Glue, Amazon Kinesis, and Amazon S3.

Igor Tavares is a Data & Machine Learning Engineer in the AWS Professional Services team and the original creator of AWS Data Wrangler.

Automate Amazon Athena queries for PCI DSS log review using AWS Lambda

Post Syndicated from Logan Culotta original https://aws.amazon.com/blogs/security/automate-amazon-athena-queries-for-pci-dss-log-review-using-aws-lambda/

In this post, I will show you how to use AWS Lambda to automate PCI DSS (v3.2.1) evidence generation and daily log review to assist with your ongoing PCI DSS activities. We will specifically be looking at AWS CloudTrail logs stored centrally in Amazon Simple Storage Service (Amazon S3) (which is also a Well-Architected Security Pillar best practice) and use Amazon Athena to query them.

This post assumes familiarity with creating a database in Athena. If you're new to Athena, please take a look at the Athena getting started guide and create a database before continuing. Take note of the bucket chosen for the output of Athena query results; we will use it later in this post.

In this post, we walk through:

  • Creating a partitioned table for your AWS CloudTrail logs. In order to reduce costs and time to query results in Athena, we’ll show you how to partition your data. If you’re not already familiar with partitioning, you can learn about it in the Athena user guide.
  • Constructing SQL queries to search for PCI DSS audit log evidence. The SQL queries that are provided in this post are directly related to PCI DSS requirement 10. Customizing these queries to meet your responsibilities may assist you in preparing for a PCI DSS assessment.
  • Creating an AWS Lambda function to automate running these SQL queries daily, in order to help address the PCI DSS daily log review requirement 10.6.1.

Create and partition a table

The following code will create and partition a table for CloudTrail logs. Before you execute this query, be sure to replace the variable placeholders with the information from your database. They are:

  • <YOUR_TABLE> – the name of your Athena table
  • LOCATION – the path to your CloudTrail logs in Amazon S3. An example is included in the following code. It includes the variable placeholders:
    • <AWS_ACCOUNT_NUMBER> – your AWS account number. If using organizational CloudTrail, use the following format throughout the post for this variable: o-<orgID>/<ACCOUNT_NUMBER>
    • <LOG_BUCKET> – the bucket name where the CloudTrail logs to be queried reside

CREATE EXTERNAL TABLE <YOUR_TABLE> (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING
)
COMMENT 'CloudTrail table'
PARTITIONED BY(region string, year string, month string, day string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');

Execute the query. You should see a message stating Query successful.

Figure 1: Query successful

The preceding query creates a CloudTrail table and defines the partitions in your Athena database. Before you begin running queries to generate evidence, you will need to run alter table commands to finalize the partitioning.

Be sure to update the following variable placeholders with your information:

  • <YOUR_DATABASE> – the name of your Athena database
  • <YOUR_TABLE>
  • <LOG_BUCKET>
  • <AWS_ACCOUNT_NUMBER>

Provide values for the following variables:

  • region – region of the logs to partition
  • month – month of the logs to partition
  • day – day of the logs to partition
  • year – year of the logs to partition
  • LOCATION – the path to your CloudTrail logs in Amazon S3 to partition, down to the specific day (should match the preceding values of region, month, day, and year). It includes the variable placeholders:
    • <AWS_ACCOUNT_NUMBER>
    • <LOG_BUCKET>

ALTER TABLE <YOUR_DATABASE>.<YOUR_TABLE>  ADD partition  (region='us-east-1', month='02', day='28', year='2020') location 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/us-east-1/2020/02/28/';

After the partition has been configured, you can query logs from the date and region that was partitioned. Here’s an example for PCI DSS requirement 10.2.4 (all relevant PCI DSS requirements are described later in this post).


SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%' AND region= 'us-east-1' AND year='2020' AND month='02' AND day='28';

Create a Lambda function to save time

As you can see, the preceding process can involve a lot of manual steps as you set up partitioning for each region and then query for each day or region. Let's simplify the process by putting these steps into a Lambda function.

Use the Lambda console to create a function

To create the Lambda function:

  1. Open the Lambda console and choose Create function, and select the option to Author from scratch.
  2. Enter Athena_log_query as the function name, and select Python 3.8 as the runtime.
  3. Under Choose or create an execution role, select Create new role with basic Lambda permissions.
  4. Choose Create function.
  5. Once the function is created, select the Permissions tab at the top of the page and select the Execution role to view in the IAM console. It will look similar to the following figure.
     
    Figure 2: Permissions tab

Update the IAM Role to allow Lambda permissions to relevant services

  1. In the IAM console, select the policy name. Choose Edit policy, then select the JSON tab and paste the following code into the window, replacing the following variable and placeholders:
    • us-east-1 – This is the region where resources are. Change only if necessary.
    • <AWS_ACCOUNT_NUMBER>
    • <YOUR_DATABASE>
    • <YOUR_TABLE>
    • <LOG_BUCKET>
    • <OUTPUT_LOG_BUCKET> – bucket name you chose to store the query results when setting up Athena.
    
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateDatabase",
                    "glue:BatchCreatePartition",
                    "glue:GetDatabase",
                    "athena:StartQueryExecution",
                    "glue:GetPartitions",
                    "glue:UpdateTable",
                    "s3:CreateBucket",
                    "s3:ListBucket",
                    "glue:GetTable",
                    "s3:ListMultipartUploadParts",
                    "s3:PutObject",
                    "s3:GetObjectAcl",
                    "s3:GetObject",
                    "athena:CancelQueryExecution",
                    "athena:StopQueryExecution",
                    "athena:GetQueryExecution",
                    "s3:GetBucketLocation",
                    "glue:UpdatePartition"
                ],
                "Resource": [
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:catalog",
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:table/<YOUR_DATABASE>/<YOUR_TABLE>",
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:database/mydatabase",
                    "arn:aws:s3:::<LOG_BUCKET>/*",
                    "arn:aws:s3:::<LOG_BUCKET>",
    				"arn:aws:s3:::<OUTPUT_LOG_BUCKET>/*",
    				"arn:aws:s3:::<OUTPUT_LOG_BUCKET>",
                    "arn:aws:athena:us-east-1:<AWS_ACCOUNT_NUMBER>:workgroup/primary"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:PutLogEvents",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream"
                ],
                "Resource": "arn:aws:logs:us-east-1:<AWS_ACCOUNT_NUMBER>:*"
            }
        ]
    }
    

    Note: Depending on the environment, this policy might not be restrictive enough and should be limited to only users needing access to the cardholder data environment and audit logs. More information about restricting IAM policies can be found in IAM JSON Policy Elements: Condition Operators.

  2. Choose Review policy and then Save changes.

Customize the Lambda Function

  1. On the Lambda dashboard, choose the Configuration tab. In Basic settings, increase the function timeout to 5 minutes to ensure that the function always has time to finish running your queries, and then select Save. Best Practices for Developing on AWS Lambda has more tips for using Lambda.
  2. Paste the following code into the function editor on the Configuration tab, replacing the existing text. The code includes eight example queries to run and can be customized as needed.

    The first query adds partitions for your Amazon S3 logs to the Athena table so that the following seven queries run quickly and cost-effectively.

    This code combines the partitioning step with example Athena queries to assist in meeting PCI DSS logging requirements, which are explained in more detail below:

    Replace these values in the code that follows:

    • <YOUR_DATABASE>
    • <YOUR_TABLE>
    • <LOG_BUCKET>
    • <AWS_ACCOUNT_NUMBER>
    • <OUTPUT_LOG_BUCKET>
    • REGION1 – first region to partition
    • REGION2 – second region to partition
    
    import boto3
    import datetime
    import time
    
    #EDIT THE FOLLOWING#
    #----------------------#
    
    #This should be the name of your Athena database
    ATHENA_DATABASE = "<YOUR_DATABASE>"
    
    #This should be the name of your Athena database table
    ATHENA_TABLE = "<YOUR_TABLE>"
    
    #This is the Amazon S3 bucket name you want partitioned and logs queried from:
    LOG_BUCKET = "<LOG_BUCKET>"
    
    #AWS Account number for the Amazon S3 path to your CloudTrail logs
    AWS_ACCOUNT_ID = "<AWS_ACCOUNT_NUMBER>"
    
    #This is the Amazon S3 bucket name for the Athena Query results:
    OUTPUT_LOG_BUCKET = "<OUTPUT_LOG_BUCKET>"
    
    #Define regions to partition
    REGION1 = "us-east-1"
    REGION2 = "us-west-2"
    #----------------------#
    #STOP EDITING#
    
    RETRY_COUNT = 50
    
    #Getting the current date and splitting into variables to use in queries below
    CURRENT_DATE = datetime.datetime.today()
    DATEFORMATTED = (CURRENT_DATE.isoformat())
    ATHENA_YEAR = str(DATEFORMATTED[:4])
    ATHENA_MONTH = str(DATEFORMATTED[5:7])
    ATHENA_DAY = str(DATEFORMATTED[8:10])
    
    #location for the Athena query results
    OUTPUT_LOCATION = "s3://"+OUTPUT_LOG_BUCKET+"/DailyAthenaLogs/CloudTrail/"+str(CURRENT_DATE.isoformat())
    
    #Athena Query definitions for PCI DSS requirements
    YEAR_MONTH_DAY = f'year=\'{ATHENA_YEAR}\' AND month=\'{ATHENA_MONTH}\' AND day=\'{ATHENA_DAY}\';'
    ATHENA_DB_TABLE = f'{ATHENA_DATABASE}.{ATHENA_TABLE}'
    PARTITION_STATEMENT_1 = f'partition (region="{REGION1}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
    LOCATION_1 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION1}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
    PARTITION_STATEMENT_2 = f'partition (region="{REGION2}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
    LOCATION_2 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION2}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
    SELECT_STATEMENT = "SELECT * FROM "+ATHENA_DB_TABLE+ " WHERE "
    LIKE_BUCKET = f' \'%{LOG_BUCKET}%\''
    
    
    #Query to partition selected regions
    QUERY_1 = f'ALTER TABLE {ATHENA_DB_TABLE} ADD IF NOT EXISTS {PARTITION_STATEMENT_1} {LOCATION_1} {PARTITION_STATEMENT_2} {LOCATION_2}'
    
    #Access to audit trails or CHD 10.2.1/10.2.3
    QUERY_2 = f'{SELECT_STATEMENT} requestparameters LIKE {LIKE_BUCKET} AND sourceipaddress <> \'cloudtrail.amazonaws.com\' AND sourceipaddress <> \'athena.amazonaws.com\' AND eventName = \'GetObject\' AND {YEAR_MONTH_DAY}'
    
    #Root Actions PCI DSS 10.2.2
    QUERY_3 = f'{SELECT_STATEMENT} userIdentity.sessionContext.sessionIssuer.userName LIKE \'%root%\' AND {YEAR_MONTH_DAY}'
    
    #Failed Logons PCI DSS 10.2.4
    QUERY_4 = f'{SELECT_STATEMENT} eventname = \'ConsoleLogin\' AND responseelements LIKE \'%Failure%\' AND {YEAR_MONTH_DAY}'
    
    #Privilege changes PCI DSS 10.2.5.b, 10.2.5.c
    QUERY_5 = f'{SELECT_STATEMENT} eventname LIKE \'%AddUserToGroup%\' AND requestparameters LIKE \'%Admin%\' AND {YEAR_MONTH_DAY}'
    
    # Initialization, stopping, or pausing of the audit logs PCI DSS 10.2.6
    QUERY_6 = f'{SELECT_STATEMENT} eventname = \'StopLogging\' OR eventname = \'StartLogging\' AND {YEAR_MONTH_DAY}'
    
    #Suspicious activity PCI DSS 10.6
    QUERY_7 = f'{SELECT_STATEMENT} eventname LIKE \'%DeleteSecurityGroup%\' OR eventname LIKE \'%CreateSecurityGroup%\' OR eventname LIKE \'%UpdateSecurityGroup%\' OR eventname LIKE \'%AuthorizeSecurityGroup%\' AND {YEAR_MONTH_DAY}'
    
    QUERY_8 = f'{SELECT_STATEMENT} eventname LIKE \'%Subnet%\' and eventname NOT LIKE \'Describe%\' AND {YEAR_MONTH_DAY}'
    
    #Defining function to generate query status for each query
    def query_stat_fun(query, response):
        client = boto3.client('athena')
        query_execution_id = response['QueryExecutionId']
        print(query_execution_id +' : '+query)
        for i in range(1, 1 + RETRY_COUNT):
            query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
            query_fail_status = query_status['QueryExecution']['Status']
            query_execution_status = query_fail_status['State']
    
            if query_execution_status == 'SUCCEEDED':
                print("STATUS:" + query_execution_status)
                break
    
            if query_execution_status == 'FAILED':
                print(query_fail_status)
    
            else:
                print("STATUS:" + query_execution_status)
                time.sleep(i)
        else:
            client.stop_query_execution(QueryExecutionId=query_execution_id)
            raise Exception('Maximum Retries Exceeded')
    
    def lambda_handler(query, context):
        client = boto3.client('athena')
        queries = [QUERY_1, QUERY_2, QUERY_3, QUERY_4, QUERY_5, QUERY_6, QUERY_7, QUERY_8]
        for query in queries:
            response = client.start_query_execution(
                QueryString=query,
                QueryExecutionContext={
                    'Database': ATHENA_DATABASE },
                ResultConfiguration={
                    'OutputLocation': OUTPUT_LOCATION })
            query_stat_fun(query, response)
    

    Note: More regions can be added if you have additional regions to partition. The ADD partition statement can be copied and pasted to add additional regions as needed. Additionally, you can hard code the regions for your environment into the statements.

  3. Choose Save in the top right.

Athena Queries used to collect evidence

The queries used to gather evidence for PCI DSS are broken down from the Lambda function we created, using the partitioned date example from above. They are listed with their respective requirement.

Note: AWS owns the security OF the cloud, providing high levels of security in alignment with our numerous compliance programs. The customer is responsible for the security of their resources IN the cloud, keeping its content secure and compliant. The queries below are meant to be a proof of concept and should be tailored to your environment.

10.2.1/10.2.3 – Implement automated audit trails for all system components to reconstruct access to either or both cardholder data and audit trails:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE requestparameters LIKE '%<LOG_BUCKET>%' AND sourceipaddress <> 'cloudtrail.amazonaws.com' AND sourceipaddress <>  'athena.amazonaws.com' AND eventName = 'GetObject' AND year='2020' AND month='02' AND day='28';"

10.2.2 – Implement automated audit trails for all system components to reconstruct all actions taken by anyone using root or administrative privileges.


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE userIdentity.sessionContext.sessionIssuer.userName LIKE '%root%' AND year='2020' AND month='02' AND day='28';"

10.2.4 – Implement automated audit trails for all system components to reconstruct invalid logical access attempts.


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%' AND year='2020' AND month='02' AND day='28';"

10.2.5.b – Verify all elevation of privileges is logged.

10.2.5.c – Verify all changes, additions, or deletions to any account with root or administrative privileges are logged:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%AddUserToGroup%' AND requestparameters LIKE '%Admin%' AND year='2020' AND month='02' AND day='28';"

10.2.6 – Implement automated audit trails for all system components to reconstruct the initialization, stopping, or pausing of the audit logs:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'StopLogging' OR eventname = 'StartLogging' AND year='2020' AND month='02' AND day='28';"

10.6 – Review logs and security events for all system components to identify anomalies or suspicious activity:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%DeleteSecurityGroup%' OR eventname LIKE '%CreateSecurityGroup%' OR eventname LIKE '%UpdateSecurityGroup%' OR eventname LIKE '%AuthorizeSecurityGroup%' AND year='2020' AND month='02' AND day='28';" 

"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%Subnet%' and eventname NOT LIKE 'Describe%' AND year='2020' AND month='02' AND day='28';" 

You can use the AWS Command Line Interface (AWS CLI) to invoke the Lambda function using the following command, replacing <YOUR_FUNCTION> with the name of the Lambda function you created:


aws lambda invoke --function-name <YOUR_FUNCTION> outfile

The AWS Lambda API Reference has more information on using Lambda with AWS CLI.
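If you prefer Python over the AWS CLI, the same invocation can be made with boto3; the function name below assumes the Athena_log_query name used earlier.

    import boto3

    lambda_client = boto3.client("lambda")

    # Asynchronously invoke the daily log review function.
    response = lambda_client.invoke(
        FunctionName="Athena_log_query",
        InvocationType="Event",
    )
    print(response["StatusCode"])  # 202 indicates the event was accepted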

Note: The results from the function will be written to the Amazon S3 location defined by the OUTPUT_LOCATION variable within the Lambda function.

Use Amazon CloudWatch to run your Lambda function

You can create a rule in CloudWatch to have this function run automatically on a set schedule. You can follow the console steps below or use the scripted sketch after them.

Create a CloudWatch rule

  1. From the CloudWatch dashboard, under Events, select Rules, then Create rule.
  2. Under Event Source, select the radio button for Schedule and choose a fixed rate or enter in a custom cron expression.
  3. Finally, in the Targets section, choose Lambda function and find your Lambda function from the drop down.

    The example screenshot shows a CloudWatch rule configured to invoke the Lambda function daily:
     

    Figure 3: CloudWatch rule

  4. Once the schedule is configured, choose Configure details to move to the next screen.
  5. Enter a name for your rule, make sure that State is enabled, and choose Create rule.
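If you'd rather script the schedule, the following is a minimal boto3 sketch using the CloudWatch Events API. The rule name, schedule, and function ARN are placeholders, and the add_permission call is needed so the rule is allowed to invoke your function.

    import boto3

    events = boto3.client("events")
    lambda_client = boto3.client("lambda")

    # Placeholder ARN for the Athena_log_query function.
    function_arn = "arn:aws:lambda:us-east-1:123456789012:function:Athena_log_query"

    # Rule that fires once a day.
    rule_arn = events.put_rule(
        Name="daily-athena-log-query",
        ScheduleExpression="rate(1 day)",
        State="ENABLED",
    )["RuleArn"]

    # Allow CloudWatch Events to invoke the function, then attach it as the target.
    lambda_client.add_permission(
        FunctionName="Athena_log_query",
        StatementId="daily-athena-log-query-invoke",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )
    events.put_targets(
        Rule="daily-athena-log-query",
        Targets=[{"Id": "athena-log-query-lambda", "Arn": function_arn}],
    )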

Check that your function is running

You can then navigate to your Lambda function’s CloudWatch log group to see if the function is running as intended.

To locate the appropriate CloudWatch group, from your Lambda function within the console, select the Monitoring tab, then choose View logs in CloudWatch.
 

Figure 4: View logs in CloudWatch

You can take this a step further and set up an SNS notification to email you when the function is triggered.

Summary

In this post, we walked through partitioning an Athena table, which assists in reducing time and cost when running queries on your S3 buckets. We then constructed example SQL queries related to PCI DSS requirement 10, to assist in audit preparation. Finally, we created a Lambda function to automate running daily queries to pull PCI DSS audit log evidence from Amazon S3, to assist with the PCI DSS daily log review requirement. I encourage you to customize, add, or remove the SQL queries to best fit your needs and compliance requirements.

If you have feedback about this post, submit comments in the Comments section below.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Logan Culotta

Logan Culotta is a Security Assurance Consultant, and a current Qualified Security Assessor (QSA). Logan is part of the AWS Security Assurance team, which is also a Qualified Security Assessor Company (QSAC). He enjoys finding ways to automate compliance and security in the AWS cloud. In his free time, you can find him spending time with his family, road cycling, or cooking.