Tag Archives: Amazon Athena

Effective data lakes using AWS Lake Formation, Part 1: Getting started with governed tables

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-1-effective-data-lakes-using-aws-lake-formation-part-1-getting-started-with-governed-tables/

Thousands of customers are building their data lakes on Amazon Simple Storage Service (Amazon S3). You can use AWS Lake Formation to build your data lakes easily—in a matter of days as opposed to months. However, there are still some difficult challenges to address with your data lakes:

  • Supporting streaming updates and deletes in your data lakes, for example, database replication, and supporting privacy regulations such as GDPR and CCPA
  • Achieving fine-grained secure sharing not only with table- or column-level access control, but with row-level access control
  • Optimizing the layout of various tables and files on Amazon S3 to improve analytics performance

We announced Lake Formation transactions, row-level security, and acceleration for preview at AWS re:Invent 2020. These capabilities are available via new, open, and public update and access APIs for data lakes. These APIs extend the governance capabilities of Lake Formation with row-level security, and provide transactions semantics on data lakes.

In this series of the posts, we provide a step-by-step instruction to use these new Lake Formation features. In this post, we focus on the first step of setting up governed tables.

Lake Formations transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, sign up for the preview. You need to be approved for the preview to gain access to these features.

Governed Table

The Data Catalog supports a new type of metadata tables: governed tables. Governed tables are unique to Lake Formation. Governed tables are a new Amazon S3 table type that supports atomic, consistent, isolated, and durable (ACID) transactions. Lake Formation transactions simplify ETL script and workflow development, and allow multiple users to concurrently and reliably insert, delete, and modify rows across multiple governed tables. Lake Formation automatically compacts and optimizes storage of governed tables in the background to improve query performance. When you create a table, you can specify whether or not the table is governed.

Setting up resources with AWS CloudFormation

In this post, I demonstrate how you can create a new governed table using existing data on Amazon S3. We use the Amazon Customer Reviews Dataset, which is stored in a public S3 bucket as sample data. You don’t need to copy the data to your bucket or worry about Amazon S3 storage costs. You can just set up a governed table pointing to this existing public data to see how it works.

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. If you prefer setting up resources on the AWS Management Console rather than AWS CloudFormation, see the instructions in the appendix at the end of this post.

The CloudFormation template generates the following resources:

To create your resources, complete the following steps:

  1. Sign in to the CloudFormation console in us-east-1 Region.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatalakeAdminUserNameand DatalakeAdminUserPassword, enter your IAM user name and password for data lake admin user.
  5. For DataAnalystUserNameand DataAnalystUserPassword, enter your IAM user name and password for data analyst user.
  6. For DatabaseName, leave as the default.
  7. Choose Next.
  8. On the next page, choose Next.
  9. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  10. Choose Create.

Stack creation can take up to 2 minutes.

Setting up a governed table

Now you can create and configure your first governed table in AWS Lake Formation.

Creating a governed table

To create your governed table, complete the following steps:

  1. Sign in to the Lake Formation console in us-east-1 Region using the DatalakeAdmin1 user.
  2. Choose Tables.
  3. Choose Create table.
  4. For Name, enter amazon_reviews_governed.
  5. For Database, enter lakeformation_tutorial_amazon_reviews.
  6. Select Enable governed data access and management.
  7. Select Enable row based permissions.

Select Enable row based permissions.

    1. For Data is located in, choose Specified path in another account.
    2. Enter the path s3://amazon-reviews-pds/parquet/.
    3. For Classification, choose PARQUET.
    4. Choose Upload Schema.
    5. Enter the following JSON array into the text box:
[
    {
        "Name": "marketplace",
        "Type": "string"
    },
    {
        "Name": "customer_id",
        "Type": "string"
    },
    {
        "Name": "review_id",
        "Type": "string"
    },
    {
        "Name": "product_id",
        "Type": "string"
    },
    {
        "Name": "product_parent",
        "Type": "string"
    },
    {
        "Name": "product_title",
        "Type": "string"
    },
    {
        "Name": "star_rating",
        "Type": "int"
    },
    {
        "Name": "helpful_votes",
        "Type": "int"
    },
    {
        "Name": "total_votes",
        "Type": "int"
    },
    {
        "Name": "vine",
        "Type": "string"
    },
    {
        "Name": "verified_purchase",
        "Type": "string"
    },
    {
        "Name": "review_headline",
        "Type": "string"
    },
    {
        "Name": "review_body",
        "Type": "string"
    },
    {
        "Name": "review_date",
        "Type": "bigint"
    },
    {
        "Name": "year",
        "Type": "int"
    }
]
  1. Choose Upload.
  2. Choose Add column.
  3. For Column name, enter product_category.
  4. For Data type, choose String.
  5. Select Partition Key.
  6. Choose Add.
  7. Choose Submit.

Now you can see that the new governed table has been created.

When you choose the table name, you can see the details of the governed table, and you can also see Governance: Enabled in this view. It means that it’s a Lake Formation governed table. If you have other existing tables, it should show as Governance: Disabled because the tables are not governed tables.
Now you can see that the new governed table has been created.

You can also see lakeformation.aso.status: true under Table properties. It means that automatic compaction is enabled for this table. For this post, we use a read-only table and don’t utilize automatic compaction. To disable the automatic compaction, complete the following steps:

  1. Choose Edit table.
  2. Deselect Automatic compaction.
  3. Choose Save.

Currently, no data and no partitions are registered to this governed table. In the next step, we register existing S3 objects to the governed table using Lake Formation manifest APIs.

Even if you locate your data in the table location of the governed table, the data isn’t recognized yet. To make the governed table aware of the data, you need to make a Lake Formation API call, or use an AWS Glue job with Lake Formation transactions.

Even if you locate your data in the table location of the governed table, the data isn’t recognized yet.

Configuring Lake Formation permissions

You need to grant Lake Formation permissions for your governed table. Complete the following steps:

Table-level permissions

  1. Sign in to the Lake Formation console in us-east-1 Region using the DatalakeAdmin1 user.
  2. Under Permissions, choose Data permissions.
  3. Under Data permission, choose Grant.
  4. For Database, choose lakeformation_tutorial_amazon_reviews.
  5. For Table, choose amazon_reviews_governed.
  6. For IAM users and roles, choose the role LFRegisterLocationServiceRole-<CloudFormation stack name> and the user DatalakeAdmin1.
  7. Select Table permissions.
  8. Under Table permissions, select Alter, Insert, Drop, Delete, Select, and Describe.
  9. Choose Grant.
  10. Under Data permission, choose Grant.
  11. For Database, choose lakeformation_tutorial_amazon_reviews.
  12. For Table, choose amazon_reviews_governed.
  13. For IAM users and roles, choose the user DataAnalyst1.
  14. Under Table permissions, select Select and Describe.
  15. Choose Grant.

Row-level permissions

  1. Under Permissions, choose Data permissions.
  2. Under Data permission, choose Grant.
  3. For Database, choose lakeformation_tutorial_amazon_reviews.
  4. For Table, choose amazon_reviews_governed.
  5. For IAM users and roles, choose the role LFRegisterLocationServiceRole-<CloudFormation stack name>, the users DatalakeAdmin1 and DataAnalyst.
  6. Select Row-based permissions.
  7. For Filter name, enter allowAll.
  8. For Choose filter type, select Allow access to all rows.
  9. Choose Grant.

Adding table objects into the governed table

To register S3 objects to a governed table, you need to call the UpdateTableObjects API needs for the objects. You can call it using the AWS Command Line Interface (AWS CLI) and SDK, and also the AWS Glue ETL library (the API is called implicitly in the library). For this post, we use the AWS CLI to explain the behavior in the API level. If you don’t have the AWS CLI, see Installing, updating, and uninstalling the AWS CLI. You also need to install the service model file provided in the Lake Formation preview program. You need to run the following commands using DatalakeAdmin1 user’s credential (or an IAM role or user where sufficient permissions are granted).

First, begin a new transaction with the BeginTransaction API:

$ aws lakeformation-preview begin-transaction
{
    "TransactionId": "7e5d506a757f32252ae3402a10191b13bfd1d7aa1c26a099d4a1911241589b8f"
}

Now you can register any files on the location. For this post, we choose one sample partition product_category=Camera from the amazon-reviews-pds table, and choose one file under this partition. Uri, ETag, and Size are the required information for further steps, so you need to copy them.

$ aws s3 ls s3://amazon-reviews-pds/parquet/product_category=Camera/
2018-04-09 15:37:05   65386769 part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:06   65619234 part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:06   64564669 part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65148225 part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65227429 part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65269357 part-00005-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:08   65595867 part-00006-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:08   65012056 part-00007-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:09   65137504 part-00008-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:09   64992488 part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet

$ aws s3api head-object --bucket amazon-reviews-pds --key parquet/product_category=Camera/part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
{
    "AcceptRanges": "bytes",
    "LastModified": "Mon, 09 Apr 2018 06:37:07 GMT",
    "ContentLength": 65227429,
    "ETag": "\"980669fcf6ccf31d2d686b9cccdd45e3-8\"",
    "ContentType": "binary/octet-stream",
    "Metadata": {}
}

Create a new file named write-operations1.json and enter the following JSON: (replace Uri, ETag, and Size with the values you copied.)

[
    {
        "AddObject": {
            "Uri": "s3://amazon-reviews-pds/parquet/product_category=Camera/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
            "ETag": "d4c25c40f33071620fb31cf0346ed2ec-8",
            "Size": 65386769,
            "PartitionValues": [
                "Camera"
            ]
        }
    }
]

Let’s register an existing object on the bucket to the governed table by making an UpdateTableObjects API call using write-operations1.json you created. (replace <transaction-id> with the transaction id you got in begin-transaction command.)

$ aws lakeformation-preview update-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id> --write-operations file://./write-operations1.json$ 
Note current date time right after making the UpdateTableObjects API call here. We use this timestamp for time travel queries later.
$ date -u
Tue Feb  2 12:12:00 UTC 2021

You can ensure the change before the transaction commit by making the GetTableObjects API call with the same transaction ID: (Replace <transaction-id> with the id you got in begin-transaction command.)

$ aws lakeformation-preview get-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id>
{
    "Objects": [
        {
            "PartitionValues": [
                "Camera"
            ],
            "Objects": [
                {
                    "Uri": "s3://amazon-reviews-pds/parquet/product_category=Camera/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
                    "ETag": "d4c25c40f33071620fb31cf0346ed2ec-8",
                    "Size": 65386769
                }
            ]
        }
    ]
}

To make this data available for other transactions, you need to call the CommitTransaction API: (replace <transaction-id> with the transaction id you got in begin-transaction command.)

$ aws lakeformation-preview commit-transaction --transaction-id <transaction-id>
After running the preceding command, you can see the partition on the Lake Formation console.

After running the preceding command, you can see the partition on the Lake Formation console.

Let’s add one more partition into this table. This time we add one file per partition, and add only two partitions as an example. For actual usage, you need to add all the files under all the partitions that you need.

Add partitions with following commands:

  1. Call the BeginTransaction API to start another Lake Formation transaction:
    $ aws lakeformation-preview begin-transaction
    {
         "TransactionId": "d70c60e859e832b312668723cf48c1b84ef9109c5dbf6e9dbe8834c481c0ec81"
    }

  2. List Amazon S3 objects located on amazon-reviews-pds bucket to choose another sample file:
    $ aws s3 ls s3://amazon-reviews-pds/parquet/product_category=Books/
    2018-04-09 15:35:58 1094842361 part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:35:59 1093295804 part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1095643518 part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1095218865 part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1094787237 part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:33 1094302491 part-00005-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1094565655 part-00006-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1095288096 part-00007-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1092058864 part-00008-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1093613569 part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet

  3. Call the HeadObject API against one sample file in order to copy ETag and Size
    $ aws s3api head-object --bucket amazon-reviews-pds --key parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    {
         "AcceptRanges": "bytes",
         "LastModified": "Mon, 09 Apr 2018 06:35:58 GMT",
         "ContentLength": 1094842361,
         "ETag": "\"9805c2c9a0459ccf337e01dc727f8efc-131\"",
         "ContentType": "binary/octet-stream",
         "Metadata": {}
    }

  4. Create a new file named write-operations2.json and enter the following JSON: (Replace Uri, ETag, and Size with the values you copied.)
    [
        {
                "AddObject": {
                "Uri": "s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
                "ETag": "9805c2c9a0459ccf337e01dc727f8efc-131",
                "Size": 1094842361,
                "PartitionValues": [
                    "Books"
               ]
           }
        }
    ]

  5. Call the UpdateTableObjects API using write-operations2.json: (replace <transaction-id> with the transaction id you got in begin-transaction command.)
    $ aws lakeformation-preview update-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id> --write-operations file://./write-operations2.json

    Call the CommitTransaction API: (replace <transaction-id> with the transaction id you got in begin-transaction command.)

    $ aws lakeformation-preview commit-transaction --transaction-id <transaction-id>

    Now the two partitions are visible on the Lake Formation console.

Now the two partitions are visible on the Lake Formation console.

Querying the governed table using Amazon Athena

Now your governed table is ready! Let’s start querying the governed table using Amazon Athena. Sign in to the Athena console in us-east-1 Region using DataAnalyst1 user.

If it’s your first time running queries on Athena, you need to configure a query result location. For more information, see Specifying a Query Result Location.

To utilize Lake Formation preview features, you need to create a special workgroup named AmazonAthenaLakeFormationPreview, and join the workgroup. For more information, see Managing Workgroups.

Running a simple query

Sign in to the Athena console in us-east-1 Region using the DataAnalyst1 user. First, let’s preview 10 records stored in a governed table:

SELECT * 
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed
LIMIT 10

The following screenshot shows the query results.

The following screenshot shows the query results.

Running an analytic query

Next, let’s run an analytic query with aggregation for simulating real-world use cases:

SELECT product_category, count(*) as TotalReviews, avg(star_rating) as AverageRating
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed 
GROUP BY product_category

The following screenshot shows the results. This query returned the total number of reviews and average rating per product category.

The following screenshot shows the results

Running an analytic query with time travel

Each governed table maintains a versioned manifest of the Amazon S3 objects that it comprises. You can use previous versions of the manifest for time travel queries. Your queries against governed tables in Athena can include a timestamp to indicate that you want to discover the state of the data at a particular date and time.

To submit a time travel query in Athena, add a WHERE clause that sets the column __asOfDate to the epoch time (long integer) representation of the required date and time. Let’s run the time travel query: (replace <epoch-milliseconds> with the timestamp which is right after you made the first UpdateTableObjects call. To retrieve the epoch milliseconds, see the tips introduced after the screenshots in this post.)

SELECT product_category, count(*) as TotalReviews, avg(star_rating) as AverageRating
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed
WHERE __asOfDate = <epoch-milliseconds>
GROUP BY product_category

The following screenshot shows the query results. The result only includes the record of product_category=Camera. This is because that the file under product_category=Books has been added after this timestamp (1612267920000 ms = 2021/02/02 12:12:00 UTC), which has been specified in the time travel column __asOfDate.

The following screenshot shows the query results.

To retrieve epoch time from commands, you can run below commands.

The following command is for Linux (GNU date command):

$ echo $(($(date -u -d '2021/02/02 12:12:00' +%s%N)/1000000)) 
1612267920000

The following command is for OSX (BSD date command):

$ echo $(($(date -u -j -f "%Y/%m/%d %T" "2021/02/02 12:12:00" +'%s * 1000 + %-N / 1000000')))
1612267920000

Cleaning up

Now to the final step, cleaning up the resources.

  1. Delete the CloudFormation stack. The governed table you created is automatically deleted with the stack.
  2. Delete the Athena workgroup AmazonAthenaLakeFormationPreview.

Conclusion

In this blog post, we explained how to create a Lake Formation governed table with existing data in an AWS public dataset. In addition, we explained how to query against governed tables and how to run time travel queries for governed tables. With Lake Formation governed tables, you can achieve transactions, row-level security, and query acceleration. In Part 2 of this series, we show you how to create a governed table for streaming data sources and demonstrate how Lake Formation transactions work.

Lake Formations transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, please sign up for the preview.


Appendix: Setting up resources via the console

When following the steps in this section, use the Region us-east-1 because as of this writing, this Lake Formation preview feature is available only in us-east-1.

Configuring IAM roles and IAM users

First, you need to set up two IAM roles, one is for AWS Glue ETL jobs, another is for the Lake Formation data lake location.

IAM policies

To create your policies, complete the following steps:

  1. On the IAM console, create a new Policy for Amazon S3.
  2. Save the policy as S3DataLakePolicy as follows:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::amazon-reviews-pds/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::amazon-reviews-pds"
                ]
            }
        ]
    }

  3. Create a new IAM policy named LFLocationPolicy with the following statements:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFPreview1",
                "Effect": "Allow",
                "Action": "execute-api:Invoke",
                "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
            },
            {
                "Sid": "LFPreview2",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:BeginTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:AbortTransaction",
                    "lakeformation:GetTableObjects",
                    "lakeformation:UpdateTableObjects"
                ],
                "Resource": "*"
            }
        ]
    }

    
    

  4. Create a new IAM policy named LFQuery Policy with the following statements:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFPreview1",
                "Effect": "Allow",
                "Action": "execute-api:Invoke",
                "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
            },
            {
                "Sid": "LFPreview2",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:BeginTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:AbortTransaction",
                    "lakeformation:ExtendTransaction",
                    "lakeformation:PlanQuery",
                    "lakeformation:GetTableObjects",
                    "lakeformation:GetQueryState",
                    "lakeformation:GetWorkUnits",
                    "lakeformation:Execute"
                ],
                "Resource": "*"
            }
        ]
    }

    IAM role for AWS Lake Formation

To create your IAM role for the Lake Formation data lake location, complete the following steps:

  1. Create a new Lake Formation role called LFRegisterLocationServiceRole with a Lake Formation trust relationship:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    

    Attach the customer managed policies S3DataLakePolicy and LFLocationPolicy you created in the previous step.

This role is used to register locations with Lake Formation which in-turn performs credential vending for Athena at query time.

IAM users

To create your users, complete the following steps:

  1. Create an IAM user named DatalakeAdmin.
  2. Attach the following AWS managed policies:
    1. AWSLakeFormationDataAdmin
    2. AmazonAthenaFullAccess
    3. IAMReadOnlyAccess
  3. Attach the customer managed policy LFQueryPolicy.
  4. Create an IAM user named DataAnalyst that can use Athena to query data.
  5. Attach the AWS managed policy AmazonAthenaFullAccess.
  6. Attach the customer managed policy LFQueryPolicy.

Configuring Lake Formation

If you’re new to Lake Formation, you can follow below steps for getting started with AWS Lake Formation.

  1. On the Lake Formation console, under Permissions, choose Admins and database creators.
  2. In the Data lake administratorssection, choose Grant.
  3. For IAM users and roles, choose your IAM user DatalakeAdmin.
  4. Choose Save.
  5. In the Database creators section, choose Grant.
  6. For IAM users and roles, choose the LFRegisterLocationServiceRole.
  7. Select Create Database.
  8. Choose Grant.
  9. Under Register and ingest, choose Data lake locations.
  10. Choose Register location.
  11. For Amazon S3 path, enter your Amazon S3 path to the bucket where your data is stored. This needs to be the same bucket you listed in LFLocationPolicy. Lake Formation uses this role to vend temporary Amazon S3 credentials to query services that need read/write access to the bucket and all prefixes under it.
  12. For IAM role, choose the LFRegisterLocationServiceRole.
  13. Choose Register location.
  14. Under Data catalog, choose Settings.
  15. Make sure that both check boxes for Use only IAM access control for new databases and Use only IAM access control for new tables in new databases are deselected.
  16. Under Data catalog, choose Databases.
  17. Choose Create database.
  18. Select Database.
  19. For Name, enter lakeformation_tutorial_amazon_reviews.
  20. Choose Create database.

About the Author

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue & Lake Formation. His passion is for implementing software artifacts for building data lakes more effectively and easily. During his spare time, he loves to spend time with his family, especially hunting bugs—not software bugs, but bugs like butterflies, pill bugs, snails, and grasshoppers.

Analyzing Freshdesk data using Amazon EventBridge and Amazon Athena

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/analyzing-freshdesk-data-using-amazon-eventbridge-and-amazon-athena/

This post is written by Shashi Shankar, Application Architect, Shared Delivery Teams

Freshdesk is an omnichannel customer service platform by Freshworks. It provides automation services to help speed up customer support processes.

The Freshworks connector to Amazon EventBridge allows real time streaming of Freshdesk events with minimal configuration and setup. This integration provides real-time insights into customer support operations without the operational overhead of provisioning and maintaining any servers.

In this blog post, I walk through a serverless approach to ingest and analyze Freshdesk data. This solution uses EventBridge, Amazon Kinesis Data Firehose, Amazon S3, and Amazon Athena. I also look at examples of customer service questions that can be answered using this approach.

The following diagram shows a high-level architecture of the proposed solution:

  1. When a Freshdesk ticket is updated or created, the Freshworks connector pushes event data to the Amazon EventBridge partner event bus.
  2. A rule on the partner event bus pushes the event data to Kinesis Data Firehose.
  3. Kinesis Data Firehose batches data before sending to S3. An AWS Lambda function transforms the data by adding a new line to each record before sending.
  4. Kinesis Data Firehose delivers the batch of records to S3.
  5. Athena is used to query relevant data from S3 using standard SQL.

The walkthrough shows you how to:

  1. Add the EventBridge app to Freshdesk account.
  2. Configure a Freshworks partner event bus in EventBridge.
  3. Deploy a Kinesis Data Firehose stream, a Lambda function, and an S3 bucket.
  4. Set up a custom rule on the event bus to push data to Kinesis Data Firehose.
  5. Generate sample Freshdesk data to validate the ingestion process.
  6. Set up a table in Athena to query the S3 bucket.
  7. Query and analyze data

Pre-requisites

  • A Freshdesk account (which can be created here).
  • An AWS account.
  • AWS Serverless Application Model (AWS SAM CLI), installed and configured.

Adding the Amazon EventBridge app to a Freshdesk account

  1. Log in to your Freshdesk account and navigate to Admin Helpdesk Productivity Apps. Search for EventBridge:
  2. Choose the Amazon EventBridge icon and choose Install.
  • Enter your AWS account number in the AWS Account ID field.
  • Enter “OnTicketCreate”, “OnTicketUpdate” in the Events field.
  • Enter the AWS Region to send the Freshdesk events in the Region field. This walkthrough uses the us-east-1 Region.

Configuring a Freshworks partner event bus in EventBridge

Once previous step is completed, a partner event source is automatically created in the EventBridge console. Copy the partner event source name to a clipboard.

  1. Clone the GitHub repo and deploy the AWS SAM template:
    git clone https://github.com/aws-samples/amazon-eventbridge-freshdesk-example.git
    cd ./amazon-eventbridge-freshdesk-example
    sam deploy --guided
  2. PartnerEventSource – Enter partner event source name copied from the previous step.
  3. S3BucketName – Enter an S3 bucket name to store Freshdesk ticket event data.

The AWS SAM template creates an association between the partner event source and event bus:

    Type: AWS::Events::EventBus
    Properties:
      EventSourceName: !Ref PartnerEventSource
      Name: !Ref PartnerEventSource

The template creates a Kinesis Data Firehose delivery stream, Lambda function, and S3 bucket to process and store the events from Freshdesk tickets. It also adds a rule to the custom event bus with the Kinesis Data Firehose stream as the target:

  PushToFirehoseRule:
    Type: "AWS::Events::Rule"
    Properties:
      Description: Test Freshdesk Events Rule
      EventBusName: !Ref PartnerEventSource
      EventPattern:
        account: [!Ref AWS::AccountId]
      Name: freshdeskeventrule
      State: ENABLED
      Targets:
        - Arn:
            Fn::GetAtt:
              - "FirehoseDeliveryStream"
              - "Arn"
          Id: "idfreshdeskeventrule"
          RoleArn: !GetAtt EventRuleTargetIamRole.Arn

  EventRuleTargetIamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: ""
            Effect: "Allow"
            Principal:
              Service:
                - "events.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: Invoke_Firehose
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "firehose:PutRecord"
                  - "firehose:PutRecordBatch"
                Resource:
                  - !GetAtt FirehoseDeliveryStream.Arn

Generating sample Freshdesk data to validate the ingestion process:

To generate sample Freshdesk data, login to the Freshdesk account and browse to the “Tickets” screen as shown:

Follow the steps to simulate two customer service operations:

  1. To create a ticket of type “Refund”. Choose the New button and enter the details:
  2. Update an existing ticket and change the priority to “Urgent”.
  3. Within a few minutes of updating the ticket, the data is pushed via the Freshworks connector to the S3 bucket created using the AWS SAM template. To verify this, browse to the S3 bucket and see that a new object with the ticket data is created:

You can also use the S3 Select option under object actions to view the raw JSON data that is sent from the partner system. You are now ready to analyze the data using Athena.

Setting up a table in Athena to query the S3 bucket

If you are familiar with Apache Hive, you may find creating tables on Athena helpful. You can create tables by writing the DDL statement in the query editor or by using the wizard or JDBC driver. To create a table in Athena:

  1. Copy and paste the following DDL statement in the Athena query editor to create a Freshdesk’s events table. For this example, the table is created in the default database.
  2. Replace S3_Bucket_Name in the following query with the name of the S3 bucket created by deploying the previous AWS SAM template:
CREATE EXTERNAL TABLE ` freshdeskevents`(
  `id` string COMMENT 'from deserializer', 
  `detail-type` string COMMENT 'from deserializer', 
  `source` string COMMENT 'from deserializer', 
  `account` string COMMENT 'from deserializer', 
  `time` string COMMENT 'from deserializer', 
  `region` string COMMENT 'from deserializer', 
  `detail` struct<ticket:struct<subject:string,description:string,is_description_truncated:boolean,description_text:string,is_description_text_truncated:boolean,due_by:string,fr_due_by:string,fr_escalated:boolean,is_escalated:boolean,fwd_emails:array<string>,reply_cc_emails:array<string>,email_config_id:string,id:int,group_id:bigint,product_id:string,company_id:string,requester_id:bigint,responder_id:bigint,status:int,priority:int,type:string,tags:array<string>,spam:boolean,source:int,tweet_id:string,cc_emails:array<string>,to_emails:string,created_at:string,updated_at:string,attachments:array<string>,custom_fields:string,changes:struct<responder_id:array<bigint>,ticket_type:array<string>,status:array<int>,status_details:array<struct<id:int,name:string>>,group_id:array<bigint>>>,requester:struct<id:bigint,name:string,email:string,mobile:string,phone:string,language:string,created_at:string>> COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='account,detail,detail-type,id,region,resources,source,time,version') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION  's3://S3_Bucket_Name/'

The table is created on the data stored in S3 and is ready to be queried. Note that table freshdeskevents points at the bucket s3://S3_Bucket_Name/. As more data is added to the bucket, the table automatically grows, providing a near-real-time data analysis experience.

Querying and analyzing data

You can use the following examples to get started with querying the Athena table.

  1. To get all the events data, run:
SELECT * FROM default.freshdeskevents  limit 10

The preceding output has a detail column containing the details related to the ticket. Tickets can be filtered on nested notations to build more insightful queries. Also, the detail-type column provides classification of tickets as new (onTicketCreate) vs updated (onTicketUpdate).

  1. To show new tickets created today with the type “Refund”:
SELECT detail.ticket.subject,detail.ticket.description_text, detail.ticket.type  FROM default.freshdeskevents
where detail.ticket.type = 'Refund' and "detail-type" = 'onTicketCreate' and date(from_iso8601_timestamp(time)) = date(current_date)
  1. All tickets with an “Urgent” priority but not assigned to an agent:
SELECT "detail-type", detail.ticket.responder_id,detail.ticket.priority, detail.ticket.subject, detail.ticket.type  FROM default.freshdeskevents
where detail.ticket.responder_id is null and detail.ticket.priority = 4

Conclusion

In this blog post, you learn how to configure Freshworks partner event source from the Freshdesk console. Once a partner event source is configured, an AWS SAM template is deployed that creates a custom event bus by attaching the partner event source. A Kinesis Data Firehose, Lambda function, and S3 bucket is used to ingest Freshdesk’s ticket events data for analysis. An EventBridge rule is configured to route the event data to the S3 bucket.

Once event data starts flowing into the S3 bucket, an Amazon Athena table is created to run queries and analyze the ticket events data. Alternative customer service data analysis use cases can be built on the architecture shown in this blog.

To learn more about other partner integrations and the native capabilities of EventBridge, visit the AWS Compute Blog.

Integrating Datadog data with AWS using Amazon AppFlow for intelligent monitoring

Post Syndicated from Gopalakrishnan Ramaswamy original https://aws.amazon.com/blogs/big-data/integrating-datadog-data-with-aws-using-amazon-appflow-for-intelligent-monitoring/

Infrastructure and operation teams are often challenged with getting a full view into their IT environments to do monitoring and troubleshooting. New monitoring technologies are needed to provide an integrated view of all components of an IT infrastructure and application system.

Datadog provides intelligent application and service monitoring by bringing together data from servers, databases, containers, and third-party services in the form of a software as a service (SaaS) offering. It provides operations and development professionals the ability to measure application and infrastructure performance, visualize metrics with the help of a unified dashboard and create alerts and notifications.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

In this post, I walk you through the process of extracting log data from Datadog, using Amazon AppFlow and storing it in Amazon S3, and querying it with Amazon Athena.

Solution overview

The following diagram shows the flow of our solution.

The following diagram shows the flow of our solution.

The Datadog Agent is a lightweight software that can be installed in many different platforms, either directly or as a containerized version. It collects events and metrics from hosts and sends them to Datadog. Amazon AppFlow extracts the log data from Datadog and stores it in Amazon S3, which is then queried using Athena.

To implement the solution, you complete the following steps:

  1. Install and configure the Datadog Agent.
  2. Create a new Datadog application key.
  3. Create an Amazon AppFlow connection for Datadog.
  4. Create a flow in Amazon AppFlow.
  5. Run the flow and query the data.

Prerequisites

The walkthrough requires the following:

  • An AWS account
  • A Datadog account

Installing and configuring the Datadog Agent

The Datadog Agent is lightweight software installed on your hosts. With additional setup, the Agent can report live processes, logs, and traces. The Agent needs an API key, which is used to associate the Agent’s data with your organization. Complete the following steps to install and configure the Datadog Agent:

  1. Create a Datadog account if you haven’t already.
  2. Login to your account.
  3. Under Integrations, choose APIs.
  4. Copy the API key.
  5. Download the Datadog Agent software for the selected platform.
  6. Install the Agent on the hosts using the API key you copied.

Collecting logs is disabled by default in Datadog Agent. To enable Agent log collection and configure a custom log collection, perform the following steps on your host:

  1. Update the Datadog Agent’s main configuration file (datadog.yaml) with the following code:
    logs_enabled: true

In Windows this file is in C:\ProgramData\Datadog.

  1. Create custom log collection by customizing the conf.yaml file.

For example in Windows this file would be in the path C:\ProgramData\Datadog\conf.d\win32_event_log.d. The following code is a sample entry in the conf.yaml file that enables collection of Windows security events:

logs:
  - type: windows_event
    channel_path: Security
    source: Security
    service: windowsOS
    sourcecategory: windowsevent

Getting the Datadog application key

The application keys in conjunction with your organization’s API key give you full access to Datadog’s programmatic API. Application keys are associated with the user account that created them. The application key is used to log all requests made to the API. Get your application key with the following steps:

  1. Login into your Datadog account.
  2. Under Integrations, choose APIs.
  3. Expand Application Keys.
  4. For Application key name, enter a name.
  5. Choose Create Application key.

Creating an Amazon AppFlow connection for Datadog

A connection defines the source or destination to use in a flow. To create a new connection for Datadog, complete the following steps:

  1. On the Amazon AppFlow console, in the navigation pane, choose Connections. 
  2. For Connectors, choose Datadog.
  3. Choose Create Connection.
  4. For API key and Application Key, enter the keys procured from the previous steps.
  5. For Connection Name, enter a name; for example, myappflowconnection.
  6. Choose Connect.

Choose Connect.

Creating a flow in Amazon AppFlow

After you create the data connection, you can create a flow that uses the connection and defines the destination, data mapping, transformation, and filters.

Creating an S3 bucket

Create an S3 bucket as your Amazon AppFlow transfer destination.

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, mydatadoglogbucket.
  3. Ensure that Block all public access is selected.
  4. Enable bucket versioning and encryption (optional).
  5. Choose Create bucket.
  6. Enable Amazon S3 server access logging (optional).

Configuring the flow source

After you create the Datadog agent and the S3 bucket, complete the following steps to create a flow:

  1. On the Amazon AppFlow console, in the navigation pane, choose Flows.
  2. Choose Create flow.
  3. For Flow name, enter a name for your flow; for example mydatadogflow.
  4. For Source name, choose Datadog.
  5. For Choose Datadog connection, choose the connection created earlier.
  6. For Choose Datadog object, choose Logs.

For Choose Datadog object, choose Logs.

Choosing a destination

In the Destination details section, provide the following information:

  1. For Destination name, Choose Amazon S3.
  2. For Bucket details, choose the name of the S3 bucket created earlier.

This step create a folder with the flow name you specified within the bucket to store the logs.

This step creates a folder with the flow name you specified within the bucket to store the logs.

Additional settings

You can provide additional settings for data format (JSON, CSV, Parquet), data transfer preference, filename preference, flow trigger and transfer mode. Leave all settings as default:

  • For Data format preference, choose JSON format.
  • For Data transfer preference, choose No aggregation.
  • For Filename preference, choose No timestamp.
  • For Folder structure preference, choose No timestamped folder.

Adding a flow trigger

Flows can be run on a schedule, based on an event or on demand. For this post, we choose Run on demand.

Mapping data fields

You can map manually or using a CSV file. This determines how data is transferred from source to destination. You can apply transformations like concatenation, masking, and truncation to the mappings.

  1. In the Map data fields section, for Mapping method, choose Manually map fields.
  2. For Source field name, choose Map all fields directly.
  3. Choose Next.Choose Next.

Validation

You can add validation to perform certain actions based on conditions on field values.

  1. In the Validations section, for Field name choose Content.
  2. For Condition, choose Values are missing or null.
  3. For Action, choose Ignore record.For Action, choose Ignore record.

Filters

Filters specify which records to transfer. You can add multiple filters with criterion. For the Datadog data source, it’s mandatory to specify filters for Date_Range and Query. The format for specifying filter query for metrics and logs are different.

  1. In the Add filters section, for Field name, choose Date_Range.
  2. For Condition, choose is between.
  3. For Criterion 1 and Criterion 2, enter start and end dates for log collection.
  4. Choose Add filter.
  5. For your second filter, for Field name, choose
  6. For Condition, enter host:<yourhostname> AND service:(windowsOS OR LinuxOS).
  7. Choose Save.

Choose Save.

The service names specified in the filter should have Datadog logs enabled (refer to the earlier step when you installed and configured the Datadog Agent).

The following are some examples of the filter Query for metrics:

  • load.1{*} by {host}
  • avg:system.cpu.idle{*}
  • avg:system.cpu.system{*}
  • avg:system.cpu.user{*}
  • avg:system.cpu.guest{*}
  • avg:system.cpu.user{host:yourhostname}

The following are some examples of the filter Query for logs:

  • service:servicename
  • host:myhostname
  • host:hostname1 AND service:(servicename1 OR servicename2) 

Running the Flow and querying the data

If a flow is based on a trigger, you can activate or deactivate it. If it’s on demand, it must be run each time data needs to be transferred. When you run the flow, the logs or metrics are pulled into files residing in Amazon S3. The data is in the form of a nested JSON in this example. Use AWS Glue and Athena to create a schema and query the log data.

Querying data with Athena

When the Datadog data is in AWS, there are a host of possibilities to store, process, integrate with other data sources, and perform advanced analytics. One such method is to use Athena to query the data directly from Amazon S3.

  1. On the AWS Glue console, in the navigation pane, choose Databases.
  2. Choose Add database.
  3. For Database name, enter a name such as mydatadoglogdb.
  4. Choose Create.
  5. In the navigation pane, choose Crawlers.
  6. Choose Add Crawler.
  7. For Crawler name, enter a name, such as mylogcrawler.
  8. Choose Next.
  9. For Crawler source type, select Data stores.
  10. Choose Next.
  11. In the Add a data store section, choose S3 for the data store.
  12. Enter the path to the S3 folder that has the log files; for example s3://mydatadoglogbucket/logfolder/.
  13. In the Choose an IAM role section, select Create an IAM role and provide a name.
  14. For Frequency select Run on demand.
  15. In the Configure the crawler’s output section, for Database, select the database created previously.
  16. Choose Next.
  17. Review and choose Finish.
  18. When the crawler’s status changes to Active, select it and choose Run Crawler.

When the crawler finishes running, it creates the tables and populates them with data based on the schema it infers from the JSON log files.

  1. On the Athena console, choose Settings.
  2. Select an S3 bucket and folder where Athena results are stored.
  3. In the Athena query window, enter the following query:
    select * 
    from mydatadoglogdb.samplelogfile
    where content.attributes.level = 'Information'
    

  4. Choose Run Query.

This sample query gets all the log entries where the level is Information. We’re traversing a nested JSON object in the Athena query, simply with a dot notation.

Summary

In this post, I demonstrated how we can bring Datadog data into AWS. Doing so opens a host of opportunities to use the tools available in AWS to drive advance analytics and monitoring while integrating with data from other sources.

With Amazon AppFlow, you can integrate applications in a few minute, transfer data at massive scale, and enrich the data as it flows, using mapping, merging, masking, filtering, and validation. For more information about integrating SaaS applications and AWS, see Amazon AppFlow.


About the Author

Gopalakrishnan Ramaswamy is a Solutions Architect at AWS based out of India with extensive background in database, analytics, and machine learning. He helps customers of all sizes solve complex challenges by providing solutions using AWS products and services. Outside of work, he likes the outdoors, physical activities and spending time with friends and family.

Create a custom data connector to Slack’s Member Analytics API in Amazon QuickSight with Amazon Athena Federated Query

Post Syndicated from Pablo Redondo Sanchez original https://aws.amazon.com/blogs/big-data/create-a-custom-data-connector-to-slacks-member-analytics-api-in-amazon-quicksight-with-amazon-athena-federated-query/

Amazon QuickSight recently added support for Amazon Athena Federated Query, which allows you to query data in place from various data sources. With this capability, QuickSight can extend support to query additional data sources like Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon DocumentDB (with Mongo DB compatibility) via their existing Amazon Athena data source. You can also use the Athena Query Federation SDK to write custom connectors and query any source accessible with a Java API, whether it is relational, non-relational, object, or a custom data endpoint.

A common analytics use case is to access data from a REST API endpoint and blend it with information from other sources. In this post, I walk you through the process of setting up a custom federated query connector in Athena to query data from a REST API endpoint and build a QuickSight dashboard that blends data from the REST API endpoint with other data sources.

To illustrate this use case, we work with Slack, the makers of a leading channel-based messaging platform, to test their Member Analytics API, which can help our mock company, Example Corp, understand Slack adoption and member engagement across different teams.

How the Slack Member Analytics API works

The following diagram illustrates the Slack Member Analytics API.

The following diagram illustrates the Slack Member Analytics API.

The Slack Member Analytics API is a REST API endpoint available for Slack Enterprise Grid customers. Authorized users and services can access the member usage stats dataset via the admin.analytics.getFile endpoint of the Slack Web API. The data consists on a new-line delimited JSON file with daily Slack activity stats at the member level. A record looks like the following code:

{ 
    "enterprise_id":"AAAAAAA",
    "date":"2020-11-10",
    "user_id":"U01ERHY4589",
    "email_address":"[email protected]",
    "is_guest":false,
    "is_billable_seat":false,
    "is_active":true,
    "is_active_ios":false,
    "is_active_android":false,
    "is_active_desktop":true,
    "reactions_added_count":3,
    "messages_posted_count":10, 
    "channel_messages_posted_count":0,
    "files_added_count":0
}

To request data, you must provide a date argument in the format of YYYY-MM-DD, a type argument with the value member, and an OAuth bearer token as the header. The response is a compressed (.gzip) JSON file with data for the requested date. See the following code of a sample request:

curl -X GET -H “Authorization: Bearer xoxp-..."  https://slack.com/api/admin.analytics.getFile?date=2020-09-01&type=member > data.gzip

Building the solution for Example Corp

For our use case, Example Corp has recently purchased Slack for 1,000 users and as the Collaboration team onboards new teams to Slack, they want to measure Slack adoption and engagement within each new team. If they see low adoption or engagement within a group at the company, they can work with that group to understand why they aren’t using Slack and provide education and support, as needed.

Example Corp wants to provide analysts access to the Slack member usage stats to run ad hoc queries in place (directly from the source) without maintaining a new extract, transform, and load (ETL) pipeline. They use the QuickSight cross data source join feature to blend their Slack usage stats with their HR dataset.

To achieve this, Example Corp implements the following steps:

  1. Authorize the custom federated query connector with Slack to access the Member Analytics API.
  2. Develop and deploy a custom federated query connector in the Example Corp AWS account.
  3. Create a dataset in the Example Corp QuickSight environment that reads Slack member usage data for the last 30 days and blends it with an HR dataset.
  4. Create a QuickSight dashboard that shows usage trends of provisioned vs. active users.

Example Corp program managers can now monitor slack engagement using their QuickSight Dashboard (see the following screenshot).

Example Corp program managers can now monitor slack engagement using their QuickSight Dashboard

The following diagram illustrates the overall architecture of the solution.

The following diagram illustrates the overall architecture of the solution.

The following sections describe the components in detail and provide sample code to implement the solution in your environment.

Authorizing the custom federated query connector to access the Slack Analytics API

Data REST API endpoints typically have an authentication mechanism such as standard HTTP authentication or a bearer token. In the case of the Slack Web API, a bearer token is required on every request. The Slack Member Analytics API uses an OAuth protocol to authorize applications’ read access to data from an organization’s Slack environment.

To perform the OAuth handshake, Example Corp deploys a custom web application on Amazon Elastic Compute Cloud (Amazon EC2) and registers it as a new Slack application. When it’s deployed, Example Corp Slack admins can access the web application UI to authenticate with Slack and authorize read access to the custom federated query connector. After successful authentication, the custom web application stores the bearer token as a secret in AWS Secrets Manager. Only the custom application server and the federated query connector have access to this secret.

The following is an architecture diagram and brief description of the OAuth authorization workflow between Slack.com and the custom web application. As a prerequisite, you need to register your custom application with Slack.

The following is an architecture diagram and brief description of the OAuth authorization workflow between Slack.com and the custom web application.

  1. The Slack admin accesses the custom application UI from their browser and chooses Add to Slack to begin the authorization process.

The Slack admin accesses the custom application UI from their browser and chooses Add to Slack to begin the authorization process.

  1. The custom application redirects the admin to Slack.com to authenticate and authorize the client with an admin.analytics:read access for Example Corp Slack Enterprise Grid.

The custom application redirects the admin to Slack.com to authenticate and authorize the client with an admin.analytics:read access for Example Corp Slack Enterprise Grid.

  1. Slack.com redirects the admin back to the custom application UI, passing a temporary authorization code in the request.
  2. On the backend, the custom application retrieves Slack client secrets from a Secrets Manager secret. The Slack client secrets are obtained during the Slack application registration.
  3. The custom application server makes a request for a bearer token to the Slack API, passing both the temporary authorization code and the Slack client secrets.
  4. If both the temporary authorization code and the client secrets are valid, then the Slack API returns a bearer token to the custom application server.
  5. The custom application saves the bearer token in the Secrets Manager secret.
  6. Finally, the application sends a confirmation of successful authorization to the admin.

Slack admins can revoke access to the application from the organization’s console at any time.

You can find the source code and detailed instructions to deploy this sample OAuth web application in the GitHub repo. When the authorization workflow is complete, you can pause or stop the resources running the web application. Going forward, the federated query connector accesses the token from Secrets Manager.

Deploying the custom federated query connector

When the OAuth workflow is complete, we can deploy the custom federated query connector in the Example Corp AWS environment. For Example Corp, we develop a custom AWS Lambda function using the Athena Query Federation Java SDK and a Java HTTP client to connect with the Slack Member Analytics REST API. Finally, we register it as a new data source within Athena.

The following is a diagram of how the custom connector workflow operates.

The following is a diagram of how the custom connector workflow operates.

The workflow includes the following steps:

  1. Users submit a query to Athena using the following query: select * from <catalog_name>.slackanalytics.member_analytics where date='2020-11-10', where <catalog_name> is the name specified when creating the Athena data source.
  2. Athena compiles the query and runs the Lambda function to retrieve the Slack authorization token from Secrets Manager and determine the number of partitions based on the query predicates (where clause).
  3. The Slack Member Analytics Connector partitions the data by date and runs a Lambda function for each partition (date) specified in the query. For example, if the predicate is WHERE date IN (‘2020-11-10’, ‘2020-11-12’), Athena runs two instances of the Lambda function. When no dates are specified in the where clause, the connector gets data for the last 30 days.
  4. Each instance of the Lambda function makes a request to the Slack Member API to retrieve data for each day.
  5. Finally, Athena performs any aggregation and computation specified in the query and return the results to the client.

You can deploy this sample Slack Member Analytics Lambda function in your AWS environment via AWS CloudFormation with the following template. If you want to modify and build the connector from scratch, you can find the source code and instructions in the GitHub repo.

After the Lambda function has been deployed, create a new data source in Athena. For step-by-step instructions, see Deploying a Connector and Connecting to a Data Source.

  1. On the Athena console, in the query editor, choose Connect data source.

On the Athena console, in the query editor, choose Connect data source.

  1. Select All other data sources.
  2. Point your catalog to your new Lambda function.

Point your catalog to your new Lambda function.

You should be able to browse your new catalog within Athena from the Athena console and query the Slack Member Analytics API using SQL.

You should be able to browse your new catalog within Athena from the Athena console and query the Slack Member Analytics API using SQL.

Creating a dataset that reads Slack member usage data and blends it with an HR dataset

As a prerequisite to query the Slack Member Analytics API from QuickSight, we must provide the proper permission for QuickSight to access the federated query data source in Athena. We do this directly from the QuickSight admin UI following these steps:

  1. As an admin, on the Admin menu, choose Manage QuickSight.
  2. Under Security & Permissions, choose QuickSight access to AWS services.
  3. Choose Add or Remove services.
  4. Select Athena.
  5. Choose Next when prompted to set the Amazon Simple Storage Service (Amazon S3) bucket and Lambda function permissions.

QuickSight browses the Athena catalogs and displays any Lambda functions associated with your account. If you don’t see a Lambda function, it means you haven’t mapped a data source within Athena.

  1. Select the function.
  2. Choose Finish.

Choose Finish.

When the Example Corp QuickSight environment has the proper permissions, analysts can query the Slack Analytics Member API using their existing Athena data source. For instructions on creating your own dataset, see Creating a Dataset Using Amazon Athena Data.

The custom connector appears as a new Catalog, Database, and Tables option.

  1. In QuickSight, on the Datasets page, choose New dataset.

In QuickSight, on the Datasets page, choose New dataset.

  1. Choose Athena as your data source.
  2. Choose Create dataset.

Choose Create dataset.

  1. Choose your table or, for this use case, choose Use custom SQL.

Choose your table or, for this use case, choose Use custom SQL.

For this analysis, we write a custom SQL that gets member activity for the last 30 days:

SELECT date,
       is_active,
       email_address,
       messages_posted_count
FROM   slackanalytics_catalog.slackanalytics.member_analytics
WHERE  date >= date_format(date_trunc('month',current_date),'%Y-%m-%d')

With the QuickSight cross data source join feature, analysts can enrich the Slack member stats with their HR info. For this use case, we imported a local HR_dataset.csv file containing the list of subscribed users with their respective Example Corp department, and joined them via the employee_email field.

With the QuickSight cross data source join feature, analysts can enrich the Slack member stats with their HR info.

The result is a dataset with Slack activity by employee and department. We’ve also updated the date field from a String type to a Date type using the QuickSight Data Prep page to take advantage of additional visualization features with Date type fields.

The result is a dataset with Slack activity by employee and department.

Creating a QuickSight dashboard that shows usage trends of provisioned vs. active users

Example Corp Analysts want to visualize the trend of provisioned users vs. active users and understand Slack adoption by department. To support these visualizations, we created the following calculated fields within our QuickSight analysis:

  • active distinct_countIf(employee,{is_active}='true')
  • provisioneddistinct_count(employee)

You can also create these calculated fields when you create your dataset. This way, you can reuse them in other QuickSight analyses. 

We use QuickSight narrative insights, a line chart, a bar chart, and a pivot table with conditional formatting to create the following analysis.

We use QuickSight narrative insights, a line chart, a bar chart, and a pivot table with conditional formatting to create the following analysis.

From this analysis, Example Corp can see that the adoption trend is positive; however, there is an adoption gap within the Marketing team. The program managers can engage the Marketing department leads and focus their training resources to improve their adoption.

From this analysis, Example Corp can see that the adoption trend is positive; however, there is an adoption gap within the Marketing team.

This dashboard can now be published to stakeholders within the organization as needed—either within the QuickSight app or embedded within existing enterprise applications. 

Conclusion

With the recent integration of QuickSight and Athena Federated Query, organizations can access additional data sources beyond those already supported by QuickSight. Analysts can leverage QuickSight capabilities to analyze and build dashboards that blend data from a variety of data sources, and with the Athena Query Federation SDK, you can build custom connectors to access relational, non-relational, object, and custom data endpoints using standard SQL.

To get started, try the lab Athena Deploying Custom Connector.


About the Author

Pablo Redondo SanchezPablo Redondo Sanchez is a Senior Solutions Architect at Amazon Web Services. He is a data enthusiast and works with customers to help them achieve better insights and faster outcomes from their data analytics workflows. In his spare time, Pablo enjoys woodworking and spending time outdoor with his family in Northern California.

 

 

 

Querying a Vertica data source in Amazon Athena using the Athena Federated Query SDK

Post Syndicated from Kelly Ragan original https://aws.amazon.com/blogs/big-data/querying-a-vertica-data-source-in-amazon-athena-using-the-athena-federated-query-sdk/

The ability to query data and perform ad hoc analysis across multiple platforms and data stores with a single tool brings immense value to the big data analytical arena. As organizations build out data lakes with increasing volumes of data, there is a growing need to combine that data with large amounts of data in other data stores. As the variety of data increases, it becomes paramount to have a query tool to bridge two or more data stores with a single query.

Even though data lakes became popular for analytic workloads recently, it’s not uncommon to have data warehouses in addition to data lakes for various reporting and business intelligence (BI) use cases. It becomes imperative to be able to seamlessly query the data stored in the data warehouse and the data lake. To address this issue, Amazon Athena has released a feature called Athena Federated Query. Athena is an interactive query service provided by AWS that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Vertica is a columnar MPP database platform that can be deployed in the cloud or on premises, and supports exabyte scale data warehouses. With Athena Federated Query and the Vertica connector, you can now run analytical queries over a data warehouse on Vertica and a data lake in Amazon S3.

Athena Federated Query includes pre-built connectors to a variety of AWS services and databases, as well as an SDK to build custom connectors to other databases and data stores. With this feature, federated queries can pull data from a data lake in an S3 bucket and from an external data source, and then combine it into a single result set in Athena. These connectors are an extension of the Athena query engine, which translates content between Athena and the external data source. Pre-built connectors exist for Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), and Amazon Relational Database Service (Amazon RDS), as well as a JDBC connector for Amazon Redshift, MySQL, and PostgreSQL. For other types of relational databases, you can use the Athena Federated Query SDK to create a custom connector.

In this post, we demonstrate how to deploy the custom connector between Athena and a Vertica database built using the Athena Federated Query SDK. After deploying the custom connector, we demonstrate issuing federated queries and moving data from Vertica to a data lake using CREATE TABLE AS (CTAS) with a federated query.

AWS services used in the solution

The Athena Federated Query SDK is an open-source framework to build custom connectors, and comes with a connector publish tool that deploys the connector executables in an application to the AWS Serverless Application Repository. The Athena Federated Query uses an AWS Lambda function that in turn uses the application deployed to the AWS Serverless Application Repository.

A custom connector is composed of a Lambda function that utilizes three components:

  • MetadataHandler – An interface that exposes metadata information of schemas, tables, and columns from the underlying data store to Athena
  • RecordHandler – An interface that provides hooks to read data from the external source and share it with the Athena query engine in Apache Arrow columnar format
  • CompositeHandler – For managing running the MetadataHandler and RecordHandler

The Lambda function connects to the external data store using an appropriate connection protocol and sends the parsed SQL statement. In the case of Vertica, it is JDBC. The RecordHandler processes the result set produced by the external data store and passes the rows to Athena for final processing. Multiple Lambda functions are called by Athena depending on the Lambda concurrency settings to read the result set in parallel. A spill bucket is used to handle a large dataset that exceeds the Lambda server’s capacity to process the result set.

The JDBC connection established by the Lambda function to the external database is used to send the parsed SQL statement and retrieve the result set rows from the external database. This scenario works well in terms of bandwidth for smaller databases and result sets. However, you might have Vertica deployments with petabyte or exabyte data warehouses. Typical queries return result sets on the order of 10, 20, 30 gigabytes, or more. Due to the bandwidth issue with a JDBC connection, the solution presented in this post modifies the Athena Federated Query SDK to implement a different route for the transmission of large result sets from Vertica to the Athena server for final processing.

The alternate solution utilizes the Vertica EXPORT command as a wrapper around the parsed SQL statement. You can use the EXPORT command to write a result set for a SQL statement directly to an S3 bucket using Vertica’s highly parallelized write to Amazon S3 using partitioning. This solution modifies the SDK to allow Athena to read the result set in the S3 bucket, determine the number of partitions, and call subsequent Lambda functions to parallelize the read of the result set. This produces an efficient way to move a multi-gigabyte result set from Vertica to Athena with parallelized writes from Vertica to Amazon S3 and parallelized reads from Amazon S3 to Athena. When connecting to the Vertica database, the SDK uses AWS Secrets Manager to retrieve a user ID and password for a service account on the Vertica database.

Solution architecture

The following diagram shows the solution architecture for the Vertica custom connector when deployed to AWS.

The following diagram shows the solution architecture for the Vertica custom connector when deployed to AWS.

The connector components are as follows:

  1. A user issues a federated SQL query in Athena against a table in Vertica.
  2. Athena parses the query and calls a Lambda function.
  3. The Lambda function makes a call to Secrets Manager to get the user ID and password for connecting to Vertica.
  4. The connector sends an EXPORT statement wrapper with the embedded SQL statement to Vertica through the JDBC connection. For example, see the following code:
    EXPORT TO PARQUET (directory = 's3://<bucket_name>/<folder_name>, 
    Compression='Snappy', fileSizeMB=64) OVER() as   
    SELECT  
    ORDER_ID,  
    ITEM,  
    CUSTOMER_ID,
    ORDERED_DATE
    FROM SCHEMA1.ORDERS  
    WHERE CUSTOMER_ID = 2;
    

  5. Vertica processes the SQL query and writes the result set to the S3 bucket specified in the EXPORT command. Vertica parallelizes the write to S3 bucket based on the fileSizeMB parameter into as many partitions as needed for the result set.
  6. Athena calls a Lambda function to scan the S3 bucket in order to determine the number of files to read for the result set.
  7. Athena invokes multiple Lambda functions depending on the number of partitions using Amazon S3 Select. This allows Athena to parallelize the read of the S3 files.
  8. Athena combines the result set returned from Vertica with data scanned from the data lake, and returns the combined result set to the user.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • Amazon EC2 IAM role permissions – The AWS Identity and Access Management (IAM) role of the Amazon Elastic Compute Cloud (Amazon EC2) machines hosting the Vertica database must be given write permissions to the VerticaExport S3 bucket, which is created when deploying the connector.
  • Secrets Manager – The Vertica connection credentials are stored in Secrets Manager. The secret name is prefixed with Vertica– and the secret value is the connection credentials.
  • Lambda IAM role permissions – When the Lambda function is deployed to the AWS Serverless Application Repository, it creates a custom IAM role for the function to run. The custom role has the following IAM permissions in order to successfully perform the read and write functions associated with the MetadataHandler and RecordHandler:
    • AWSLambdaBasicExecutionRole
    • AWSLambdaVPCAccessExecutionRole
    • For Secrets Manager, GetSecretValue for secrets with a prefix given in SecretNameOrPrefix
    • For Amazon S3, list, read, and write permissions for SpillBucket and ExportBucket, and list permissions for all S3 buckets
    • For Athena, GetQueryExecution

Demonstration tables

To demonstrate the Athena Vertica connector capabilities, we use the following components:

  • A Vertica database running in our AWS environment.
  • A Vertica table called orders containing details of customer orders.
  • An Athena table called customer, which has an S3 bucket as a data source. This table contains information regarding customers.

The following screenshot shows the details of the customer table in Amazon S3.

The following screenshot shows the details of the customer table in Amazon S3.

The following screenshot shows the details of the orders table in Vertica.

The following screenshot shows the details of the orders table in Vertica.

Setting up the Athena Vertica connector project

To set up your connector project, complete the following steps:

  1. Create an S3 bucket in your AWS account. This is the bucket where the result set from Vertica is exported.
  2. Create another S3 bucket in your AWS account. This is the bucket where the code for the connector is stored and retrieved.
  3. Grant the IAM role of the EC2 machines hosting the Vertica database read and write permissions to the S3 result set bucket, allowing Vertica to export data to the bucket.
  4. Clone the GitHub repo in your local folder.
  5. Open the project in your preferred IDE.
  6. From the athena-query-federation directory, run mvn clean install.
  7. From the athena-vertica directory, run mvn clean install.
  8. From the athena-vertica directory, run ../tools/publish.sh <s3_code_bucket_name> athena-vertica [region] to publish the connector to your private AWS Serverless Application Repository.
  9. Upon successful completion of the script, the connector’s serverless application is published to the AWS Serverless Application Repository.

Deploying the connector

To deploy your connector, complete the following steps:

  1. On the AWS Serverless Application Repository console, choose Published Applications.
  2. On the Private Applications tab, select Show apps that create in order to see deployed applications.
  3. Choose the VerticaAthenaConnector serverless app.
  4. For AthenaCatalogName, enter the name of the connector Lambda function used when querying the Vertica tables (avc).
  5. For SecretNameOrPrefix, enter the prefix used to store the Vertica credentials in Secrets Manager (the default is Vertica-).
  6. For SpillBucket, enter the S3 bucket name where data is spilled in case the result set data volume crosses a certain limit (test-spill-bucket).
  7. For VerticaExportBucket, enter the S3 bucket where the result set from Vertica is exported (test-export-bucket).
  8. For VpcId, enter your VPC ID.

For VpcId, enter your VPC ID.

  1. For SpillPrefix, enter athena-spill.
  2. For SubnetIds, enter your subnet IDs.
  3. For VerticaConnectionString, enter the connection string of the Vertica database in the following format:
    jdbc:vertica://<host_name>:<port>/<database>?user=${vertica-username}&password=${vertica-password}
    

    Where, vertica-username and vertica-password are the secret names of the Vertica   user credentials stored in AWS Secrets Manager.

  1. Select I acknowledge that this app creates custom IAM roles.

Select I acknowledge that this app creates custom IAM roles.

  1. Choose Deploy.

Upon successful deployment, a Lambda function with the name given for AthenaCatalogName is deployed in your AWS environment. We use this function to issue federated queries to Vertica. The connector is now deployed and ready to use.

Using the connector

On the Athena console, you can query Vertica tables as shown in the following code. The value for <lambda_function> corresponds to the function you created in the previous section.

SELECT  
ORDER_ID,   
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
ORDER BY ORDER_ID DESC

In this example, we named the function as avc. The following screenshot shows our query results.

The following screenshot shows our query results.

This demonstrates that the newly deployed connector read the user-requested columns and the Vertica source table, wrapped an EXPORT statement around the SQL statement, and ran it in Vertica. The results of this query were exported to the specified S3 bucket (test-export-bucket) in Parquet format. The connector then invoked multiple Lambda functions to read the data from the S3 bucket using Amazon S3 Select and displayed it on the Athena console. Note that currently the connector exports Vertica timestamp and timestamptz data types as a varchar data type. Therefore we need to use the date_parse(string, format) function to convert the timestamps columns into the correct data type.

We can also create an Athena table using CTAS with the result set of the Vertica query using the following query:

CREATE TABLE default.vertica_customers_table AS (
SELECT  
ORDER_ID,
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
);

We can then use the newly created table to query the data as shown in the following screenshot.

We can then use the newly created table to query the data as shown in the following screenshot.

In addition, we can also query and join the customer data in Amazon S3 and orders data in Vertica using the following sample query:

WITH   
customer_data AS (  
  SELECT   
   	CUSTOMER_NAME,  
   	CUSTOMER_ID  
    
  FROM default.customer
  ),  
orders_data AS (  
  SELECT    
   	ORDER_ID,  
   	PRODUCT_NAME,  
   	CUSTOMER_ID   
  FROM "lambda:<lambda_function>".schema1.orders  
  )  
SELECT a.CUSTOMER_ID, b.ORDER_ID, b.PRODUCT_NAME
FROM customer_data a 
INNER JOIN orders_data b
ON a.customer_data.customer_id = b.orders_data.customer_id 
WHERE lower(b.PRODUCT_NAME) like 'pencil'
ORDER BY b.ORDER_ID DESC

This query joins the orders data in Vertica with customer data in the S3 bucket in the customer_id column and displays the results on the Athena console.

This query joins the orders data in Vertica with customer data in the S3 bucket in the customer_id column and displays the results on the Athena console.

This demonstrates the ease of performing analytics across multiple platforms and data stores.

Conclusion

In this post, we introduced the Athena Vertica connector, its solution architecture, and demonstrated how to deploy the connector using the Athena Federated Query SDK. We saw how to run SQL queries on the Vertica data source. We also learned that we can use the connector to perform extract, transform, and load operations on the data in the Vertica tables and Amazon S3, enabling us to perform faster and better analytics across multiple platforms and data sources.

For more information about Athena Federated Query, see the GitHub repo.

Special Acknowledgement

Special acknowledgement goes to the Intuit Data Engineering staff Denise McInerney – Data Architect, Sanjay Rane – Group Engineering Manager – Data, and Kannan Nagarajan – Database Architect. They helped design, review, and support the development of the custom connector and architecture.


About the Authors

Kelly RaganKelly Ragan is a Senior Data Architect, Strategic Accounts Team, AWS Professional Services. He helps customers solve big data problems and wrestle with large-scale data warehouses. In his spare time, he enjoys snow skiing, bicycling, and camping in the Pacific Northwest.

 

 

Rohit MasurRohit Masur is an Associate Big Data Consultant, Data and Analytics Team, AWS Professional Services. He helps customers architect and implement solutions on AWS to get business value out of data. In his spare time, he enjoys reading books, going on long walks, and exploring new hiking trails in the Bay Area.

Automating AWS service logs table creation and querying them with Amazon Athena

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/automating-aws-service-logs-table-creation-and-querying-them-with-amazon-athena/

I was working with a customer who was just getting started using AWS, and they wanted to understand how to query their AWS service logs that were being delivered to Amazon Simple Storage Service (Amazon S3). I introduced them to Amazon Athena, a serverless, interactive query service that allows you to easily analyze data in Amazon S3 and other sources. Together, we used Athena to query service logs, and were able to create tables for AWS CloudTrail logs, Amazon S3 access logs, and VPC flow logs. As I was walking the customer through the documentation and creating tables and partitions for each service log in Athena, I thought there had to be an easier and faster way to allow customers to query their logs in Amazon S3, which is the focus of this post.

This post demonstrates how to use AWS CloudFormation to automatically create AWS service log tables, partitions, and example queries in Athena. We also use the SQL query editor in Athena to query the AWS service log tables that AWS CloudFormation created.

Athena best practices

This solution is appropriate for ad hoc use and queries the raw log files. These raw files can range from compressed JSON to uncompressed text formats, depending on how they were configured to be sent to Amazon S3. If you need to query over hundreds of GBs or TBs of data per day in Amazon S3, performing ETL on your raw files and transforming them to a columnar file format like Apache Parquet can lead to increased performance and cost savings. You can save on your Amazon S3 storage costs by using snappy compression for Parquet files stored in Amazon S3. To learn more about Athena best practices, see Top 10 Performance Tuning Tips for Amazon Athena.

Table partition strategies

There are a few important considerations when deciding how to define your table partitions. Mainly you should ask: what types of queries will I be writing against my data in Amazon S3? Do I only need to query data for that day and for a single account, or do I need to query across months of data and multiple accounts? In this post, we talk about how to query across a single, partitioned account.

By partitioning data, you can restrict the amount of data scanned per query, thereby improving performance and reducing cost. When creating a table schema in Athena, you set the location of where the files reside in Amazon S3, and you can also define how the table is partitioned. The location is a bucket path that leads to the desired files. If you query a partitioned table and specify the partition in the WHERE clause, Athena scans the data only for that partition. For more information, see Table Location in Amazon S3 and Partitioning Data. You can then define partitions in Athena that map to the data residing in Amazon S3.

Let’s look at an example to see how defining a location and partitioning our table can improve performance and reduce costs. In the following tree diagram, we’ve outlined what the bucket path may look like as logs are delivered to your S3 bucket, starting from the bucket name and going all the way down to the day.

In the following tree diagram, we’ve outlined what the bucket path may look like as logs are delivered to your S3 bucket

Outlined in red is where we set the location for our table schema, and Athena then scans everything after the CloudTrail folder. We then outlined our partitions in blue. This is where we can specify the granularity of our queries. In this case, we partition our table down to the day, which is very granular because we can tell Athena exactly where to look for our data. This is also the most performant and cost-effective option because it results in scanning only the required data and nothing else.

If you have to query multiple accounts and Regions, you should back off the location to AWSLogs and then create a non-partitioned CloudTrail table. This allows you to write queries across all your accounts and Regions, but the trade-off is that your queries take much longer and are more expensive due to Athena having to scan all the data that comes after AWSLogs every query. However, querying multiple accounts is beyond the scope of this post.

Prerequisites

Before you get started, you should have the following prerequisites:

  • Service logs already being delivered to Amazon S3
  • An AWS account with access to your service logs

Deploying the automated solution in your AWS account

The following steps walk you through deploying a CloudFormation template that creates saved queries for you to run (Create Table, Create Partition, and example queries for each service log).

  1. Choose Launch Stack:

  1. Choose Next.
  2. For Stack name, enter a name for your stack.

You don’t need to have every AWS service log that the template asks for. If you don’t have CloudFront logs for example, you can leave the PathParameter as is. If you need CloudFront logs in the future, you can simply update the Create Table statement with the correct Amazon S3 location in Athena.

  1. For each service log table you want to create, follow the steps below:
  • Replace <_BUCKET_NAME> with the name of your S3 bucket that holds each AWS service log. You can use the same bucket name if it’s used to hold more than one type of service log.
  • Replace <Prefix> with your own folder prefix in Amazon S3. If you don’t have a prefix, make sure to remove it from the path parameters.
  • Replace <ACCOUNT-ID> and <REGION> with desired account and region.

Choose Next.

  1. Choose Next.
  2. Enter any tags you wish to assign to the stack.
  3. Choose Next.
  4. Verify parameters are correct and choose Create stack at the bottom.

Verify the stack has been created successfully. The stack takes about 1 minute to create the resources.

Querying your tables

You’re now ready to start querying your service logs.

  1. On the Athena console, on the Saved queries tab, search for the service log you want to interact with.

On the Athena console, on the Saved queries tab, search for the service log you want to interact with.

  1. Choose Create Table – CloudTrail Logs to run the SQL statement in the Athena query editor.

Make sure the location for Amazon S3 is correct in your SQL statement and verify you have the correct database selected.

  1. Choose Run query or press Tab+Enter to run the query.

Choose Run query or press Tab+Enter to run the query.

The table cloudtrail_logs is created in the selected database. You can repeat this process to create other service log tables.

For partitioned tables like cloudtrail_logs, you must add partitions to your table before querying.

  1. On the Saved queries tab, choose Create Partition – CloudTrail.
  2. Update the Region, year, month, and day you want to partition. Choose Run query or press Tab+Enter to run the query.

Choose Run query or press Tab+Enter to run the query.

After you run the query, you have successfully added a partition to your cloudtrail_logs table. Let’s look at some of the example queries we can run now.

  1. On the Saved queries tab, choose Query – CloudTrail Logs.

This is a base template included to begin querying your CloudTrail logs.

  1. Highlight the query and choose Run query.

You can see the base query template uses the WHERE clause to leverage partitions that have been loaded.

You can see the base query template uses the WHERE clause to leverage partitions that have been loaded.

Let’s say we have a spike in API calls from AWS Lambda and we want to see the users that the calls were coming from in a specific time range as well as the count for each user. Our query looks like the following code:

SELECT useridentity.sessioncontext.sessionissuer.username as "User",
       count(eventname) as "Lambda API Calls"
FROM cloudtrail_logs
WHERE eventsource = 'lambda.amazonaws.com'
       AND eventtime BETWEEN '2020-11-24T18:00:00Z' AND '2020-11-24T21:00:00Z' 
group by useridentity.sessioncontext.sessionissuer.username
order by count(eventname) desc

Or if we wanted to check our S3 Access Logs to make sure only authorized users are accessing certain prefixes:

SELECT *
FROM s3_access_logs
WHERE key='prefix/images/example.jpg'
        AND requester != 'arn:aws:iam::accountid:user/username'

Cost of solution and cleaning up

Deploying the CloudFormation template doesn’t cost anything. You’re only charged for the amount of data scanned by Athena. Remember to use the best practices we discussed earlier when querying your data in Amazon S3. For more pricing information, see Amazon Athena pricing and Amazon S3 pricing.

To clean up the resources that were created, delete the CloudFormation stack you created earlier. This also deletes the saved queries in Athena.

Summary

In this post, we discussed how we can use AWS CloudFormation to easily create AWS service log tables, partitions, and starter queries in Athena by entering bucket paths as parameters. We used CloudTrail and Amazon S3 access logs as examples, but you can replicate these steps for other service logs that you may need to query by visiting the Saved queries tab in Athena. Feel free to check out the video as well, where I go over how we store logs in Amazon S3 and then give a quick demo on how to deploy the solution.

For more information about service logs, see Easily query AWS service logs using Amazon Athena.


About the Author

Michael Hamilton is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife, kids, and a 2-year-old German shepherd.

Building AWS Data Lake visualizations with Amazon Athena and Tableau

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/building-aws-data-lake-visualizations-with-amazon-athena-and-tableau/

Amazon Athena is an interactive query service that makes it easy to analyze data in a data lake using standard SQL. One of the key elements of Athena is that you only pay for the queries you run. This is an attractive feature because there is no hardware to set up, manage, or maintain.

You can query Athena with SQL or by using data visualization tools such as Amazon QuickSight, Tableau, or other third-party options. QuickSight is a cloud-native business intelligence (BI) service that you can use to visually analyze data and share interactive dashboards with all users in your organization. QuickSight is fully managed and serverless, requires no client downloads for dashboard creation, and has a pay-per-session pricing model that allows you to pay for dashboard consumption with a maximum charge of $5.00 per reader per month. The combination of QuickSight and Athena allows you to rapidly deploy dashboards and BI to tens of thousands of users, while only paying for actual usage, and not worrying about server deployment or management. Tableau allows you to similarly share dashboards with readers when utilizing Tableau servers. This post demonstrates how you can connect to Athena from a Tableau desktop, create a dashboard that queries data from Athena, and publish to a Tableau server for broader distribution.

Integration of Tableau with Athena as a data source is gaining in popularity, and many customers prefer to create Tableau dashboards on Athena. Performance of the dashboard usually varies based on many factors, such as number of fields and views, data size, network bandwidth, Athena query runtime, dashboard rendering time, connection type, and location of the Tableau server (on premises or AWS). We walk you through the required configuration to integrate Tableau with Athena, best practices, and Athena runtime analysis.

Solution overview

In this solution, we build an end-to-end Tableau dashboard using Athena as a data source for data analysts. The dashboard has two views:

  • Student’s study time based on age group and gender
  • Geo location of the students

The dashboard is very helpful in analyzing a student’s performance in the class and their health condition.

You walk through the following steps:

  1. Configure Tableau to connect to Athena.
  2. Connect Tableau Desktop to Athena to build dashboard.
  3. Create a Tableau dashboard and publish to Tableau server.
  4. Analyze the Tableau dashboard.

We also review best practices and design patterns of Tableau development with Athena.

The following diagram illustrates the architecture of this solution.

The following diagram illustrates the architecture of this solution.

Prerequisites

You need the following prerequisites before you can proceed with solution:

Configuring Tableau to connect to Athena

Athena connects to Tableau via an Athena JDBC driver. Complete the following configuration steps:

  1. Install the appropriate version of 64-bit Java. A minimum JDK 7.0 (Java 1.7) is required.
  2. Download the JDBC driver (.jar) file that matches with your version of the JDK.
  3. Move the downloaded .jar file to the following location, based on your operating system:
    1. For Windows, use C:\Program Files\Tableau\Drivers.
    2. For Mac, use ~/Library/Tableau/Drivers location.

Setting up Athena

For this use case, you create an Athena table called student that points to a student-db.csv file in an S3 bucket. Additionally, you create the view student_view on top of the student table. You build the Tableau dashboard using this view. You expose only a subset of columns from the student table in the view.

You can download the Athena DDL (student.sql and student_view.sql) and data file student-db.csv from GitHub repo.

  1. On the Amazon S3 console, upload the student-db.csv file in the S3 bucket you created as a prerequisite.

  1. On the Athena console, use the following DDL statement in the query editor to create your studentdb database:
    CREATE DATABASE studentdb;

For more information, see Creating Databases in Athena.

  1. Choose the studentdb database and use the following DDL statement to create the student table (provide the name of your S3 bucket):
    CREATE EXTERNAL TABLE student(
      `school` string, 
      `country` string, 
      `sex` string, 
      `age` string, 
      `studytime` int, 
      `failures` int, 
      `preschool` string, 
      `higher` string, 
      `remotestudy` string, 
      `health` string)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://<your_bucket_name>/'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'skip.header.line.count'='1', 
      'transient_lastDdlTime'='1595149168')

  1. Use the following DDL statement to create the student_view view. This creates an Athena view with a limited number of fields to build the Tableau dashboard.
    CREATE OR REPLACE VIEW student_view AS 
    SELECT
      "school"
    , "country"
    , "sex"
    , "age"
    , "health"
    , "studytime"
    ,"failures"
    FROM
      student

Now if you query student_view on the Athena console with a select * SQL statement, you can see the following output.Now if you query student_view on the Athena console with a select * SQL statement, you can see the following output.
Connecting Tableau Desktop to Athena

Athena connects to Tableau via a JDBC driver. With the Amazon Athena connector, you can quickly and directly connect Tableau to their Amazon S3 data for fast discovery and analysis, with drag-and-drop ease. Tableau Desktop is used to create worksheets, dashboards, stories connecting to data sources which can be files or server.

In this section, you connect Tableau Desktop to Athena.

  1. Open Tableau Desktop.
  2. On the navigation pane, choose More.
  3. Choose Amazon Athena.

Choose Amazon Athena.

  1. For Server, enter athena <region>.amazonaws.com. 

Use the Region that you’re using to set up the Athena table and view. For more information, see Amazon Athena endpoints and quotas.

  1. For Port, enter 442.
  2. For S3 Staging Directory, enter the path of the Amazon S3 location where you want to store query results.

The path is available on the Settings page on the Athena console, under Query result location.

 

  1. For Access Key ID and Secret Access Key, enter the appropriate values.

After you sign in, you create a dashboard by selecting your database and available table or view.

  1. Choose student_view.
  2. Drag and drop this view as a data source.

Drag and drop this view as a data source.

  1. Create the worksheet country-wise, as per the configuration in the following screenshot.

Create the worksheet country-wise, as per the configuration in the following screenshot.

  1. Create another worksheet called age-wise.

Create another worksheet called age-wise.

  1. On the Dashboard menu, choose New Dashboard.
  2. Drag and drop country-wise and age-wise.

Drag and drop country-wise and age-wise.

You have created a Tableau dashboard successfully, your data is in Athena, and you’re ready to share it with the rest of your organization by publishing the dashboard. Before you publish, you need to configure the plan to refresh the Athena data sources used by the Tableau dashboard.

Two options are available:

  • Live connection – A Tableau live connection offers real-time updates, with any changes in the data source reflecting immediately in Tableau.
  • Data extract – Data extracts are snapshots of data optimized into system  memory to be quickly recalled for visualization. Extracts are likely to be much faster than live connections, especially in complex visualizations with large datasets, filters, calculations, and so on. For more information, see Refresh Extracts.
  1. On the Server menu, choose Publish Workbook.

On the Server menu, choose Publish Workbook.

After you share the link with end-users, they can view the dashboard.

After you share the link with end-users, they can view the dashboard.

The Tableau dashboard run and refresh creates a query in Athena for each visualization. To view the query, choose the History tab on the Athena console.

To view the query, choose the History tab on the Athena console.

The following code is the query generated from the country-wise visualization:

SELECT "student_view"."age" AS "age",
  "student_view"."sex" AS "sex",
  SUM("student_view"."studytime") AS "sum:studytime:ok"
FROM "studentdb"."student_view" "student_view"
GROUP BY "student_view"."age",
  "student_view"."sex"

The following code is the query generated from the age-wise visualization:

SELECT "student_view"."country" AS "country",
  SUM("student_view"."studytime") AS "sum:studytime:ok"
FROM "studentdb"."student_view" "student_view"
GROUP BY "student_view"."country"

Analyzing the Athena query runtime

The Tableau dashboard is a collection of several views and visualizations. The dashboard runtime depends on many factors, including:

  • Database query execution time
  • Data transfer time
  • Dashboard visualization rendering time
  • Number of visualizations, filters, data volume, total number of fields, number of rows, KPI calculation complexity, custom scripts runtime, and so on.
  • Number of concurrent users
  • Workload and Tableau Server sizing

Each Tableau dashboard visualization generates an Athena query. Each query that runs is known as a query execution. The query execution has a unique identifier known as the query ID or query execution ID. You can identify Tableau dashboard queries on the Athena console using query IDs. The query IDs for Tableau queries can be found from the driver logs. For more information, see Enable Driver Logging for Amazon Athena Using a .properties File.

You can further check the query runtime on the Athena console. On the History tab, search for the query with the query ID. The following screenshot shows the search results for the student age query.

The following screenshot shows the search results for the student age query.

The following screenshot shows the search results for the student country query.

The following screenshot shows the search results for the student country query.

In this use case, you have two queries from two visualizations. Both queries start at the same time, and the query runtime is 1.22 seconds.

Best practices on Athena to Tableau integration

The following best practices may be useful as you build Tableau dashboards on Athena:

  • There may be use cases where you want to create complex queries as views by joining multiple tables in Athena. You use these queries in Tableau to build the dashboard. The runtime of these views can take a while to complete because it depends on the underlying complexity of the view, such as the number of joins and filters. This isn’t ideal for live database connections, but it works well in a data extract model where you can refresh the data in Tableau on a schedule.
  • By using views and extracts, you can also minimize Athena costs. You only run the query one time during extraction and then publish the extract to Tableau. This means you can be efficient in leveraging the Tableau Hyper engine while minimizing your costs in Athena.
  • Tableau data extract provides performance benefits, but there are situations where you need live data. In this case, data extract isn’t an option, and you can choose Tableau live connection.
  • You can partition your dataset. Partitioning divides your table into parts and keeps the related data together based on column values such as date, country, and region. Data partition restricts the amount of data scanned by each query, thereby improving performance and reducing cost.
  • Rather than query the CSVs directly in Athena, you can write the data to Amazon S3 as Apache Parquet or Apache ORC files, which is an optimized columnar format that is ideal for analytic queries. For more information about performance tuning, see Top 10 Performance Tuning Tips for Amazon Athena.
  • You can convert your existing data to Parquet or ORC using Apache Spark or Apache Hive on Amazon EMR or AWS Glue. For more information, see Analyzing Data in S3 using Amazon Athena. See also the following resources:
  • You can also use the Athena Create Table As feature to convert to Parquet format. The following query converts the student CSV data to Parquet and creates a student_parquet table (provide the S3 bucket name where you want to store the Parquet file):
    CREATE TABLE studentdb.student_parquet
        WITH (
              format = 'PARQUET',
              parquet_compression = 'SNAPPY',
              external_location = 's3:// <BUCKET_NAME>/parquet_files'
        ) AS SELECT * FROM student

The following table compares query data scanning and run times between the two Athena tables.

Table Query Data Scanned Runtime
student SELECT * FROM "studentdb"."student"; 23.17 KB 0.63 seconds
student_parquet SELECT * FROM "studentdb"."student_parquet"; 1.93 KB 0.49 seconds

The following best practices may be useful as you deploy Tableau Server on AWS:

  • Choose the right Amazon Elastic Compute Cloud (Amazon EC2) instance type based on your workload. A total of 8 CPU cores (16 AWS vCPUs) and 64 GB RAM are recommended for a single production EC2 instance. The typical EC2 instance types to use for Tableau Server is C5.4xlarge, m5.4xlarge, and r5.4xlarge.
  • You should test your workload to check any performance bottleneck due to CPU. You can use a different set of Tableau dashboards to check the performance and add more CPU as required.
  • EC2 instances with insufficient RAM may cancel out the benefit of high-end CPU. You should choose to run with 64+ GB RAM for production workloads. Although it’s important to choose an instance with sufficient CPU, running Tableau Server on instances starved for RAM may lead to degraded performance.
  • Tableau Server has several processes and components, including a database (PostgreSQL) that stores the system’s metadata. Tableau Server needs a high level of disk throughput in order to perform well, and it’s recommended to use Amazon Elastic Block Store (Amazon EBS) SSD (gp2) or provisioned IOPS volumes. Magnetic disks are not recommended. Provision a minimum 30–50 GB volume for the operating system and over 100 GB for Tableau Server.

For more information, see Optimizing Performance of Tableau Server on AWS.

If you have Tableau client and server deployments, you can use Athena to directly connect to data lakes in Amazon S3 and pull the latest information on dashboards, either via a live connection or with an extract. QuickSight offers similar capabilities, with the option of direct connection to Athena, or via periodic refresh of data into SPICE. SPICE is a high-performance in-memory data store that natively provides high concurrency and high availability without the need for any server sizing, setup, or management. QuickSight also provides centralized, AWS native IAM-based control over access to Athena, which removes the need to store individual user credentials in client software on individual user machines.

Cleaning up

To avoid incurring future charges, delete the data file from the S3 bucket.

  1. On the Amazon S3 console, choose the bucket where you uploaded student-db.csv.
  2. Select student-db.csv.
  3. On the Actions menu, choose Delete.

Conclusion

In this post, you’ve seen how to connect Tableau to Athena and start gaining insights into data. This post also discussed the best practices when building a Tableau dashboard on Athena.


About the Author

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration and strategy. He is passionate about technology and enjoys building and experimenting in Analytics and AI/ML space.

How EMX reduced data pipeline costs by 85% with Amazon Athena

Post Syndicated from Gary Bouton original https://aws.amazon.com/blogs/big-data/how-emx-reduced-data-pipeline-costs-by-85-with-amazon-athena/

This is a guest blog post by Gary Bouton and Louis Ashner from EMX. In their own words, “ENGINE Media Exchange (EMX) is a leading marketing technology company, leveraging a patented, end-to-end tech stack purpose-built to meet the demands of today’s digital marketplace. The company creates both open- and closed-loop solutions designed to unify advertisers, platforms, and publishers across digital media channels—including advanced TV, video, display, search, and social.”

While recognized as an independent solutions provider for the digital media landscape, EMX also serves as the technology and programmatic division for its parent, ENGINE—a global data-driven marketing company serving advertising’s most recognized brands.

In the past, we used typical legacy data warehouse solutions for our data pipeline. We needed massive clusters to house all our raw data as well as advanced pipelines to import that data into the final database. We then needed to query the raw data clusters to aggregate and move it into a separate cluster for more frequent querying. This process was not only time consuming, but also quite expensive, because these legacy database clusters weren’t cheap in order to house that much data.

Then came Amazon Athena, which allowed us to not only simplify our pipeline, but also save on costs significantly. We were able to simply route the raw data straight to Amazon Simple Storage Service (Amazon S3) at minimal storage costs, then query the data with Athena to move aggregate data into small Amazon Redshift clusters for querying more frequently. Athena’s querying is not only quick, but the query costs are mere pennies when the table is set up correctly with partitions and the query utilizes them properly. Additionally, we can increase our data retention time, because the cost of storing data in Amazon S3 is significantly cheaper than an ever-growing legacy data warehousing solution.

In short, Athena allowed us to simplify our data pipeline while saving 85% on data storage costs at the same time.

This post discusses the following:

  • Why EMX digital chose Athena for its backend ETL workflow
  • How EMX manages Athena performance and run time
  • How EMX continues to scale Athena with new products and create coherent workflows
  • The benefits of this solution for EMX

Advantages of a robust backend ETL workflow when dealing with fast and furious big data

For most companies, data is an ever-growing problem. The volume, velocity, variety, and veracity of its availability can be performance limiting and a financial burden.

Data is very important to us at EMX; we process over 450,000 requests per second, which we clean, audit, and deliver for reporting and optimizations to keep our clients informed and current in an ever-changing ad space.

To do this, we have to have a backend system that is robust, on time, available, and cost effective in order to meet the demands of our split-second decision-making.

Why Athena is the right tool for EMX

As detailed below, Athena’s pay-per-query pricing model, performance and reliability at scale, and ease of use made it the right tool for us.

  • Scale – EMX processes over 2 TB of raw and sometimes unstructured data every hour for reporting and optimizations. The ability to run these jobs without managing the cluster optimizations directly allows the team to focus on more research and product goals. Using Athena allows us to focus more on research, product development ideas, and ad hoc tasks, and alleviates us from having to take time to estimate the process and computational power needed to complete jobs in time.
  • Cost – Cost per query is at least four times cheaper than other backend ETL tools, and its on-demand nature means we only pay for what we use. We’re no longer losing increasing costs by keeping up a system that isn’t being used. The feedback of cost per query through Athena also allows us to tune and optimize our logic, to not only reduce that cost further but test new ways to split into and run our production ETL jobs.
  • Resilience – We have thrown everything but the kitchen sink problems at Athena while building out our production pipeline, and were impressed at the lack of failure from the service. Even though we don’t directly own the resources to the cloud solution of Athena, it has always had high availability. In instances where availability was hampered, Athena has made it easy and straightforward to add in workflow hooks to retry failed jobs when a queue becomes available.
  • Ease of use – Unlike most competitor offerings, Athena works out of the box. It’s very easily customized using the Athena GUI, or you can build your own roles, rules, database structures, and projections. The documentation for tuning AWS performance with Presto is very easy and straightforward, making it a small learning curve for any new user.
  • Data transformations – Athena’s robust Presto query language allows us to perform regex, quartile, and percentile statistics without resorting to an outside transformation step in Python or other languages. Going further, using window functions inside those same queries allows us to do some of the heavy mathematical lifting we would have needed to do outside of the backend process, thus saving cost and time. With Athena, these extra vital steps see no difference in cost or performance to our backend pipeline and allow us to condense complicated parts into one step.

Why we continue to grow with Athena

We continue to grow with Athena for the following reasons:

  • Future scale – Athena and its team keep improving and adding resources that support our ever-growing data needs, which have increased by 200% since Athena’s implementation. This has served as the bedrock to our backend solutions.
  • Improvements – The Sales and Engineering team at AWS has always been open to feedback and has turned that into better error reporting, work groups for Athena, changes in policy, and workload management through roles. This has allowed us to split Athena resources with workgroups from production-level work to running ad hoc jobs in future updates.
  • Cost is king – Every dollar we have saved through Athena has been put into products to make Athena better for us. Using Athena has allowed us to improve our front-end delivery products—from building our own workflows right into Athena, to taking time to work with the right compression for Athena ingestion, and even offloading more work that would have gone to a traditional ETL box. Cost for us is not just dollars but the time it takes to manage; that time saved is allowing us to be on the bleeding edge in development of new tools to deliver the data Athena helps us serve.

The following sections detail how EMX uses Athena to build, manage, and orchestrate its backend ELT work with minimal coding and maintenance.

Solution architecture

The following diagram shows the architecture EMX uses.

The following diagram shows the architecture EMX uses.

How we use Athena

Our custom scripts stream batched data each minute from auction servers directly to raw S3 buckets. The data is dropped in a .gzip format to datetime-partitioned S3 buckets. This partition structure helps us limit our Athena query scan. For example, the partitioned buckets look like the following screenshot.

For example, the partitioned buckets look like the following screenshot.

When the data has reached these partitioned buckets, EMX uses Apache Airflow to schedule various jobs across Athena. The following screenshot shows our DAG for our most-used pipeline.

The following screenshot shows our DAG for our most-used pipeline.

Before beginning to run Athena queries, we run two checks on our data in Amazon S3:

  • Check if all the expected data has arrived and is in the bucket.
  • Check logic match rules and clean illegal fields in the data.

On the success of both tasks, we start adding the latest partition to an Athena table.

When the partition is added to the table, we start running the query. The query status is polled every 10 seconds to get the latest status on the query performance until completion. The query returns the status as success, failed, or canceled. Depending on what query status is returned, further tasks are then forked.

At times, we have noticed queries fail with the error Query resource exhausted at this scale, which usually goes away on triggered retries. For the same reason, we have a retry mechanism in place on the execute_athena_sql task. If the retry fails, it alerts the team and data is copied over to a debug bucket for further investigation. If it succeeds, it moves ahead with further transformation.

For further transformation, we get the output of the Athena query back in Amazon S3, and then we add the business rules to enrich the data in Amazon S3.

Based on the pipeline logic, the data is then copied from Amazon S3 to different data stores, one of them being Amazon Redshift.

The last step is to clean up the metadata that was generated by the Athena query.

The following is an example from one of our pipelines. This Athena table is projected on top of the partitioned buckets, and the table is also partitioned by datetime so that the table can be read off the data directly when it’s ready. The following screenshot is what the sample table campaigns_stream looks like, which reads the data from the aforementioned bucket.

The following screenshot is what the sample table campaigns_stream looks like, which reads the data from the aforementioned bucket.

As soon as our scheduled jobs are triggered, the job runs data checks, data matches, and complex SQL queries on this table using the latest date partition, which was loaded in the last DAG task, which limits the data scan and reduces costs. The results are generated and pushed to the S3 reporting bucket to be picked up by other processes. The results can be generated in different formats like CSV, Apache Avro, and Apache Parquet using the CTAS or INSERT INTO command.

For example, running the following simple count query for each domain scans approximately 1.65 TB of data and gives back the results in less than 600 seconds, without needing us to set up or manage any infrastructure.

For example, running the following simple count query for each domain scans approximately.

When the query is complete and the output files in the S3 reporting bucket are ready, they’re picked up by our DAG and pushed into data storage like Amazon Redshift.

Optimization on Athena

By default, Athena has a soft limit of 20 DML active queries (CTAS). When we have multiple jobs running in parallel, we may hit that limit, delaying our time-sensitive pipelines and jobs. To overcome this, we allocated a fixed time window in each hour for our most critical pipelines, and other jobs with lower priority are run later.

For example, our production pipelines get priority 1 – with window minute 0 to minute 15 of every hour. We’re aware that we can request a limit increase from AWS, but we instead decided to use this opportunity to improve the resilience and robustness of our system.

Conclusion

“Build, don’t buy” has been EMX’s motto. It drives our innovation forward, much like Athena continues to be able to solve all the questions we ask of it. We build boutique and large-scale solutions for our advertising clients, which require a malleable and robust ETL backend that takes the work and cost to a manageable level. We built an ever-scaling, cost-effective, and highly available ETL backend with Athena.

Our successes with Athena are shown through both time and cost savings, including:

  • 30% of the time used on maintenance of a traditional ETL structure is now moved into Athena improvement, which sees improved feedback in reduced costs that we can pass on to our clients
  • Four times less cost per query than competitors has allowed us to put money into different tools for storage and modeling, giving even more entropy to driving more revenue for our clients and less cost
  • 10 times less technical debt in Athena setup, research, staging, and production, which goes back into other future-thinking projects

What we can do with data is only limited by the time we need in herding, cleaning, and delivering this data for insights and development. Since throwing 100% of our ETL backend systems into Athena, we have increased product delivery and systems optimization four-fold in only a quick short year. Athena and the Athena team continue to grow with us even as our data needs begin to soar exponentially, adding more tools that reduce workflows, management, and job distribution in the AWS ecosystem itself. This entropy between EMX and Athena has resulted in increased cooperation and more business with us and our growing lists of clients.

Our “Why” is building the tools for the future, and Athena personifies our “Why” in delivering what EMX is about: scale, on time, and delivery of data optimized for the modern era.


About the Authors

Gary Bouton is VP of Data Engineering at ENGINE Media Exchange and leads their Data Engineering and Data Science Product teams. Pipeline implementation is led by Director of Data Pipeline Rahul Gupta, Senior Engineer Nader S. Gharawi, Data Science Engineer Raghav Gupta. Data model implementation is led by Senior Data Scientist Gabrielle Agrocostea , and Data Scientist Heena Otia.

 

Louis Ashner is EVP of Technology at ENGINE Media Exchange. He has a passion for making the Internet faster, and is an ad-tech pioneer with more than 10 years of experience working with digital advertising, including real-time bidding and programmatic advertising. His 9 patents in networking optimization and data caching are used to power EMX’s proprietary ad exchange.

Detecting anomalous values by invoking the Amazon Athena machine learning inference function

Post Syndicated from Amir Basirat original https://aws.amazon.com/blogs/big-data/detecting-anomalous-values-by-invoking-the-amazon-athena-machine-learning-inference-function/

Amazon Athena has released a new feature that allows you to easily invoke machine learning (ML) models for inference directly from your SQL queries. Inference is the stage in which a trained model is used to infer and predict the testing samples and comprises a similar forward pass as training to predict the values. Unlike training, it doesn’t include a backward pass to compute the error and update weights. It’s usually the production phase where you deploy your model to predict real-world data. Using ML models in SQL queries makes complex tasks such as anomaly detection, customer cohort analysis, and sales predictions as simple as invoking a function in a SQL query.

In this post, we show you how to use Athena ML to run a federated query that uses Amazon SageMaker inference to detect an anomalous value in your result set.

Solution overview

To use ML with Athena (Preview), you define an ML with Athena function with the USING FUNCTION clause. The function points to the Amazon SageMaker model endpoint that you want to use and specifies the variable names and data types to pass to the model. Subsequent clauses in the query reference the function to pass values to the model. The model runs inference based on the values that the query passes and returns inference results.

You can use more than a dozen built-in ML algorithms provided by Amazon SageMaker, train your own models, or find and subscribe to model packages from AWS Marketplace and deploy on Amazon SageMaker hosting services. No additional setup is required. You can invoke these ML models in your SQL queries from the Athena console, Athena APIs, and through the Athena JDBC driver.

To detect anomalous values, we use the Random Cut Forest (RCF) algorithm, which is an unsupervised algorithm for detecting anomalous data points within a dataset.

Prerequisites

This post continues the work done in this blog. You need to follow steps in that post to run the AWS CloudFormation template before proceeding with this post. No additional setup is required.

As part of the CloudFormation stack that you run to build the environment, we create a new AWS Identity and Access Management (IAM) role that Amazon SageMaker uses to run an Athena query to generate our training dataset, train a new model, and deploy that model to an Amazon SageMaker endpoint. To perform these tasks, our IAM role should have AmazonSageMakerFullAccess, AmazonAthenaFullAccess, and AmazonS3FullAccess managed policies. In a production setting, you should scope down the AmazonS3FullAccess policy to include only the Amazon Simple Storage Service (Amazon S3) buckets that you require for training your model.

Additionally, we create a new Amazon SageMaker notebook instance using an ml.m4.xlarge instance type. We use the ARN of the IAM role for Amazon SageMaker as the IAM role that this notebook uses when interacting with other AWS services.

Uploading and launching the Jupyter notebook

To upload and launch your Jupyter notebook, complete the following steps:

  1. On the Amazon SageMaker console, choose Notebook Instances.

You can see a workshop notebook instance of size ml.m4.xlarge, which you created when you deployed the CloudFormation stack.

  1. Select the instance and choose Open Jupyter.
  2. Download the Jupyter notebook file that we provide as part of this post.
  3. Upload the file to Jupyter.
  4. Choose the file and open the Python code so you can go through it step by step.

Running the Python code

You now run the Jupyter notebook Python code on the console, starting from the first cell.

Make sure to update the S3 bucket defined in the second cell of the notebook by replacing the bucket name with your S3 athena-federation-workshop-******** bucket, which you created when deploying the CloudFormation template. This bucket name in your account is globally unique, and we use this bucket to store our training data and model.

In the third cell, we call a federated query against the orders table on the Aurora MySQL database using the lambda:mysql connector that we defined and used in the previous post. This query generates a training dataset for number of orders per day.

After running the fourth cell and waiting for a few seconds, you should see the training dataset.

When you build, train, and deploy your ML model on Amazon SageMaker, you normally have a model training phase and a deployment phase. At the end of your deployment, Amazon SageMaker provides you with an endpoint that your client application can interact with to input data and get the inference response back. This endpoint is what we use in our SQL query to call the ML function for inference.

In the fifth cell, we train an RCF model to detect anomalies and we deploy the model to an Amazon SageMaker endpoint that our application or Athena query can call. This part can take up to 10 minutes before the training job is complete, after which you get a generated Amazon SageMaker endpoint. Record this endpoint name; we need this in our Athena federated query.

Running an Athena ML query

On the Athena console, check your workgroup and make sure that you’re switched to the AmazonAthenaPreviewFunctionality workgroup. This workgroup enables Athena ML capabilities for your query while this functionality is in preview.

Run the saved query DetectAnamolyInOrdersData after replacing the endpoint name with the one that you generated from your Amazon SageMaker notebook run.

Amazon SageMaker RCF is an unsupervised algorithm for detecting anomalous data points within a dataset. These are observations that are distinguishable from well-structured or patterned data. In the preceding results, the RCF algorithm associates each data point an anomaly score. Low score values indicate that the data point is considered normal. High values indicate the presence of an anomaly in the data. The definitions of low and high depend on the application, but common practice suggests that scores beyond three standard deviations from the mean score are considered anomalous.

Cleaning up

When you finish experimenting with the features as part of this post, remember to clean up all the AWS resources that you created using AWS CloudFormation and during the setup.

  1. On the Amazon S3 console, empty the S3 bucket the CloudFormation template created. AWS CloudFormation can only delete the bucket if it’s empty.
  2. On the AWS CloudFormation console, delete all the connectors so they’re no longer attached to the elastic network interface (ENI) of the VPC. Alternatively, you can go to each connector and deselect the VPC so it’s no longer attached to the VPC that AWS CloudFormation created.
  3. On the Amazon SageMaker console, delete any endpoints you created as part of this post.
  4. On the Athena console, delete the AmazonAthenaPreviewFunctionality workgroup.

Conclusion

In this post, you learned about Athena support for invoking ML inference model for detecting anomalous values using the RCF algorithm that was developed on Amazon SageMaker. We demonstrated how to deploy your ML model one time on Amazon SageMaker to enable anyone in your organization to run your models any number of times for inference. Additionally, if you run Athena federated queries with this feature, then you can run inference on data in any data source.


About the Authors

Amir Basirat is a Big Data specialist solutions architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a Big Data specialist for different technology companies. He also has a PhD in computer science, where his research was focused on large-scale distributed computing and neural networks.

 

Saurabh Bhutyani is a Senior Big Data specialist solutions architect at Amazon Web Services. He is an early adopter of open source Big Data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Orchestrating an AWS Glue DataBrew job and Amazon Athena query with AWS Step Functions

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/orchestrating-an-aws-glue-databrew-job-and-amazon-athena-query-with-aws-step-functions/

As the industry grows with more data volume, big data analytics is becoming a common requirement in data analytics and machine learning (ML) use cases. Also, as we start building complex data engineering or data analytics pipelines, we look for a simpler orchestration mechanism with graphical user interface-based ETL (extract, transform, load) tools.

Recently, AWS announced the general availability of AWS Glue DataBrew, a new visual data preparation tool that helps you clean and normalize data without writing code. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

Regarding orchestration or workflow management, AWS provides AWS Step Functions, a serverless function orchestrator that makes it easy to build a workflow by integrating different AWS services like AWS Lambda, Amazon Simple Notification Service (Amazon SNS), AWS Glue, and more. With its built-in operational controls, Step Functions manages sequencing, error handling, retry logic, and states, removing a significant operational burden from your team.

Today, we’re launching Step Functions support for DataBrew, which means you can now invoke DataBrew jobs in your Step Functions workflow to build an end-to-end ETL pipeline. Recently, Step Functions also started supporting Amazon Athena integration, which means that you can submit SQL queries to the Athena engine through a Step Functions state.

In this post, we walk through a solution where we integrate a DataBrew job for data preparation, invoke a series of Athena queries for data refresh, and integrate Amazon QuickSight for business reporting. The whole solution is orchestrated through Step Functions and is invoked through Amazon EventBridge.

Use case overview

For our use case, we use two public datasets. The first dataset is a sales pipeline dataset, which contains a list of over 20,000 sales opportunity records for a fictitious business. Each record has fields that specify the following:

  • A date, potentially when an opportunity was identified
  • The salesperson’s name
  • A market segment to which the opportunity belongs
  • Forecasted monthly revenue

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this post, we assume that they’re related.

For our use case, these sales and marketing CSV files are maintained by your organization’s Marketing team, which uploads the updated full CSV file to Amazon Simple Storage Service (Amazon S3) every month. The aggregated output data is created through a series of data preparation steps, and the business team uses the output data to create business intelligence (BI) reports.

Architecture overview

To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration, DataBrew for data preparation, Athena for data analysis with standard SQL, and QuickSight for business reporting. In addition, we use Amazon SNS for sending notifications to users, and EventBridge is integrated to schedule running the Step Functions workflow.

We use Amazon SNS for sending notifications to users, and EventBridge is integrated to schedule running the Step Functions workflow.

The workflow includes the following steps:

  • Step 1 – The Marketing team uploads the full CSV file to an S3 input bucket every month.
  • Step 2 – An EventBridge rule, scheduled to run every month, triggers the Step Functions state machine.
  • Steps 3 and 4 – We receive two separate datasets (sales data and marketing data), so Step Functions triggers two parallel DataBrew jobs, which create additional year, month, and day columns from the existing date field and uses those three columns for partitioning. The jobs write the final output to our S3 output bucket.
  • Steps 5, 6, 7, 8 – After the output data is written, we can create external tables on top of it with Athena create table statements and then load partitions with MCSK REPAIR commands. After the AWS Glue Data Catalog tables are created for sales and marketing, we run an additional query through Athena, which merges these two tables by year and month to create another table with aggregated output.
  • Steps 9 and 10 – As the last step of the Step Functions workflow, we send a notification to end-users through Amazon SNS to notify them that the data refresh job completed successfully.
  • Steps 11, 12, 13 – After the aggregated table data is refreshed, business users can use QuickSight for BI reporting, which fetches data through Athena. Data analysts can also use Athena to analyze the complete refreshed dataset.

Prerequisites

Before beginning this tutorial, make sure you have the required permissions to create the resources required as part of the solution.

Additionally, create the S3 input and output buckets with the required subfolders to capture the sales and marketing data, and upload the input data into their respective folders.

Creating a DataBrew project

To create a DataBrew project for the marketing data, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Project name, enter a name (for this post, marketing-data-etl).
  4. For Select a dataset, select New dataset.

For Select a dataset, select New dataset.

  1. For Enter your source from S3, enter the S3 path of the marketing input CSV.

For Enter your source from S3, enter the S3 path of the marketing input CSV.

  1. Under Permissions, for Role name, choose an AWS Identity and Access Management (IAM) role that allows DataBrew to read from your Amazon S3 input location.

You can choose a role if you already created one, or create a new one. Please read here for steps to create the IAM role.

  1. After the dataset is loaded, on the Functions menu, choose Date functions.
  2. Choose YEAR.

Choose YEAR.

  1. Apply the year function on the date column to create a new column called year.

  1. Repeat these steps to create month and day columns.

Repeat these steps to create month and day columns.

For our use case, we created a few new columns that we plan to use for partitioning, but you can integrate additional transformations as needed.

  1. After you have finished applying all your transformations, choose Publish on the recipe.
  2. Provide a description of the recipe version and choose Publish.

Creating a DataBrew job

Now that our recipe is ready, we can create a job for it, which gets invoked through our Step Functions state machine.

  1. On the DataBrew console, choose Jobs.
  2. Choose Create a job.
  3. For Job name¸ enter a name (for example, marketing-data-etl).

Your recipe is already linked to the job.

  1. Under Job output settings¸ for File type, choose your final storage format (for this post, we choose PARQUET).
  2. For S3 location, enter your final S3 output bucket path.
  3. For Compression, choose the compression type you want to apply (for this post, we choose Snappy).
  4. Under Additional configurations, for Custom partition by column values, choose year, month, and day.
  5. For File output storage, select Replace output files for each job run.

We choose this option because our use case is to do a full refresh.

We choose this option because our use case is to do a full refresh.

  1. Under Permissions, for Role name¸ choose your IAM role.
  2. Choose Create job.

We choose this because we don’t want to run it now; we plan to invoke it through Step Functions.

We choose this because we don’t want to run it now; we plan to invoke it through Step Functions.

  1. When your marketing job is ready, repeat the same steps for your sales data, using the sales data output file location as needed.

Creating a Step Functions state machine

We’re now ready to create a Step Functions state machine for the complete flow.

  1. On the Step Functions console, choose Create state machine.
  2. For Define state machine¸ select Author with code snippets.
  3. For Type, choose Standard.

For Type, choose Standard.

In the Definition section, Step Functions provides a list of service actions that you can use to automatically generate a code snippet for your state machine’s state. The following screenshot shows that we have options for Athena and DataBrew, among others.

  1. For Generate code snippet, choose AWS Glue DataBrew: Start a job run.

4. For Generate code snippet, choose AWS Glue DataBrew: Start a job run.

  1. For Job name, choose Select job name from a list and choose your DataBrew job.

The JSON snippet appears in the Preview pane.

  1. Select Wait for DataBrew job runs to complete.
  2. Choose Copy to clipboard.

Choose Copy to clipboard.

  1. Integrate the code into the final state machine JSON code:
    {
       "Comment":"Monthly Refresh of Sales Marketing Data",
       "StartAt":"Refresh Sales Marketing Data",
       "States":{
          "Refresh Sales Marketing Data":{
             "Type":"Parallel",
             "Branches":[
                {
                   "StartAt":"Sales DataBrew ETL Job",
                   "States":{
                      "Sales DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"sales-data"
                         },
                         "Next":"Drop Old Sales Table"
                      },
                      "Drop Old Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Sales Table"
                      },
                      "Create Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `sales_data_output`(`date` string, `salesperson` string, `lead_name` string, `segment` string, `region` string, `target_close` string, `forecasted_monthly_revenue` int,   `opportunity_stage` string, `weighted_revenue` int, `closed_opportunity` boolean, `active_opportunity` boolean, `latest_status_entry` boolean) PARTITIONED BY (`year` string,`month` string, `day` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Sales Table Partitions"
                      },
                      "Load Sales Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                },
                {
                   "StartAt":"Marketing DataBrew ETL Job",
                   "States":{
                      "Marketing DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"marketing-data-etl"
                         },
                         "Next":"Drop Old Marketing Table"
                      },
                      "Drop Old Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Marketing Table"
                      },
                      "Create Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `marketing_data_output`(`date` string, `new_visitors_seo` int, `new_visitors_cpc` int, `new_visitors_social_media` int, `return_visitors` int, `twitter_mentions` int,   `twitter_follower_adds` int, `twitter_followers_cumulative` int, `mailing_list_adds_` int,   `mailing_list_cumulative` int, `website_pageviews` int, `website_visits` int, `website_unique_visits` int,   `mobile_uniques` int, `tablet_uniques` int, `desktop_uniques` int, `free_sign_up` int, `paid_conversion` int, `events` string) PARTITIONED BY (`year` string, `month` string, `day` string) ROW FORMAT SERDE   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Marketing Table Partitions"
                      },
                      "Load Marketing Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                }
             ],
             "Next":"Drop Old Summerized Table"
          },
          "Drop Old Summerized Table":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"DROP TABLE default.sales_marketing_revenue",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Create Summerized Output"
          },
          "Create Summerized Output":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT sales.year, sales.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) sales INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) marketing on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Notify Users"
          },
          "Notify Users":{
             "Type":"Task",
             "Resource":"arn:aws:states:::sns:publish",
             "Parameters":{
                "Message":{
                   "Input":"Monthly sales marketing data refreshed successfully!"
                },
                "TopicArn":"arn:aws:sns:us-east-1:<account-id>:<sns-topic-name>"
             },
             "End":true
          }
       }
    }

The following diagram is the visual representation of the state machine flow. With the Step Functions parallel task type, we created two parallel job runs for the sales and marketing data. When both flows are complete, they join to create an aggregated table in Athena and send an SNS notification to the end-users.

The following diagram is the visual representation of the state machine flow.

Creating an EventBridge scheduling rule

Now let’s integrate EventBridge to schedule the invocation of our Step Functions state machine on the first day of every month.

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose Create a rule.
  3. For Name, enter a name (for example, trigger-step-funcion-rule).
  4. Under Define pattern, select Schedule.
  5. Select Cron expression.
  6. Enter 001** to specify that the job runs on the first day of every month at midnight.

  1. In the Select targets section, for Target, choose Step Functions state machine
  2. For State machine, choose your state machine.

For State machine, choose your state machine.

Now when the step function is being invoked, its run flow looks like the following screenshot, where blue represents the DataBrew jobs currently running.
Now when the step function is being invoked, its run flow looks like the following screenshot, where blue represents the DataBrew jobs currently running.

When the job is complete, all the steps should be green.

When the job is complete, all the steps should be green.

You also receive the notification “Monthly sales marketing data refreshed successfully!”

Running an Athena query

Let’s validate the aggregated table output in Athena by running a simple SELECT query. The following screenshot shows the output.

Let’s validate the aggregated table output in Athena by running a simple SELECT query.

Creating reports in QuickSight

Now let’s do our final step of the architecture, which is creating BI reports through QuickSight by connecting to the Athena aggregated table.

  1. On the QuickSight console, choose Athena as your data source.

On the QuickSight console, choose Athena as your data source.

  1. Select the database and table name you have in Athena.

Select the database and table name you have in Athena.

Now you can create a quick report to visualize your output, as shown in the following screenshot.

Now you can create a quick report to visualize your output, as shown in the following screenshot.

 

If QuickSight is using SPICE storage, you need to refresh the dataset in QuickSight after you receive notification about the completion of data refresh. If the QuickSight report is running an Athena query for every request, you might see a “table not found” error when data refresh is in progress. We recommend leveraging SPICE storage to get better performance.

Conclusion

This post explains how to integrate a DataBrew job and Athena queries with Step Functions to implement a simple ETL pipeline that refreshes aggregated sales and marketing data for BI reporting.

I hope this gives you a great starting point for using this solution with your datasets and applying business rules to build a complete serverless data analytics pipeline.


About the Author

Sakti Mishra

Sakti Mishra is a Senior Data Lab Solution Architect at AWS, where he helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places.

Accessing and visualizing data from multiple data sources with Amazon Athena and Amazon QuickSight

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/accessing-and-visualizing-data-from-multiple-data-sources-with-amazon-athena-and-amazon-quicksight/

Amazon Athena now supports federated query, a feature that allows you to query data in sources other than Amazon Simple Storage Service (Amazon S3). You can use federated queries in Athena to query the data in place or build pipelines that extract data from multiple data sources and store them in Amazon S3. With Athena Federated Query, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources. Athena queries including federated queries can be run from the Athena console, a JDBC or ODBC connection, the Athena API, the Athena CLI, the AWS SDK, or AWS Tools for Windows PowerShell.

The goal for this post is to discuss how we can use different connectors to run federated queries with complex joins across different data sources with Athena and visualize the data with Amazon QuickSight.

Athena Federated Query

Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of the Athena query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon Redshift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy data source connectors, the connector is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.

When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source in the cloud or on premises that is accessible from Lambda.

Prerequisites

Before creating your development environment, you must have the following prerequisites:

Configuring your data source connectors

After you deploy your CloudFormation stack, follow the instructions in the post Extracting and joining data from multiple data sources with Athena Federated Query to configure various Athena data source connectors for HBase on Amazon EMR, DynamoDB, ElastiCache for Redis, and Amazon Aurora MySQL.

You can run Athena federated queries in the AmazonAthenaPreviewFunctionality workgroup created as part of the CloudFormation stack or you could run them in the primary workgroup or other workgroups as long as you’re running with Athena engine version 2. As of this writing, Athena Federated Query is generally available in the Asia Pacific (Mumbai), Asia Pacific (Tokyo), Europe (Ireland), US East (N. Virginia), US East (Ohio), US West (N. California), and US West (Oregon) Regions. If you’re running in other Regions, use the AmazonAthenaPreviewFunctionality workgroup.

For information about changing your workgroup to Athena engine version 2, see Changing Athena Engine Versions.

Configuring QuickSight

The next step is to configure QuickSight to use these connectors to query data and visualize with QuickSight.

  1. On the AWS Management Console, navigate to QuickSight.
  2. If you’re not signed up for QuickSight, you’re prompted with the option to sign up. Follow the steps to sign up to use QuickSight.
  3. After you log in to QuickSight, choose Manage QuickSight under your account.

After you log in to QuickSight, choose Manage QuickSight under your account.

  1. In the navigation pane, choose Security & permissions.
  2. Under QuickSight access to AWS services, choose Add or remove.

Under QuickSight access to AWS services, choose Add or remove.

A page appears for enabling QuickSight access to AWS services.

  1. Choose Athena.

Choose Athena.

  1. In the pop-up window, choose Next.

In the pop-up window, choose Next.

  1. On the S3 tab, select the necessary S3 buckets. For this post, I select the athena-federation-workshop-<account_id> bucket and another one that stores my Athena query results.
  2. For each bucket, also select Write permission for Athena Workgroup.

For each bucket, also select Write permission for Athena Workgroup.

  1. On the Lambda tab, select the Lambda functions corresponding to the Athena federated connectors that Athena federated queries use. If you followed the post Extracting and joining data from multiple data sources with Athena Federated Query when configuring your Athena federated connectors, you can select dynamo, hbase, mysql, and redis.

For information about registering a data source in Athena, see the appendix in this post.

  1. Choose Finish.

Choose Finish.

  1. Choose Update.
  2. On the QuickSight console, choose New analysis.
  3. Choose New dataset.
  4. For Datasets, choose Athena.
  5. For Data source name, enter Athena-federation.
  6. For Athena workgroup, choose primary.
  7. Choose Create data source. 

As stated earlier, you can use the AmazonAthenaPreviewFunctionality workgroup or another workgroup as long as you’re running Athena engine version 2 in a supported Region.

You can use the AmazonAthenaPreviewFunctionality workgroup or another workgroup as long as you’re running Athena engine version 2 in a supported Region.

  1. For Catalog, choose the catalog that you created for your Athena federated connector.

For information about creating and registering a data source in Athena, see the appendix in this post.

For information about creating and registering a data source in Athena, see the appendix in this post.

  1. For this post, I choose the dynamo catalog, which does a federation to the Athena DynamoDB connector.

For this post, I choose the dynamo catalog, which does a federation to the Athena DynamoDB connector.

I can now see the database and tables listed in QuickSight.

  1. Choose Edit/Preview data to see the data.
  2. Choose Save & Visualize to start using this data for creating visualizations in QuickSight.

22. Choose Save & Visualize to start using this data for creating visualizations in QuickSight.

  1. To do a join with another Athena data source, choose Add data and select the catalog and table.
  2. Choose the join link between the two datasets and choose the appropriate join configuration.
  3. Choose Apply.

Choose Apply

You should be able to see the joined data.

You should be able to see the joined data.

Running a query in QuickSight

Now we use the custom SQL option in QuickSight to run a complex query with multiple Athena federated data sources.

  1. On the QuickSight console, choose New analysis.
  2. Choose New dataset.
  3. For Datasets, choose Athena.
  4. For Data source name, enter Athena-federation.
  5. For the workgroup, choose primary.
  6. Choose Create data source.
  7. Choose Use custom SQL.
  8. Enter the query for ProfitBySupplierNation.
  9. Choose Edit/Preview data.

Choose Edit/Preview data.

Under Query mode, you have the option to view your query in either SPICE or direct query. SPICE is the QuickSight Super-fast, Parallel, In-memory Calculation Engine. It’s engineered to rapidly perform advanced calculations and serve data. Using SPICE can save time and money because your analytical queries process faster, you don’t need to wait for a direct query to process, and you can reuse data stored in SPICE multiple times without incurring additional costs. You also can refresh data in SPICE on a recurring basis as needed or on demand. For more information about refresh options, see Refreshing Data.

With direct query, QuickSight doesn’t use SPICE data and sends the query every time to Athena to get the data.

  1. Select SPICE.
  2. Choose Apply.
  3. Choose Save & visualize.

Choose Save & visualize.

  1. On the Visualize page, under Fields list, choose nation and sum_profit.

QuickSight automatically chooses the best visualization type based on the selected fields. You can change the visual type based on your requirement. The following screenshot shows a pie chart for Sum_profit grouped by Nation.

The following screenshot shows a pie chart for Sum_profit grouped by Nation.

You can add more datasets using Athena federated queries and create dashboards. The following screenshot is an example of a visual analysis over various datasets that were added as part of this post.

The following screenshot is an example of a visual analysis over various datasets that were added as part of this post.

When your analysis is ready, you can choose Share to create a dashboard and share it within your organization.

Summary

QuickSight is a powerful visualization tool, and with Athena federated queries, you can run analysis and build dashboards on various data sources like DynamoDB, HBase on Amazon EMR, and many more. You can also easily join relational, non-relational, and custom object stores in Athena queries and use them with QuickSight to create visualizations and dashboards.

For more information about Athena Federated Query, see Using Amazon Athena Federated Query and Query any data source with Amazon Athena’s new federated query.


Appendix

To register a data source in Athena, complete the following steps:

  1. On the Athena console, choose Data sources.

On the Athena console, choose Data sources.

  1. Choose Connect data source.

Choose Connect data source.

  1. Select Query a data source.
  2. For Choose a data source, select a data source (for this post, I select Redis).
  3. Choose Next.

Choose Next.

  1. For Lambda function, choose your function.

For this post, I use the redis Lambda function, which I configured as part of configuring the Athena federated connector in the post Extracting and joining data from multiple data sources with Athena Federated Query.

  1. For Catalog name, enter a name (for example, redis).

The catalog name you specify here is the one that is displayed in QuickSight when selecting Lambda functions for access.

  1. Choose Connect.

Choose Connect.

When the data source is registered, it’s available in the Data source drop-down list on the Athena console.

When the data source is registered, it’s available in the Data source drop-down list on the Athena console.


About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. In his free time, he likes to watch movies and spend time with his family.

 

 

 

Dream11’s journey to building their Data Highway on AWS

Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”

Since inception, Dream11 has been a data-driven sports technology brand. 

Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously they were using 3rd party services to collect, analyze and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

  • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
  • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
  • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular services they were using for web & mobile analytics use the technique of sampling to be able to deal with high volumes of data. Although this is a well-regarded technique to deal with high volumes of data and provides reasonable accuracy in multiple cases, Dream11 wanted statistics to be as accurate as possible.
  • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is very critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
  • Their approach was leading to high cost for custom analytics for Dream11’s user interactions data, consisting of hundreds of event types. Serverless query engines typically charge by the amount of data scanned and so it can get very expensive if events data isn’t organized properly in separate tables in a data lake to enable selective access.

All these concerns and needs, led Dream11 to conclude that they needed their own centralized 360-degree analytics platform.

All these concerns and needs, led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

Design goals

Dream11 had the following design goals for the project:

  • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
  • The cost should be low and should be pay-as-you-go.
  • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
  • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
  • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
  • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
  • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
  • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
  • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
  • The system should be able to build 360-degree user profiles
  • The system should provide alerting on important changes to key business metrics.
  • Last but not the least, the system should be secure and reliable with 6 nines of availability guarantee.

Data Highway architecture

In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.

The following diagram shows the high-level architecture.

For this project, they used the following components:

The rest of this post explains the various design choices and trade-offs made by the Dream11’s data engineers. 

Event ingestion, segregation, and organization

Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.

The following diagram shows the logical structure of these events.

When the front end receives an event, it saves fields up to common attributes into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

  • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
  • Apache Druid – Provides near real-time dimensional analytics
  • Amazon Redshift – Provides session analytics

The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are further picked by Apache Ni-Fi, which forwards them to their respective topics. Apache Ni-Fi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). Apache Ni-Fi basically reads event names and posts to the Kafka topic with matching names. If Kafka doesn’t have a topic with that name, it creates a new topic with that name. You can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.

The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

  The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

 

The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

Storage, cataloging, ETL, and scheduling

In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

Updating the AWS Glue Data Catalog with metadata for the target table

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

Registering tables with AWS Glue Data Catalog is easy. You can use an AWS Glue crawler. It can infer schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors etc. Therefore, they developed two Python based crawlers.

The first Python based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 connector sink to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with Glue Data Catalog so that users can query the JSON data directly, if needed. 

The second Python based crawler runs once a day and registers a corresponding table for each new table created that day to hold flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON, Gzip data to Parquet, Snappy, and registers them with Apache Airflow to run every 24 hours.

ETL with Amazon EMR Presto

Dream11 has a multi node, long-running, multi-purpose EMR cluster. They decided to run scheduled ETL jobs on it for the Data Highway project.

ETL for an event table involves a simple SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to Parquet (Snappy). Converted data takes up to 70% less space, results in 10 times improvement in Athena query performance. ETL happens once a day. Tables are partitioned by day.

Data received on Kafka_AllEvents_CommonAttributes topic is loaded to Redshift. ETL involves SELECT FROM -> INSERT INTO to convert JSON (Gzip) to CSV, followed by Amazon Redshift COPY.

Apache Airflow for schedule management

Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define direct acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that run independently. The DAG keeps track of the relationships and dependencies between tasks.

Dream11 uses Apache Airflow to schedule Python scripts and over few hundred ETL jobs on Amazon EMR Presto to convert JSON (Gzip) data for over few hundred events to Parquet (Snappy) format, and converts JSON data containing common attributes for all events to CSV before loading to Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.

The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling. 

Real-time and near-real-time analytics

In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

Concurrency analytics with Apache Druid

Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.

Finding active users with Amazon EMR HBase

Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

With the help of a Java consumer, they push all events from the Kafka_AllEvents_ CommonAttributes topic to HBase on an EMR cluster with just required user dimensions. They can query the data in HBase with SQL syntax supported by the Apache Phoenix interface. 

Session analytics with Amazon Redshift

Dream11 maintains their transactional data warehouse on Amazon Redshift multi node cluster. Amazon Redshift allows them to run complex SQL queries efficiently. Amazon Redshift would have been a natural choice for event analytics for hundreds of event types. However, in Dream11’s case, events data grows very rapidly and this would be a lot of data in Amazon Redshift. Also, this data loses its value rapidly as time passes (relatively speaking) compared with transactional data. Therefore, they decided to do only session analytics in Amazon Redshift to benefit from its complex SQL query capabilities and to do analytics for individual events with the help of Athena (which we discuss in the next section).

Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated kafka connector sink. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format. The data is loaded into Amazon Redshift with the COPY command. The data gets loaded first into a staging table. Data in the staging table is aggregated to get sessions data. Amazon Redshift already has transactional data from other tables that, combined now with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interactions data and transactions data. They can then run campaigns for those users with the help of messaging platforms. 

Event analytics with Athena

Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for over hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

As discussed before, Dream11 has registered over hundreds of tables for events data in JSON format, and similar number of tables for events data in Parquet format with the AWS Glue Data Catalog. They observed a performance gain of 10 times on conversion of data format to Parquet, and an 80% savings in space. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and their custom Java UI for the Data Aware project.

Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

  Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

 The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

 The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

 

Conclusion

This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available under millisecond latency, and the system costs 40% less than the previous system. Analytics is performed with all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11’s AWS account, improving data security and privacy.

As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security, it has also enabled newer possibilities. Now Dream11 has a 360-degree view of their users. They can study their users’ progress across multiple platforms – web, Android, and IOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


About the Authors

Pradip Thoke is a AVP Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim and Nandeesh Bijoor.

 

Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

Boosting your data lake insights using the Amazon Athena Query Federation SDK

Post Syndicated from Adir Sharabi original https://aws.amazon.com/blogs/big-data/boosting-your-data-lake-insights-using-the-amazon-athena-query-federation-sdk/

Today’s modern applications use multiple purpose-built database engines, including relational, key-value, document, and in-memory databases. This purpose-built approach improves the way applications use data by providing better performance and reducing cost. However, the approach raises some challenges for data teams that need to provide a holistic view on top of these database engines, and especially when they need to merge the data with datasets in the organization’s data lake.

In this post, we show how to use the Amazon Athena Query Federation SDK to easily enrich your data in Amazon Simple Storage Service (Amazon S3) with data from external datastores, apply complex transformations, and get predictive insights by inferencing machine learning (ML) models.

We start by running a query to enrich an S3 backed table that holds features extracted from breast cancer images with fictional patient personal information stored in Amazon DynamoDB. We then use the Athena UDF functionality to decrypt sensitive information stored in the table. Next, we select these features and use Athena integration with Amazon SageMaker to pass them to a linear learner model to predict whether breast cancer is present. Lastly, we show an Amazon QuickSight dashboard to visualize the results.

For this post, we use the following resources:

  • A dataset of features computed from a digitized image of a fine needle aspirate of a breast mass. Such features include radius, texture, perimeter, area, and smoothness. The dataset is stored as a CSV file in Amazon S3.
  • Patient’s personal information (such as age range, email, and country) stored in DynamoDB.
  • A linear learner model deployed into a SageMaker endpoint to predict breast cancer. For more information, see Call an Amazon SageMaker model endpoint using Amazon API Gateway and AWS Lambda.

Prerequisites

Athena Query Federation is now generally available in US East (Ohio), US East (N. Virginia), and US West (Oregon). To use this feature, upgrade your engine version to Athena engine version 2 in your workgroup settings. To enable this feature in other Regions, you need to create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup. Workgroups allows us to:

  • Isolate users, teams, applications, or workloads into groups
  • Enforce costs constraints per query or workgroup
  • Track query-related metrics for all workgroup queries in Amazon CloudWatch

For more information, see Managing Workgroups.

Creating a DynamoDB table and SageMaker endpoint

For the post, we create a new DynamoDB table with synthetic patient data. For the inference requests, we create a new SageMaker endpoint.

  1. Deploy the following AWS CloudFormation stack in us-east-1:

The stack performs the following:

  • Creates a DynamoDB table
  • Creates and triggers an AWS Lambda function to load the table with data
  • Creates a SageMaker endpoint for inference requests

It can take up to 10 minutes for the CloudFormation stack to create the resources.

  1. After the resource creation is complete, navigate to the AWS CloudFormation console.
  2. Choose the boost-your-datalake-insights stack
  3. On the Outputs tab, copy the values for DynamoTableName and SMEndpointName. You use these values later in the post.

Downloading the dataset

Under new or existing bucket create a folder named breast_cancer_features. Download the breast_cancer_raw_data.csv file and upload it to breast_cancer_features folder in your bucket. This file contains the Breast Cancer Wisconsin (Diagnostic) Data Set, available at https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Diagnostic%29.

The file holds 570 records with 30 columns of values computed for each cell nucleus. Such features include radius, texture, perimeter, area, smoothness, compactness, concavity, concave points, symmetry, and fractal dimension. The following screenshot displays a few records from the file.

Creating the features table in Athena

To query this dataset from Athena, you need to create an external table that points to that file. There are several ways to create table in Athena. For this post, we use a Hive data definition language (DDL) statement.

  1. On the Athena console, if using a non-GA Region, switch to the AmazonAthenaPreviewFunctionality workgroup created earlier.
  2. Enter the following statement:
    CREATE EXTERNAL TABLE breast_cancer_features(
    id string, radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://<bucket>/breast_cancer_features/'
    TBLPROPERTIES (
      'classification'='csv', 
      'columnsOrdered'='true', 
      'compressionType'='none', 
      'delimiter'=',', 
      'skip.header.line.count'='1')

  1. Choose Run Query or press CTRL + Enter.

After the table is created successfully, we can run queries such as the following, to profile your data and better understand how it’s distributed:

SELECT variance(radius_mean) AS variance,
        stddev(radius_mean) AS stddev ,
        min(radius_mean) AS min,
         max(radius_mean) AS max,
         avg(radius_mean) AS avg
FROM "default"."breast_cancer_features"

The following screenshot shows our results.

Joining the features table with data stored in DynamoDB

In this step, we enrich the features table by joining it with the personal information stored in DynamoDB. With Athena Query Federation, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources.

The following screenshot displays a few records from the DynamoDB table.

To join the features table stored in Amazon S3 with the personal data stored in DynamoDB, we need to create a data source connector that runs on Lambda to run the federated query. A data source connector is a piece of code that can translate between your target data source and Athena. You can think of a connector as an extension of the query engine in Athena.

Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, DynamoDB, Amazon DocumentDB (with MongoDB capability), and Amazon Relational Database Service (Amazon RDS), and JDBC-compliant relational data sources such as MySQL and PostgreSQL. You can also use the Athena Query Federation SDK to write custom connectors.

Preparing to create federated queries is a two-part process: deploying a Lambda function data source connector, and connecting the Lambda function to a data source.

Deploying a Lambda function data source connector

To deploy the data source connector, complete the following steps

  1. On the Athena console, choose Data sources.
  2. Choose Connect data source.
  3. Choose Query a data source.
  4. For Choose a data source, choose the data source that you want to query with Athena, such as Amazon DynamoDB.
  5. Choose Next.
  6. For Lambda function, choose Configure new AWS Lambda function.

The function page for the connector that you chose opens on the Lambda console. The page includes detailed information about the connector.

  1. Under Application settings, specify the required information. At a minimum, this includes:
    • AthenaCatalogName – A name for the Lambda function that indicates the data source that it targets, such as athena_dynamodb_connector.
    • SpillBucket – An S3 bucket in your account to store data that exceeds Lambda function response size limits.

For more information about the remaining configurable options, see Available Connectors on GitHub.

  1. Select I acknowledge that this app creates custom IAM roles.
  2. Choose Deploy.

The Resources section on the Lambda console shows the deployment status of the connector and informs you when the deployment is complete.

Connecting to a data source

After you deploy the data source connector to your account, you can connect it to a data source from Athena.

  1. On the Athena console, choose Connect data source.
  2. Choose Query a data source.
  3. Choose the data source for the connector that you just deployed, such as Amazon DynamoDB. If you used the Athena Query Federation SDK to create your own connector and have deployed it to your account, choose All other data sources.
  4. Choose Next.
  5. For Choose Lambda function, choose the function that you named previously (athena_dynamodb_connector).
  6. For Catalog name, enter a unique name to use for the data source in your SQL queries, such as dynamo_db.

The name can be up to 127 characters and must be unique within your account. It can’t be changed after creation. Valid characters are a–z, A–Z, 0–9, _, @, and -. The names awsdatacatalog, hive, jmx, and system are reserved by Athena and can’t be used for custom catalog names.

  1. Choose Connect.

The Data sources page shows your connector in the list of catalog names. You can now use the connector in your queries.

Querying the external data source

To query your external data source, complete the following steps:

  1. In the Athena Query editor, for Data source, choose dynamo_db.

The DynamoDB table appears in the Tables list.

  1. Choose (three dots) and choose Preview table.

Now we can run queries like the following to enrich and get more insights on our data by joining it with additional data that was stored in DynamoDB:

SELECT age, count(*)
FROM breast_cancer_features d
JOIN "dynamo_db"."default"."<DynamoTableName>" p
ON d.id = p.patient_id
GROUP BY age

The following screenshot shows our results.

Using custom UDFs to decrypt the patient’s email

When looking at our patient’s personal information stored in the DynamoDB table, we can see that the patient’s email is encrypted. We want to allow our users to get the decrypted email while querying with Athena without needing to run custom ETL, which requires us to store it decrypted.

User Defined Functions (UDFs) in Athena allow you to create custom functions to process records or groups of records. A UDF accepts parameters, performs the work, and returns a result. However, UDFs in Athena have the following limitations:

  • Scalar UDFs only – Athena only supports scalar UDFs, which process one row at a time and return a single column value. Athena passes a batch of rows to the UDF each time it invokes a Lambda function.
  • Java runtime only – As of this writing, Athena UDFs support only the Java 8 runtime for Lambda.

To use this feature in preview, you must create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup, as specified in prerequisites section. For more information, see Querying with User Defined Functions.

Deploying a custom UDF

In this post, we use the decrypt method from the example UDF we published. The same encryption key that we used for the encryption should also be used for the decryption. We store that string in AWS Secrets Manager and use the Athena Query Federation SDKs to retrieve the stored key when the function is called.

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Select secret type, select Other type of secrets.
  3. Choose Plaintext.
  4. Remove all the JSON brackets and enter a base64 encoded string as data key, such as AQIDBAUGBwgJAAECAwQFBg==
  5. For Select the encryption key, choose DefaultEncryptionKey.
  6. Choose Next.

  1. Enter athena_encrypt_udf_key as the secret name.
  2. Choose Next.
  3. Chose Next again.
  4. Chose Store.

Next, we deploy the Lambda function that runs the UDF.

  1. On the AWS Serverless Application Repository console, in the navigation pane, choose Available applications.
  2. Select Show apps that create custom IAM roles or resource policies.
  3. In the search box, enter AthenaUserDefinedFunctions.
  4. Choose the application from the result pane.

The Lambda function’s Application details page opens on the Lambda console.

  1. For SecretNameOrPrefix, enter the name or prefix of a set of names within Secrets Manager that this function should have access to, such as athena_encrypt_udf_key* (make sure to include an asterisk at the end).
  2. For LambdaFunctionName, enter the name of the Lambda function that runs your UDFs, such as athena_udf.
  3. Select I acknowledge that this app creates custom IAM roles.
  4. Choose Deploy.

The Resources section of the Lambda console shows the deployment status of the connector and informs you when the deployment is complete.

Querying with UDFs

The USING FUNCTION clause specifies a UDF or multiple UDFs that can be referenced by a subsequent SELECT statement in the query. You need the method name for the UDF and the name of the Lambda function that hosts the UDF.

In our example, the decrypt method gets two parameters: encrypted_col and secretName. We define the function and run the following query:

USING FUNCTION decrypt(encrypted_col VARCHAR, secretName VARCHAR)
RETURNS VARCHAR
TYPE LAMBDA_INVOKE WITH (lambda_name ='athena_udf')

SELECT encrypted_email, 
    decrypt(encrypted_email,'athena_encrypt_udf_key') AS decrypted_email
FROM "dynamo_db"."default"."<DynamoTableName>"

The following screenshot shows the query results.

Using Athena ML to predict breast cancer

To get predictive insights from our data using ML models, we need to write a code to enable inference against a deployed model. Data analysts are often skilled in using SQL, but may not have expertise in programming. ML with Athena bridges this gap and lets you run inference on models deployed on SageMaker by writing SQL statements in Athena. This feature simplifies access for data analysis to ML models such as anomaly detection, customer cohort analysis, and sales predictions, and improves productivity. This eliminates the need to use complex programming methods or offload data and jobs orchestration to run inference. 

SageMaker is a fully managed ML service, where data scientists and developers can quickly and easily build and train ML models. In addition, SageMaker allows you to deploy your model to get predictions in one of two ways:

Running a SQL statement with SageMaker inference to predict breast cancer

To perform inference from Athena, you need to train the model based on historical labeled data and deploy it into the SageMaker hosting service. In this post, we use a linear learner model to predict breast cancer out of features we used before. The model was already deployed as part of the CloudFormation stack as you can see in the screenshot below from the SageMaker console.

To use ML with Athena, you define a function with the USING FUNCTION clause. The function points to the SageMaker model endpoint that you want to use and specifies the variable names and data types to pass to the model. Subsequent clauses in the query reference the function to pass values to the model. The model runs inference based on the values that the query passes and then returns inference results.

In our example, the model endpoint is SMEndpoint-odPIqmf9LjUh and the variables are the feature columns from the breast_cancer_features table. We define the function and run the following query:

USING FUNCTION predict_breast_cancer(radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) 
RETURNS DOUBLE 
TYPE SAGEMAKER_INVOKE_ENDPOINT WITH (sagemaker_endpoint = '<SMEndpointName>')

SELECT id, prediction, CASE WHEN round(prediction)=1 THEN 'B' ELSE 'M' END AS diagnosis
FROM( SELECT id, predict_breast_cancer(radius_mean, texture_mean, perimeter_mean, area_mean, smoothness_mean, compactness_mean, concavity_mean, concave_points_mean, symmetry_mean, fractal_dimension_mean, radius_se, texture_se, perimeter_se, area_se, smoothness_se, compactness_se, concavity_se, concave_points_se, symmetry_se, fractal_dimension_se, radius_worst, texture_worst, perimeter_worst, area_worst, smoothness_worst, compactness_worst, concavity_worst, concave_points_worst, symmetry_worst, fractal_dimension_worst) AS prediction FROM breast_cancer_features)a

In the following query results, you can see the prediction column returned by the model. The diagnosis column that derives from it is either B for benign or M for malignant.

Visualizing the data using QuickSight

Because many decision-makers aren’t familiar with SQL syntax, they want to consume the data in more graphic way, such as through charts and dashboards. Visualizing the data is also required by data scientists to identify trends and anomalies, and understand how our data behaves.

Before we visualize our data with QuickSight, in order to get the best performances, we create a new dataset in Amazon S3 that holds all the patient’s personal information joined with the breast cancer diagnostic we got from the ML model inference. We use Athena’s CREATE TABLE AS SELECT (CTAS) statement to create the table and populate it with the joint data:

CREATE TABLE breast_cancer_final
WITH (
format = 'PARQUET',
parquet_compression = 'SNAPPY',
external_location = 's3://<bucket>/breast_cancer_final/')
AS
USING FUNCTION predict_breast_cancer(radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) 
RETURNS DOUBLE 
TYPE SAGEMAKER_INVOKE_ENDPOINT WITH (sagemaker_endpoint = '<SMEndpointName>')

SELECT *, CASE WHEN round(prediction)=1 THEN 'B' ELSE 'M' END AS diagnosis
FROM( SELECT id, p.*, predict_breast_cancer(radius_mean, texture_mean, perimeter_mean, area_mean, smoothness_mean, compactness_mean, concavity_mean, concave_points_mean, symmetry_mean, fractal_dimension_mean, radius_se, texture_se, perimeter_se, area_se, smoothness_se, compactness_se, concavity_se, concave_points_se, symmetry_se, fractal_dimension_se, radius_worst, texture_worst, perimeter_worst, area_worst, smoothness_worst, compactness_worst, concavity_worst, concave_points_worst, symmetry_worst, fractal_dimension_worst) AS prediction FROM breast_cancer_features d JOIN "dynamo_db"."default"."<DynamoTableName>" p
ON d.id = p.patient_id)a

When the query successfully finishes, we can start building our dashboards using QuickSight.

First, we create a dataset in QuickSight for our table in Athena. For instructions, see Creating a Dataset Using Amazon Athena Data.

Next, we create QuickSight visuals.

The following dashboard shows the distribution of age range using a pie visual, a patient count by diagnostic per week, the top five countries, and patient distribution over time with forecast enabled.

Clean up

Now to the final step, cleaning up the resources.

To avoid unnecessary charges on your AWS account, do the following:

  1. Destroy all of the resources created by the CloudFormation stack in create a DynamoDB table and SageMaker endpoint set up by deleting the stack after you’re done experimenting with it. You can follow the steps here to delete the stack.
  2. You have to manually delete the S3 bucket you created with the data uploaded and generated with Athena.

Conclusions

This post demonstrated how the Athena Query Federation SDK allows you to implement serverless ETL to get more out of your data in your Amazon S3 data lake. We showed how simple Athena SQL syntax allows you to enrich your data with an external datastore, perform business logic using custom UDFs, and get insights by running ML inference. We also visualized our enriched dataset by creating a dashboard in QuickSight.

If you have feedback about this post, please share it in the comments. If you have questions about implementing the solution used in this post, comment or open a thread on the Developer Tools forum.


About the Authors

Adir Sharabi is a Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud. He is also passionate about Data and helping customers to get the most out of it.

 

 

Eitan Sela is a Solutions Architect with Amazon Web Services. He works with AWS customers to provide guidance and technical assistance, helping them improve the value of their solutions when using AWS. Eitan also helps customers build and operate machine learning solutions on AWS. In his spare time, Eitan enjoys jogging and reading the latest machine learning articles.

Keeping your data lake clean and compliant with Amazon Athena

Post Syndicated from David Roberts original https://aws.amazon.com/blogs/big-data/keeping-your-data-lake-clean-and-compliant-with-amazon-athena/

With the introduction of CTAS support for Amazon Athena (see Use CTAS statements with Amazon Athena to reduce cost and improve performance), you can not only query but also create tables using Athena with the associated data objects stored in Amazon Simple Storage Service (Amazon S3). These tables are often temporary in nature and used to filter or aggregate data that already exists in another Athena table. Although this offers great flexibility to perform exploratory analytics, when tables are dropped, the underlying Amazon S3 data remains indefinitely. Over time, the accumulation of these objects can increase Amazon S3 costs, become administratively challenging to manage, and may inadvertently preserve data that should have been deleted for privacy or compliance reasons. Furthermore, the AWS Glue table entry is purged so there is no convenient way to trace back which Amazon S3 path was mapped to a deleted table.

This post shows how you can automate  deleting Amazon S3 objects associated with a table  after dropping it using Athena. AWS Glue is required to be the metadata store for Athena.

Overview of solution

The solution requires that the AWS Glue table record (database, table, Amazon S3 path) history is preserved outside of AWS Glue, because it’s removed immediately  after a table is dropped. Without this record, you can’t delete the associated Amazon S3 object entries after the fact.

When Athena CTAS statements  are issued, AWS Glue generates Amazon CloudWatch events that specify the database and table names. These events are available from Amazon EventBridge and can be used to trigger an AWS Lambda function (autoCleanS3) to fetch the new or updated Amazon S3 path from AWS Glue and write the database, table, and Amazon S3 path into an AWS Glue history table stored in Amazon DynamoDB (GlueHistoryDDB). When Athena drop table queries are detected, CloudWatch events are generated that trigger autoCleanS3 to look up the Amazon S3 path from GlueHistoryDDB and delete all related objects from Amazon S3.

Not all dropped tables should trigger Amazon S3 object deletion. For example, when you create a table using existing Amazon S3 data (not CTAS), it’s not advisable to automatically delete the associated Amazon S3 tables, because other analysts may have other tables referring to the same source data. For this reason, you must include a user-defined comment (--dropstore ) in the Athena drop table query to cause autoCleanS3 to purge the Amazon S3 objects.

Lastly, after objects are successfully deleted, the corresponding entry in GlueHistoryDDB  is updated for historical and audit purposes. The overall workflow is described in the following diagram.

The workflow contains the following steps:

  1. A user creates a table either via Athena or the AWS Glue console or API.
  2. AWS Glue generates a CloudWatch event, and an EventBridge rule triggers the Lambda function.
  3. The function creates an entry in DynamoDB containing a copy of the AWS Glue record and Amazon S3 path.
  4. The user drops the table from Athena, including the special comment --dropstore.
  5. The Lambda function fetches the dropped table entry from DynamoDB, including the Amazon S3 path.
  6. The function deletes data from the Amazon S3 path, including manifest files, and marks the DynamoDB entry as purged.

Walkthrough overview

To implement this solution, we complete the following steps:

  1. Create the required AWS Identity and Access Management (IAM) policy and role.
  2. Create the AWS Glue history DynamoDB table.
  3. Create the Lambda autoCleanS3 function.
  4. Create the EventBridge rules.
  5. Test the solution.

If you prefer to use a preconfigured CloudFormation template, launch one of the following stacks depending on your Region.

Region Launch Button
us-east-1 (N. Virginia)
us-west-2 (Oregon)
eu-west-1 (Ireland)

Prerequisites

Before implementing this solution, create an AWS Glue database and table with the data residing in  Amazon S3. Be sure your user has the necessary permissions to access Athena and  perform CTAS operations writing  in a sample Amazon S3 location.

For more information about building a data lake, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

Creating an IAM policy and role

You need to first create the required IAM policy for the Lambda function role to use to query AWS Glue and write to DynamoDB.

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the following code (update the Region, account ID, and S3 bucket accordingly, and the table name GlueHistoryDDB if you choose to change it):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListObjectsV2",
                    "s3:DeleteObjectVersion",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<athena-query-results-s3-bucket>",
                    "arn:aws:s3:::<athena-query-results-s3-bucket>/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "dynamodb:PutItem",
                    "dynamodb:Scan",
                    "dynamodb:Query",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:<region>:<accountId>:table/GlueHistoryDDB"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<region>:<accountId>:catalog",
                    "arn:aws:glue:<region>:<accountId>:database/*",
                    "arn:aws:glue:<region>:<accountId>:table/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogGroup"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:log-group:/aws/lambda/autoCleanS3:*"
                ],
                "Effect": "Allow"
            }
        ]
    }

  1. Choose Review policy.
  2. For Name, enter autoCleanS3-LambdaPolicy.
  3. For Description, enter Policy used by Lambda role to purge S3 objects when an Amazon Athena table is dropped.
  4. Choose Create policy.

Next, you need to create an IAM role and attach this policy.

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. Choose AWS service.
  4. Choose Lambda.
  5. Choose Next: Permissions.

  1. For Filter policies, enter autoCleanS3-LambdaPolicy.
  2. Choose Next: Tags.
  3. Choose Next: Review.
  4. For Role name, enter autoCleanS3-LambdaRole.
  5. For Description, enter Role used by Lambda to purge S3 objects when an Amazon Athena table is dropped.
  6. Choose Create role.

Creating the AWS Glue history DynamoDB table

You use this DynamoDB table to hold the current and historical list of AWS Glue tables and their corresponding Amazon S3 path. Create the table as follows:

  1. On the DynamoDB console, choose Dashboard.
  2. Choose Create table.
  3. For Table name, enter GlueHistoryDDB.
  4. For Partition key, enter database (leave type as String).
  5. Select Add sort key.
  6. Enter table_date (leave type as String).
  7. For Table settings, select Use default settings.
  8. Choose Create.

The following table summarizes the GlueHistoryDDB table attributes that the Lambda function creates.

Column Type Description
database partition key The name of the AWS Glue database.
table_date sort key A composite attribute of AWS Glue table name plus date created. Because the same database and table name can be created again, the date must be used to ensure uniqueness.
created_by attribute The user or Amazon EC2 instance ARN from which the table was created.
owner attribute The owner of the table or account number.
purged attribute A boolean indicating whether the Amazon S3 objects have been deleted (True/False).
s3_path attribute The Amazon S3 path containing objects associated with the table.
table attribute The AWS Glue table name.
update_time attribute The last time the table was updated (the Amazon S3 path changed or objects purged).
view_sql attribute The view DDL if a view was created.

Creating the Lambda function autoCleanS3

A CloudWatch event triggers the Lambda function autoCleanS3 when a new table is created, updated, or dropped. If the --dropstore keyword is included in the Athena query comments, the associated Amazon S3 objects are also removed.

  1. On the Lambda console, choose Create function.
  2. Select Author from scratch.
  3. For Function name¸ enter autoCleanS3.
  4. For Runtime, choose Python 3.8.
  5. Under Permissions, for Execution role, select Use an existing role.
  6. Choose the role you created (service-role/autoCleanS3-LambdaRole).
  7. Choose Create function.
  8. Scroll down to the Function code section.
  9. If using Region us-west-2, on the Actions menu, choose Upload a file to Amazon S3.

  1. Enter the following:
    https://aws-bigdata-blog.s3.amazonaws.com/artifacts/aws-blog-keep-your-data-lake-clean-and-compliant-with-amazon-athena/autoCleanS3.zip

  2. Choose Save.

If using a Region other than us-west-2, download the Lambda .zip file locally. Then choose Upload a .zip file and choose the file from your computer to upload the Lambda function.

  1. In the Environment variables section, choose Edit.
  2. Choose Add environment variable.
  3. Enter the following key-values in the following table (customize as desired):
Key Value Purpose
Athena_SQL_Drop_Phrase --dropstore String to embed in Athena drop table queries to cause associated Amazon S3 objects to be removed
db_list

Comma-separated regex filter

<.*>

Allows you to limit which databases may contain tables that autoCleanS3 is allowed to purge
ddb_history_table GlueHistoryDDB The name of the AWS Glue history DynamoDB table
disable_s3_cleanup False If set to True, it disables the Amazon S3 purge, still recording attempts in the history table
log_level INFO Set to DEBUG to troubleshoot if needed

You must use a standard regex expression, which can be a simple comma-separated list of the AWS Glue databases that you want autoCleanS3 to evaluate.

 The following table shows example patterns for db_list.

Example Regex Pattern Result
.* Default, includes all databases
clickstream_web, orders_web, default Includes only clickstream_web, orders_web, default
.*_web Includes all databases having names ending in _web
.*stream.* Includes all databases containing stream in their name

For a complete list or supported patterns, see https://docs.python.org/3/library/re.html#re.Pattern.match

  1. Choose Save.

Creating EventBridge rules

You need to create EventBridge rules that invoke your Lambda function whenever Athena query events and AWS Glue CreateTable and UpdateTable events are generated.

Creating the Athena event rule

To create the Athena query event rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-AthenaQueryEvent.
  3. For Description, enter Amazon Athena event for any query to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"AWS API Call via CloudTrail"
    	],
    	"source": [
    		"aws.athena"
    	],
    	"detail": {
    		"eventName": [
    			"StartQueryExecution"
    		]
    	}
    }

  1. Choose Save.
  2. For Select targets, choose Lambda function.
  3. For Function¸ choose autoClean3.
  4. Choose Create.

Creating the AWS Glue event rule

To create the AWS Glue table event  rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-GlueTableEvent.
  3. For Description, enter AWS Glue event for any table creation or update to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"Glue Data Catalog Database State Change"
    	],
    	"source": [
    		"aws.glue"
    	],
    	"detail": {
    		"typeOfChange": [
    			"CreateTable",
    			"UpdateTable"
    		]
    	}
    }

  1. Choose Save.
  2. For Select targets, choose Lambda function.
  3. For Function¸ choose autoClean3.
  4. Choose Create.

You’re finished!

Testing the solution

Make sure you already have a data lake with tables defined in your AWS Glue Data Catalog and permission to access Athena. For this post, we use NYC taxi ride data. For more information, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

  1. Create a new table using Athena CTAS.

Next, verify that the entry appears in the new GlueHistoryDDB table.

  1. On the DynamoDB console, open the GlueHistoryDDB table.
  2. Choose Items.
  3. Confirm the s3_path value for the table.

You can also view  the Amazon S3 table path and objects associated with the table.

  1. On the Amazon S3 console, navigate to the s3_path found in GlueHistoryDDB.
  2. Confirm the table and path containing the data folder and associated manifest and metadata objects.

  1. Drop the table using the keyword --dropstore.

  1. Check the Amazon S3 path to verify both the table folder and associated manifest and metadata files have been removed.

You can also see the purged attribute for the entry in GlueHistoryDDB is now set to True, and update_time has been updated, which you can use if you ever need to look back and understand when a purge event occurred.

Considerations

The Lambda timeout may need to be increased for very large tables, because the object deletion operations may not complete in time.

To prevent accidental data deletion, it’s recommended to carefully limit which databases may participate (Lambda environment variable db_list) and to enable versioning on the Athena bucket path and set up Amazon S3 lifecycle policies to eventually remove older versions. For more information, see Deleting object versions. 

Conclusion

In this post, we demonstrated how to automate the process of  deleting Amazon S3 objects associated with dropped AWS Glue tables. Deleting Amazon S3 objects that are no longer associated with an AWS Glue table reduces ongoing storage expense, management overhead, and unnecessary exposure of potentially private data no longer needed within the organization, allowing you to meet regulatory requirements.

This serverless solution monitors Athena and AWS Glue table creation and drop events via CloudWatch, and triggers Lambda to perform Amazon S3 object deletion. We use DynamoDB to store the audit history of all AWS Glue tables that have been dropped over time. It’s strongly recommended to enable Amazon S3 bucket versioning to prevent accidental data deletion.

To restore the Amazon S3 objects for the deleted table, you first identify the s3_path value for the relevant table entry in GlueHistoryDDB and either copy or remove the delete marker from objects in that path. For more information, see How do I undelete a deleted S3 object?


About the Author

David Roberts is a Senior Solutions Architect at AWS. His passion is building efficient and effective solutions on the cloud, especially involving analytics and data lake governance. Besides spending time with his wife and two daughters, he likes drumming and watching movies, and is an avid video gamer.

Auditing, inspecting, and visualizing Amazon Athena usage and cost

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/auditing-inspecting-and-visualizing-amazon-athena-usage-and-cost/

Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon Simple Storage Service (Amazon S3) using standard SQL. It’s a serverless platform with no need to set up or manage infrastructure. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets and complex queries. You pay only for the queries that you run and the charges are based on the amount of data scanned by each query. You’re not charged for data definition language (DDL) statements like CREATE, ALTER, or DROP TABLE, statements for managing partitions, or failed queries. Cancelled queries are charged based on the amount of data scanned.

Typically, multiple users within an organization operate under different Athena workgroups and query various datasets in Amazon S3. In such cases, it’s beneficial to monitor Athena usage to ensure cost-effectiveness, avoid any performance bottlenecks, and adhere to security best practices. As a result, it’s desirable to have metrics that provide the following details:

  • Amount of data scanned by individual users
  • Amount of data scanned by different workgroups
  • Repetitive queries run by individual users
  • Slow-running queries
  • Most expensive queries

In this post, we provide a solution that collects detailed statistics of every query run in Athena and stores it in your data lake for auditing. We also demonstrate how to visualize audit data collected for a few key metrics (data scanned per workgroup and data scanned per user) using Amazon QuickSight.

Solution overview

The following diagram illustrates our solution architecture.

The solution consists of the following high-level steps:

  1. We use an Amazon CloudWatch Events rule with the following event pattern to trigger an AWS Lambda function for every Athena query run:
    {
    "detail-type": [
    "Athena Query State Change"
    ],
    "source": [
    "aws.athena"
    ],
    "detail": {
    "currentState": [
    "SUCCEEDED",
    "FAILED",
    "CANCELED"
    ]
    }
    }

  1. The Lambda function queries the Athena API to get details about the query and publishes them to Amazon Kinesis Data Firehose.
  2. Kinesis Data Firehose batches the data and writes it to Amazon S3.
  3. We use a second CloudWatch event rule with the following event pattern to trigger another Lambda function for every Athena query being run:
    {
    "detail-type": [
    "AWS API Call via CloudTrail"
    ],
    "source": [
    "aws.athena"
    ],
    "detail": {
    "eventName": [
    "StartQueryExecution"
    ]
    }
    }

  1. The Lambda function extracts AWS Identity and Access Management (IAM) user details from the query and publishes them to Kinesis Data Firehose.
  2. Kinesis Data Firehose batches the data and writes it to Amazon S3.
  3. We create and schedule an AWS Glue crawler to crawl the data written by Kinesis Data Firehose in the previous steps to create and update the Data Catalog tables. The following code is the table schemas the crawler creates:
    CREATE EXTERNAL TABLE `athena_query_details`(
      `query_execution_id` string, 
      `query` string, 
      `workgroup` string, 
      `state` string, 
      `data_scanned_bytes` bigint, 
      `execution_time_millis` int, 
      `planning_time_millis` int, 
      `queryqueue_time_millis` int, 
      `service_processing_time_millis` int)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://<S3location>/athena-query-details/'
    TBLPROPERTIES (
      'parquet.compression'='SNAPPY')
    
    CREATE EXTERNAL TABLE `athena_user_access_details`(
      `query_execution_id` string, 
      `account` string, 
      `region` string, 
      `user_detail` string, 
      `source_ip` string, 
      `event_time` string)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://<S3location>/athena-user-access-details/'
    TBLPROPERTIES (
      'parquet.compression'='SNAPPY')

  1. Athena stores a detailed history of the queries run in the last 30 days.
  2. The solution deploys a Lambda function that queries AWS CloudTrail to get list of Athena queries run in the last 30 days and publishes the details to the Kinesis Data Firehose streams created in the previous steps. The historical event processor function only needs to run one time after deploying the solution using the provided AWS CloudFormation
  3. We use the data available in the tables athena_query_details and athena_user_access_details in QuickSight to build insights and create visual dashboards.

Setting up your resources

Click on the Launch Stack button to deploy the solution in your account.

After you deploy the CloudFormation template, manually invoke the athena_historicalquery_events_processor Lambda function. This step processes the Athena queries that ran before deploying this solution; you only need to perform this step one time. 

Reporting using QuickSight dashboards

In this section, we cover the steps to create QuickSight dashboards based on the athena_query_details and athena_user_access_details Athena tables. The first step is to create a dataset with the athena_audit_db database, which the CloudFormation template created.

  1. On the QuickSight console, choose Datasets.
  2. Choose New dataset.

  1. For Create a Data Set, choose Athena.

 

  1. For Data source name, enter a name (for example, Athena_Audit).
  2. For Athena workgroup, keep at its default [primary].
  3. Choose Create data source.

  1. In the Choose your table section, choose athena_audit_db.
  2. Choose Use custom SQL.

  1. Enter a name for your custom SQL (for example, Custom-SQL-Athena-Audit).
  2. Enter the following SQL query:
    SELECT a.query_execution_id, a.query, a.state, a.data_scanned_bytes, a.execution_time_millis, b.user_detail
    FROM "athena_audit_db"."athena_query_details" AS a FULL JOIN  "athena_audit_db"."athena_user_access_details" AS b
    ON a.query_execution_id=b.query_execution_id
    WHERE CAST(a.data_scanned_bytes AS DECIMAL) > 0

  1. Choose Confirm query.

This query does a full join between the athena_query_details and athena_user_access_details tables.

  1. Select Import to SPICE for quicker analytics.
  2. Choose Edit/Preview data.

  1. Confirm that the data_scanned_bytes and execution_time_millis column data type is set to Decimal.
  2. Choose Save & visualize.

We’re now ready to create visuals in QuickSight. For this post, we create the following visuals:

  • Data scanned per query
  • Data scanned per user 

Data scanned per query

To configure the chart for our first visual, complete the following steps:

  1. For Visual types, choose Horizontal bar chart.
  2. For Y axis¸ choose query_execution_id.
  3. For Value, chose data_scanned_bytes.
  4. For Group/Color, choose query.

If you’re interested in determining the total runtime of the queries, you can use execution_time_milis instead of data_scanned_bytes.

The following screenshot shows our visualization.

 

If you hover over one of the rows in the chart, the pop-out detail shows the Athena query execution ID, Athena query that ran, and the data scanned by the query in bytes.

Data scanned per user

To configure the chart for our second visual, complete the following steps:

  1. For Visual types, choose Horizontal bar chart.
  2. For Y axis, choose query_execution_id
  3. For Value, choose data_scanned_bytes.
  4. For Group/Color, choose user_details.

You can use execution_time_millis instead of data_scanned_bytes if you’re interested in determining the total runtime of the queries.

The following screenshot shows our visualization.

If you hover over one of the rows in the chart, the pop-out detail shows the Athena query execution ID, the IAM principal that ran the query, and the data scanned by the query in bytes. 

Conclusion

This solution queries CloudTrail for Athena query activity in the last 30 days and uses CloudWatch rules to capture statistics for future Athena queries. Furthermore, the solution uses the GetQueryExecution API to provide details about the amount of data scanned, which provides information about per-query costs and how many queries are run per user. This enables you to further understand how your organization is using Athena.

You can further improve upon this architecture by using a partitioning strategy. Instead of writing all query metrics as .csv files to one S3 bucket folder, you can partition the data using year, month, and day partition keys. This allows you to query data by year, month, or day. For more information about partitioning strategies, see Partitioning Data.

For more information about improving Athena performance, see Top 10 Performance Tuning Tips for Amazon Athena.


About the Authors

Kalyan Janaki is Senior Technical Account Manager with AWS. Kalyan enjoys working with customers and helping them migrate their workloads to the cloud. In his spare time, he tries to keep up with his 2-year-old.

 

 

 

Kapil Shardha is an AWS Solutions Architect and supports enterprise customers with their AWS adoption. He has background in infrastructure automation and DevOps.

 

 

 

Aravind Singirikonda is a Solutions Architect at Amazon Web Services. He works with AWS Enterprise customers to provide guidance and technical assistance, helping them improve the value of their solutions when using AWS.

Managing COVID-19 exposure with crowd tracing

Post Syndicated from Aspire Ventures original https://aws.amazon.com/blogs/big-data/managing-covid-19-exposure-with-crowd-tracing/

This is a guest blog post by AWS partner Aspire Ventures

As we enter winter, with fewer options to be outdoors, our personal choices can impact our risk of contracting the COVID-19 virus even more. The New England Journal of Medicine publication showed real-world examples of the effectiveness of masks and social distancing in mitigating severity of COVID-19 infection and keeping people asymptomatic. CNN reported on a study that showed people who contracted COVID-19 were twice as likely to have visited a restaurant in the prior two weeks. What if we had actionable crowding, mask usage, and social distancing data that we could analyze to inform our daily decisions to keep us safe?

Aspire Ventures, an AWS partner, has developed the Clio GO pass system — a new venue-entry system that helps track COVID-19 exposure through kiosks and mobile phones in a completely privacy-preserving way. It uses a new technology called crowd tracing, which allows users to assess whether certain locations and venues meet their risk profile. Crowd tracing data is COVID-19 location-scouting data, which helps answer the question of how much risk may be associated with entering a particular crowd.

Today, Aspire Ventures is collaborating with AWS to open source anonymized crowd tracing data from the Clio GO pass system and make it available in the public AWS COVID-19 data lake. Aspire Ventures is a venture fund dedicated to fast-tracking precision medicine technologies and practices that leverage AI and IoT to deliver affordable, individualized solutions at a massive scale. The AWS COVID-19 data lake is a public repository of up-to-date and curated datasets on or related to COVID-19 to help experts track, contain, and neutralize the virus causing the illness. With the Clio GO pass system and the open-source crowd tracing dataset in the AWS COVID-19 data lake, we believe the global community can come together and develop techniques to better fight the COVID-19 pandemic.

The Clio GO app functions like an airline mobile boarding pass system. Prior to arrival, you check in via the app by answering a few questions and receive a mobile entry GO Pass in either QR-code or NFC ticket format. When you arrive at a venue, you validate your GO Pass by scanning it at a kiosk or smartphone. GO Pass is being used by thousands of venues who have tens of millions of annual visitors. These venues run the spectrum from schools and medical practices to office buildings and food manufacturing facilities. Further adoption of Aspire’s Clio GO app will generate more anonymized data that AWS will make available for advancing COVID-19 solutions.

Crowd tracing vs. contact tracing

As Dr. Fauci said, contact tracing is “not working.” As the primary technology used by public health authorities, it’s fraught with poor adoption, poor accuracy, high cost, and serious privacy concerns. To understand the issues with contact tracing, we analogize to a first-person video game to introduce the immunological concepts of viral dose, viral load, and undetectable asymptomatic carriers.

Imagine a video game scenario with attackers and shields to protect from attacks. Viral dose is analogous to incremental hits that weaken a player’s shield. Avoiding those hits prevents your shield from collapsing.

Viral load is analogous to how strongly any one attacker can hit a player’s shield. Certain infected individuals who are more progressed in their infection may hit you harder. Just as in the game, your shields may be destroyed by many weak hits from multiple attackers or one very strong hit from a single attacker.

Asymptomatic carriers are like players who, from a distance, look like they have no weapons. Undetectable asymptomatic carriers are like players whose weapons can’t even be detected when you search their belongings—the science indicates that infectious asymptomatic carriers may not be detectable with COVID-19 PCR swab tests. CDC blood test surveys show between 6 times to 25 times as many asymptomatic carriers are lurking out there for any single known case.

Clio GO uses crowd tracing and adaptive artificial intelligence (A2I) to progressively improve the estimates of each player’s shield strength, the intensity of hits you might encounter, and the likely hits from attackers whose weapons are completely undetectable.

In contrast, contact tracing requires that an attacker have a visible weapon, and if so, it assumes shields are obliterated immediately. However, if there is no visible weapon, the shields remain at 100%. In either case, contact tracing doesn’t decrease your shield level based on cumulative small hits (viral dose) or how intense the hits (viral load of others) are by taking into account the conditions at the time, such as mask usage, social distancing, and duration of contact.

The more people who have checked in to the same place, the lower each person’s shield is computed to be. Shield levels are further refined by reported mask usage and social distancing within the crowd. The venue never sees the person’s shield level. It sees a green or red check mark indicating if you’re entering with a valid pass and meets their entry requirements, but no symptom data is shared with the venue.

After your visit, you can rate the venue’s use of masks and social distancing, and this report helps compute your own shield level and that of others. When using the Clio GO app to scout venues prior to visiting, the venue’s listing shows the aggregate mask and social distancing ratings by visitors.

As part of our collaboration with AWS, and to broaden the adoption of crowd tracing, Clio GO app users can now pre-screen their visitors at no cost for personal, non-profit, faith- based, educational, and amateur athletic event use.

How you can contribute

We welcome everyone to participate in this collaborative effort. Using the app improves your and your visitor’s safety while contributing anonymized crowd tracing data to the open-source public AWS COVID-19 data lake.

In just a few minutes, you can get a free Clio GO account and use the Clio GO app to pre-screen people attending personal events and private clubs—whether small dinner parties, soccer matches, or religious gatherings. You can purchase additional hardware for unmanned door screening or mobile kiosk functionality, as well as solutions for commercial enterprises.

AWS COVID-19 data lake and crowd tracing data

To make the data from the AWS COVID-19 data lake available in the Data Catalog in your AWS account, create a CloudFormation stack using the following template. This template creates a covid-19 database in your Glue Data Catalog and tables that point to the public AWS COVID-19 data lake. This includes a aspirevc_crowd_tracing table which points to up-to-date crowd tracing data, and also a aspirevc_crowd_tracing_zipcode_3digits table which points to a lookup which translates 3 digits zip codes used in the aspirevc_crowd_tracing table to the respective states.

You can query these tables using Amazon Athena. Athena is a serverless interactive query service that makes it easy to analyze the data in the AWS COVID-19 data lake. Athena supports SQL, a common language that data analysts use for analyzing structured data. To query the data, complete the following steps:

  1. Sign in to the Athena console.
    1. If this is the first time you are using Athena, you must specify a query result location on Amazon S3.
  2. From the drop-down menu, choose the covid-19 database.
  3. Enter your query.

The following query returns statistics including the number of people marked as symptoms, diagnosed, contact, and near for the given scan date:

SELECT
  cast(from_iso8601_timestamp(scandate) as date) as date,
  COUNT_IF(symptoms) as symptoms_count,
  COUNT_IF(diagnosed) as diagnosed_count,
  COUNT_IF(contact) as contact_count,
  COUNT_IF(near) as near_count, COUNT(*) as total_count
FROM "covid-19"."aspirevc_crowd_tracing"
WHERE from_iso8601_timestamp(scandate)
BETWEEN parse_datetime('2020-10-01:00:00:00','yyyy-MM-dd:HH:mm:ss')
AND parse_datetime('2020-10-16:00:00:00','yyyy-MM-dd:HH:mm:ss')
GROUP BY 1
ORDER BY 1

symptoms: Past 2 weeks, have you had any of the following symptoms: shortness of breath, fever, loss of taste or smell, new cough?

diagnosed: Past 2 weeks, have you been diagnosed with COVID or are waiting for COVID test results?

contact: Past 2 weeks, have you been in contact with anyone who has been diagnosed with COVID or is waiting for COVID test results?

near: Past 2 weeks, have you been near anyone with the following symptoms: shortness of breath, fever, loss of taste or smell, new cough?

The following screenshot shows the results of this query:

To see more details, you can run the following query to retrieve the same statistics per state per risklevel for the given scan date:

SELECT
  cast(from_iso8601_timestamp(scandate) as date) as date,
  SUBSTR(scannerdevice_zipcode, 1, 3) as zip,
  state,
  risklevel,
  result,
  COUNT_IF(symptoms) as symptoms_count,
  COUNT_IF(diagnosed) as diagnosed_count,
  COUNT_IF(contact) as contact_count,
  COUNT_IF(near) as near_count, 
  COUNT(*) as total_count
FROM "covid-19"."aspirevc_crowd_tracing"
JOIN "covid-19"."aspirevc_crowd_tracing_zipcode_3digits" ON SUBSTR(scannerdevice_zipcode, 1, 3) = aspirevc_crowd_tracing_zipcode_3digits.zip
WHERE from_iso8601_timestamp(scandate)
BETWEEN parse_datetime('2020-10-15:00:00:00','yyyy-MM-dd:HH:mm:ss')
AND parse_datetime('2020-10-16:00:00:00','yyyy-MM-dd:HH:mm:ss')
AND scannerdevice_zipcode<>''
GROUP BY 1,2,3,4,5
ORDER BY 1,3

The following screenshot shows the results of this query:

You can see that there are a small number of people marked as symptoms, diagnosed, contact, and near per state per risklevel.

By open-sourcing the data, we see possibilities to combine it with other AWS COVID-19 data lake datasets, such as hospitalizations or COVIDcast data. This can enable a new game feature such as a radar that predicts regional hotspot emergence.

If you’re a data analyst, we encourage you to contribute to building better crowd tracing algorithms using any of the data provided in the public AWS Covid-19 data lake. Even if you’re building a different solution, you can use this dataset without license fees. The following section can help you quickly get started.

The data

Although our public AWS COVID-19 data lake has excellent data sources, such as hospitalization data down to regional levels, Clio GO provides data even at the zip-code level. Below is a description of the schema of the data made available in the data lake:

Strong data privacy and cryptographic pseudonymity via Powch

In contrast to the significant privacy challenges associated with contact tracing, crowd tracing and the Clio GO system don’t require disclosure of contact identities to reduce your risk for COVID-19 infection. Returning to the video game analogy, learning the identity after the fact of who hit your shields doesn’t matter. However, knowing that that you’re entering a crowded map of strongly armed assailants might cause you to choose a different crowd. Therefore, the aggregate risk of the crowd becomes the only relevant concern for your risk of contracting COVID-19.

To protect your identity, the Clio GO app uses Powch, a powerful cryptographic technology that protects identity and data. Similar to Bitcoin, Powch enables pseudonymity—a way to log in without any linkage to your true identity. You don’t need to use any personally identifiable information for Clio Go registration. Instead, an unguessable and random secret ID is stored in a personal QR code, which only you have access to, and GO Pass has no knowledge about the owner of the secret ID. The secret ID is used during registration as the only form of identity in the Go Pass app.

After exposure to a possibly infected individual, contact tracing requires you to exactly identify all the individuals that you interacted with during the same period of time, and compromise their privacy as well as your own.

Although you may choose to be identified and share your name with the venue you visit, the Clio GO app never shows personal data, like actual temperature readings, to the venue owner. The app only tells the venue whether the GO Pass was accepted or denied based on the entry requirements of the venue.

Crowd-sourced solutions, open data, restrictions, and uses

We hope that free access to the crowd tracing data via the public AWS COVID-19 data lake encourages the development of new creative, low-cost COVID-19 mitigation solutions. You can use the data within commercial products under a creative commons license with the explicit requirement that algorithms developed from the public dataset are open and published.

We encourage using the crowd tracing data in the public AWS COVID-19 data lake in conjunction with other free data sources also in the lake. A commercial data feed with fewer restrictions and data limitations is being made available via AWS Data Exchange to commercial organizations who pass verification requirements.


About the authors and Aspire Ventures

Aspire Ventures has developed a novel artificial intelligence engine called A2i and joint ventures with mission-driven health systems. Aspire’s first joint venture with Penn Medical Lancaster General Health and Capital BlueCross established the Smart Health Innovation Lab, an entity that accelerates healthcare technologies that impact the quadruple aim. Aspire is partnering with Clalit, the majority health system of Israel, in a similar joint venture focused on Israeli start-ups to encourage innovation in healthcare.

Aspire Ventures was founded by Essam Abadir, SB MIT Mathematics, SB Sloan School of Management, and JD with Distinction from the University of Iowa School of Law. Essam founded Aspire Ventures as an impact investment AI firm in 2014 after selling an apps platform to Intel in 2013.

A2i is overseen by Victor Owuor, SB & MS Aeronautical and Astronautical Engineering MIT, SB & MS Electrical Engineering MIT, and JD Harvard School of Law. Prior to Aspire, Victor headed a significant cloud P&L at Oracle.

The Aspire Ventures CIO and Smart Health Innovation Lab CEO is Kim Ireland, MSIS Penn State University. Kim formerly managed health system EHR rollouts at Cerner and was CEO of startup MedStatix.

Scott Schell, PhD Immunology University of Chicago, MD University of Chicago, and MBA University of Michigan, heads Clio Health Go and is Chief Medical Officer for the Aspire portfolio. Scott was Founding Chair of Cleveland Clinic’s Department of Population Health and led development of two of healthcare’s largest platforms at Alere and UPMC.

Clio Health GO’s mission is to reinvent the healthcare experience via advanced telehealth, starting with COVID-19. GO Pass and the crowd tracing data and algorithm is a product of Medstatix. Powch provides patented cryptographic privacy and security technologies. Connexion Health provides the GO Pass kiosks. Clio Health GO, Medstatix, Connexion Health, and Powch are portfolio companies of Aspire.

Field Notes: Restricting Amazon WorkSpaces Users to Run Amazon Athena Queries

Post Syndicated from Somdeb Bhattacharjee original https://aws.amazon.com/blogs/architecture/field-notes-restricting-amazon-workspaces-users-to-run-amazon-athena-queries/

One of the use cases we hear from customers is that they want to provide very limited access to Amazon Workspaces users (for example contractors, consultants) in an AWS account. At the same time they want to allow them to query Amazon Simple Storage Service (Amazon S3) data in another account using Amazon Athena over a JDBC connection.

For example, marketing companies might provide private access to the first party data to media agencies through this mechanism.

The restrictions they want to put in place are:

  • For security reasons these Amazon WorkSpaces should not have internet connectivity. So the access to Amazon Athena must be over AWS PrivateLink.
  • Public access to Athena is not allowed using the credentials used for the JDBC connection. This is to prevent the users from leveraging the credentials to query the data from anywhere else.

In this post, we show how to use Amazon Virtual Private Cloud (Amazon VPC) endpoints for Athena, along with AWS Identity and Access Management (AWS IAM) policies. This provides private access to query the Amazon S3 data while preventing users from querying the data from outside their Amazon WorkSpaces or using the Athena public endpoint.

Let’s review the steps to achieve this:

  • Initial setup of two AWS accounts (AccountA and AccountB)
  • Set up Amazon S3 bucket with sample data in AccountA
  • Set up an IAM user with Amazon S3 and Athena access in AccountA
  • Create an Amazon VPC endpoint for Athena in AccountA
  • Set up Amazon WorkSpaces for a user in AccountB
  • Install a SQL client tool (we will use DbVisualizer Free) and Athena JDBC driver in Amazon WorkSpaces in AccountB
  • Use DbVisualizer to the query the Amazon S3 data in AccountA using the Athena public endpoint
  • Update IAM policy for user in AccountA to restrict private only access

 Prerequisites

To follow the steps in this post, you need two AWS Accounts. The Amazon VPC and subnet requirements are specified in the detailed steps.

Note: The AWS CloudFormation template used in this blog post is for US-EAST-1 (N. Virginia) Region so ensure the Region setting for both the accounts are set to US-EAST-1 (N. Virginia).

Walkthrough

The two AWS accounts are:

AccountA – Contains the Amazon S3 bucket where the data is stored. For AccountA you can create a new Amazon VPC or use the default Amazon VPC.

AccountB – Amazon WorkSpaces account. Use the following AWS CloudFormation template for AccountB:

  • The AWS CloudFormation template will create a new Amazon VPC in AccountB with CIDR 10.10.0.0/16 and set up one public subnet and two private subnets.
  • It will also create a NAT Gateway in the public subnet and create both public and private route tables.
  • Since we will be launching Amazon WorkSpaces in these private subnets and not all Availability Zones (AZ) are supported by Amazon WorkSpaces, it is important to choose the right AZ when creating them.

Review the documentation to learn which AWS Regions/AZ are supported.

We have provided two parameters in the AWS CloudFormation template:

  • AZName1
  • AZName2

Step 1

Before launching the CloudFormation stack:

  • Log in to AccountB
  • Search for AWS Resource Access Manager
  • On the right-hand side, you will notice the AZ ID to AZ Name mapping. Note down the AZ Name corresponding to AZ ID use1-az2 and use1-az4
  • Now launch the CloudFormation template and remember to choose the AZ names you noted down earlier
    • https://athena-workspaces-blogpost.s3.amazonaws.com/vpc.yaml
  • Enter the CloudFormation Stack Name as – ‘AthenaWorkspaces’ and leave everything default.
  • Once the CloudFormation stack creation is complete, create a peering connection from AccountB to AccountA.
  • Update the associated route tables for the private subnets with the new peering connection.

For information on how to create a VPC peering connection, refer to AWS documentation on VPC Peering.

AccountB VPC Route Table:

AccountB VPC Route Table:

AccountA VPC Route Table:

AccountA VPC Route Table

Step 2

  • Create a new Amazon S3 bucket in AccountA with a bucket name that starts with ‘athena-’.
  • Next, you can download a sample file and upload it to the Amazon S3 bucket you just created.
  • Use the following statements to create AWS Glue database. Use an external table for the data in the Amazon S3 bucket so that you can query it from Athena.
  • Go to Athena console and define a new database:

CREATE DATABASE IF NOT EXISTS sampledb

Once the database is created, create a new table in sampledb (by selecting sampledb from the “Database” drop down menu). Replace the <<your bucket name>> with the bucket you just created:

CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.amazon_reviews_tsv(
  marketplace string, 
  customer_id string, 
  review_id string, 
  product_id string, 
  product_parent string, 
  product_title string,
  product_category string,
  star_rating int, 
  helpful_votes int, 
  total_votes int, 
  vine string, 
  verified_purchase string, 
  review_headline string, 
  review_body string, 
  review_date string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  ESCAPED BY '\\'
  LINES TERMINATED BY '\n'
LOCATION
  's3://<<your bucket name>>/'
TBLPROPERTIES ("skip.header.line.count"="1")

 

Step 3

  • In AccountA, create a new IAM user with programmatic access.
  • Save the access key and secret access key.
  • For the same user add an Inline Policy which allows the following actions:

IAM summary

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowAthenaReadActions",
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups",
                "athena:ListDataCatalogs",
                "athena:GetExecutionEngine",
                "athena:GetExecutionEngines",
                "athena:GetNamespace",
                "athena:GetCatalogs",
                "athena:GetNamespaces",
                "athena:GetTables",
                "athena:GetTable"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowAthenaWorkgroupActions",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowGlueActionsViaVPCE",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowGlueActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowS3ActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::athena-*"
            ]
        }
    ]
}

 

Step 4

  • In this step, we create an Interface VPC endpoint (AWS PrivateLink) for Athena in AccountA. When you use an interface VPC endpoint, communication between your Amazon VPC and Athena is conducted entirely within the AWS network.
  • Each VPC endpoint is represented by one or more Elastic Network Interfaces (ENIs) with private IP addresses in your VPC subnets.
  • To create an Interface VPC endpoint follow the instructions and select Athena in the AWS Services list. Do not select the checkbox for Enable Private DNS Name.
  • Ensure the security group that is attached to the Amazon VPC endpoint is open to inbound traffic on port 443 and 444 for source AccountB VPC CIDR 10.10.0.0/16. Port 444 is used by Athena to stream query results.
  • Once you create the VPC endpoint, you will get a DNS endpoint name which is in the following format. We are going to use this in JDBC connection from the SQL client.

      VPC_Endpoint_ID.athena.Region.vpce.amazonaws.com

Step 5

  • In this step we set up Amazon WorkSpaces in AccountB.
  • Each Amazon WorkSpace is associated with the specific Amazon VPC and AWS Directory Service construct that you used to create it. All Directory Service constructs (Simple AD, AD Connector, and Microsoft AD) require two subnets to operate, each in different Availability Zones. This is why we created 2 private subnets at the beginning.
  • For this blog post I have used Simple AD as the directory service for the Amazon WorkSpaces.
  • By default, IAM users don’t have permissions for Amazon WorkSpaces resources and operations.
  • To allow IAM users to manage Amazon WorkSpaces resources, you must create an IAM policy that explicitly grants them permissions
  • Then attach the policy to the IAM users or groups that require those permissions.
  • To start, go to the Amazon WorkSpaces console and select Advanced Setup.
    • Set up a new directory using the SimpleAD option.
    • Use the “small” directory size and choose the Amazon VPC and private subnets you created in Step 1 for AccountB.
    • Once you create the directory, register the directory with Amazon WorkSpaces by selecting “Register” from the “Action” menu.
    • Select private subnets you created in Step 1 for AccountB.

Directory info

  • Next, launch Amazon WorkSpaces by following the Launch WorkSpaces button.
  • Select the directory you created and create a new user.
  • For the bundle, choose Standard with Windows 10 (PCoIP).
  • After the Amazon WorkSpaces is created, you can log in to the Amazon WorkSpaces using a client software. You can download it from https://clients.amazonworkspaces.com/
  • Login to your Amazon WorkSpace, install a SQL Client of your choice. At this point your Amazon WorkSpace still has Internet access via the NAT Gateway
  • I have used DbVisualizer (the free version) as the SQL client. Once you have that installed, install the JDBC driver for Athena following the instructions
  • Now you can set up the JDBC connections to Athena using the access key and secret key you set up for an IAM user in AccountA.

Step 6

To test out both the Athena public endpoint and the Athena VPC endpoint, create two connections using the same credentials.

For the Athena public endpoint, you need to use athena.us-east-1.amazonaws.com service endpoint. (jdbc:awsathena://athena.us-east-1.amazonaws.com:443;S3OutputLocation=s3://<athena-bucket-name>/)

Athena public

For the VPC Endpoint Connection, use the VPC Endpoint you created in Step 4 (jdbc:awsathena://vpce-<>.athena.us-east-1.vpce.amazonaws.com:443;S3OutputLocation=s3://<athena-bucket-name>/)

Database connection Athena

Now run a simple query to select records from the amazon_reviews_tsv table using both the connections.

SELECT * FROM sampledb.amazon_reviews_tsv limit 10

You should be able to see results using both the connections. Since the private subnets are still connected to the internet via the NAT Gateway, you can query using the Athena public endpoint.

Run the AWS Command Line Interface (AWS CLI) command using the credentials used for the JDBC connection from your workstation. You should be able to access the Amazon S3 bucket objects and the Athena query run list using the following commands.

aws s3 ls s3://athena-workspaces-blogpost

aws athena list-query-executions

Step 7

  • Now we lock down the access as described in the beginning of this blog post by taking the following actions:
  • Update the route table for the private subnets by removing the route for the internet so access to the Athena public endpoint is restricted from the Amazon WorkSpaces. The only access will be allowed through the Athena VPC Endpoint.
  • Add conditional checks to the IAM user access policy that will restrict access to the Amazon S3 buckets and Athena only if:
    • The request came in through the VPC endpoint. For this we use the “aws:SourceVpce” check and provide the VPC Endpoint ID value.
    • The request for Amazon S3 data is through Athena. For this we use the condition “aws:CalledVia” and provide a value of “athena.amazonaws.com”.
  • In the IAM access policy below replace <<your vpce id>> with your VPC endpoint id and update the previous inline policy which was added to the IAM user in Step 3.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowAthenaReadActions",
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups",
                "athena:ListDataCatalogs",
                "athena:GetExecutionEngine",
                "athena:GetExecutionEngines",
                "athena:GetNamespace",
                "athena:GetCatalogs",
                "athena:GetNamespaces",
                "athena:GetTables",
                "athena:GetTable"
            ],
            "Resource": "*",
            "Condition":{
               "StringEquals":{
                  "aws:SourceVpce":[
                     "<<your vpce id>>"
                  ]
               }
            }
        },
        {
            "Sid": "AllowAthenaWorkgroupActions",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup"
            ],
            "Resource": "*",
            "Condition":{
               "StringEquals":{
                  "aws:SourceVpce":[
                     "<<your vpce id>>"
                  ]
               }
            }
        },
        {
            "Sid": "AllowGlueActionsViaVPCE",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*",
            "Condition":{
               "StringEquals":{
                  "aws:SourceVpce":[
                     "<<your vpce id>>"
                  ]
               }
            }
        },
        {
            "Sid": "AllowGlueActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:CreateDatabase",
                "glue:GetTables",
                "glue:GetTable"
            ],
            "Resource": "*",
            "Condition":{
               "ForAnyValue:StringEquals":{
                  "aws:CalledVia":[
                     "athena.amazonaws.com"
                  ]
               }
            }
        },
        {
            "Sid": "AllowS3ActionsViaAthena",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::athena-*"
            ],
            "Condition":{
               "ForAnyValue:StringEquals":{
                  "aws:CalledVia":[
                     "athena.amazonaws.com"
                  ]
               }
            }
        }
    ]
}

Once you applied the changes, try to reconnect using both the Athena VPC endpoint as well Athena public endpoint connections. The Athena VPC endpoint connection should work but the public endpoint connection will time out. Also try the same Amazon S3 and Athena AWS CLI commands. You should get access denied for both the operations.

Clean Up

To avoid incurring costs, remember to delete the resources that you created.

For AWS AccountA:

  • Delete the S3 buckets
  • Delete the database you created in AWS Glue
  • Delete the Amazon VPC endpoint you created for Amazon Athena

For AccountB:

  • Delete the Amazon Workspace you created along with the Simple AD directory. You can review more information on how to delete your Workspaces.

Conclusion

In this blog post, I showed how to leverage Amazon VPC endpoints and IAM policies to privately connect to Amazon Athena from Amazon Workspaces that don’t have internet connectivity.

Give this solution a try and share your feedback in the comments!

Field Notes provides hands-on technical guidance from AWS Solutions Architects, consultants, and technical account managers, based on their experiences in the field solving real-world business problems for customers.

Redacting sensitive information with user-defined functions in Amazon Athena

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/redacting-sensitive-information-with-user-defined-functions-in-amazon-athena/

Amazon Athena now supports user-defined functions (in Preview), a feature that enables you to write custom scalar functions and invoke them in SQL queries. Although Athena provides built-in functions, UDFs enable you to perform custom processing such as compressing and decompressing data, redacting sensitive data, or applying customized decryption. You can write your UDFs in Java using the Athena Query Federation SDK. When a UDF is used in a SQL query submitted to Athena, it’s invoked and run on AWS Lambda. You can use UDFs in both SELECT and FILTER clauses of a SQL query, and invoke multiple UDFs in the same query. Athena UDF functionality is available in Preview mode in the US East (N. Virginia) Region.

This blog post covers basic functionalities of Athena UDF, writing your own UDF, building and publishing your UDF to AWS Serverless Application Repository, configuring the UDF connector application, and using the UDF in Athena queries. You can implement use cases involving sensitive data, such as personal identifiable information (PII), credit cards, or Social Security numbers (SSN). In this post, we deploy a Redact UDF and use that to mask sensitive information.

Prerequisites

Before creating your development environment, you must have the following prerequisites:

To set up the development environment and address these prerequisites, deploy the CloudFormation template in the first part of this series, Extracting and joining data from multiple data sources with Athena Federated Query. The post provides instructions on building the required test environment and resources using the CloudFormation template.

Creating your IDE

After you deploy the CloudFormation template, you need to create the required AWS resources. To create the development environment to build and deploy the UDF, we use an AWS Cloud9 IDE. On the AWS Cloud9 console, locate your environment and choose Open IDE.

AWS Cloud9 Resize

The AWS Cloud9 IDE comes with a default 10 GB disk space, which can fill quickly when setting up the development environment, so you should resize it.

  1. Run the following command in the AWS Cloud9 IDE terminal to get the resize script:
    curl https://aws-data-analytics-workshops.s3.amazonaws.com/athena-  
         workshop/scripts/cloud9_resize.sh > cloud9_resize.sh

  1. Run the script by issuing the following command on the terminal to resize the disk to 20 GB:
    sh cloud9_resize.sh 20

  1. Check the free space on the disk with the following code:
    df -h

You should see something like the following screenshot.

Setting up the IDE

Next, you clone the SDK and prepare your IDE.

  1. Make sure that Git is installed on your system by entering the following code:
    sudo yum install git -y

  1. To install the Athena Query Federation SDK, enter the following command at the command line to clone the SDK repository. This repository includes the SDK, examples, and a suite of data source connectors.
    git clone https://github.com/awslabs/aws-athena-query-federation.git

If you’re working on a development machine that already has Apache Maven, the AWS CLI, and the AWS Serverless Application Model build tool installed, you can skip this step.

  1. From the root of the aws-athena-query-federation directory that you created when you cloned the repository, run the prepare_dev_env.sh script that prepares your development environment:
    cd aws-athena-query-federation	
    
    sudo chown ec2-user:ec2-user ~/.profile
    
    ./tools/prepare_dev_env.sh

This script requires manual inputs to run (choosing Enter as needed during the setup steps when prompted). You can edit this script to remove the manual inputs if you want to automate the setup entirely.

  1. Update your shell to source new variables created by the installation process or restart your terminal session:
    source ~/.profile

  1. Run the following code from the athena-federation-sdk directory within the GitHub project you checked out earlier:

Adding the UDF code and publishing the connector

In this section, you add your UDF function, build the JAR file, and deploy the connector.

  1. In the AWS Cloud9 IDE, expand the aws-athena-query-federation project and navigate to the AthenaUDFHandler.java file.
  2. Choose the file (double-click) to open it for editing.

Now we add the UDF code for a String Redact function, which redacts a string to show only the last four characters. You can use this UDF function to mask sensitive information.

  1. Enter the following code:
    /** Redact a string to show only the last 4 characters
         * 
         * 
         * 
         * @param input the string to redact
         * @return redacted string
         */
        public String redact(String input)
        {
            String redactedString = new StringBuilder(input).replace(0,     
                input.length()- 4, new String(new char[input.length() -   
                4]).replace("\0", "x")).toString(); 
            return redactedString;
        }

You can also copy the modified code with the following command (which must be run from the aws-athena-query-federation directory):

curl https://aws-data-analytics-workshops.s3.amazonaws.com/athena-workshop/scripts/AthenaUDFHandler.java > athena-udfs/src/main/java/com/amazonaws/athena/connectors/udfs/AthenaUDFHandler.java

After copying the file, you can open it in the AWS Cloud9 IDE to see its contents.

  1. To build the JAR file, save the file and run mvn clean install to build your project:
    cd ~/environment/aws-athena-query-federation/athena-udfs/
    
    mvn clean install

After it successfully builds, a JAR file is created in the target folder of your project named artifactId-version.jar, where artifactId is the name you provided in the Maven project, for example, athena-udfs.

  1. From the athena-udfs directory, run the following code to publish the connector to your private AWS Serverless Application Repository. The S3_BUCKET_NAME in the command is the Amazon Simple Storage Service (Amazon S3) location where a copy of the connector’s code is stored for the AWS Serverless Application Repository to retrieve it.
../tools/publish.sh S3_BUCKET_NAME athena-udfs

This allows users with relevant permission levels to deploy instances of the connector via a one-click form.

When the connector is published successfully, it looks like the following screenshot.

To see AthenaUserDefinedFunctions, choose the link shown in the terminal after the publish is successful or navigate to the AWS Serverless Application Repository by choosing Available Applications, Private applications.

Setting up the UDF connector

Now that the UDF connector code is published, we can install the UDF connector to use with Athena.

  1. Choose the AthenaUserDefinedFunctions application listed on the Private applications section in the AWS Serverless Application Repository.
  2. For Application name, leave it as the default name AthenaUserDefinedFunctions.
  3. For SecretNameorPrefix, enter a secret name if you have already saved it in AWS Secrets Manager; otherwise, enter database-*.
  4. For LambdaFunctionName, enter customudf.
  5. Leave the remaining fields as default.
  6. Select I acknowledge that this app creates custom IAM roles.
  7. Choose Deploy.

Querying with UDF in Athena

Now that the UDF connector code is deployed, we can run Athena queries that use the UDF.

If you ran the CloudFormation template from Part 1 of this blog series, the AmazonAthenaPreviewFunctionality workgroup was already created. If not, choose Create Workgroup on the Athena console and create a workgroup named AmazonAthenaPreviewFunctionality and set up your query result location in Amazon S3.

To proceed, make sure you are in the workgroup AmazonAthenaPreviewFunctionality. If not, choose the workgroup AmazonAthenaPreviewFunctionality and choose Switch workgroup.

You can now run a query to use the Redact UDF to mask sensitive information from PII columns. To show the comparison, we have included the PII column and masked data as part of the query results. If you ran the CloudFormation template from Part 1 of this series, you can navigate to the Saved Queries on the Athena console and choose RedactUdfCustomerAddress.

The following screenshot shows your query.

After the query runs, you should see results like the following screenshot. The redact_name, redact_phone, and redact_address columns only show the last four characters.

Cleaning up

To clean up the resources created as part of your CloudFormation template, complete the following steps:

  1. On the Amazon S3 console, empty and delete the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code (make sure you’re running this command on the correct bucket):
     aws s3 rm s3://athena-federation-workshop-<account-id> --recursive

  1. Use the AWS CloudFormation console or AWS CLI to delete the stacks Athena-Federation-Workshop and serverlessrepo-AthenaUserDefinedFunctions

Summary

In this post, you learned about Athena user-defined functions, how to create your own UDF, and how to deploy it to a private AWS Serverless Application Repository. You also learned how to configure the UDF and use it in your Athena queries. In the next post of this series, we discuss and demonstrate how to use a machine learning (ML) anomaly detection model developed on Amazon SageMaker and use that model in Athena queries to invoke an ML inference function to detect anomaly values in our orders dataset.


About the Authors


Saurabh Bhutyani is a Senior Big Data specialist solutions architect at Amazon Web Services. He is an early adopter of open source Big Data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

 

 


Amir Basirat is a Big Data specialist solutions architect at Amazon Web Services,
focused on Amazon EMR, Amazon Athena, AWS Glue and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a Big Data specialist for different technology companies. He also has a PhD in computer science, where his research was primarily focused on large-scale distributed computing and neural networks.

New – Export Amazon DynamoDB Table Data to Your Data Lake in Amazon S3, No Code Writing Required

Post Syndicated from Alex Casalboni original https://aws.amazon.com/blogs/aws/new-export-amazon-dynamodb-table-data-to-data-lake-amazon-s3/

Hundreds of thousands of AWS customers have chosen Amazon DynamoDB for mission-critical workloads since its launch in 2012. DynamoDB is a nonrelational managed database that allows you to store a virtually infinite amount of data and retrieve it with single-digit-millisecond performance at any scale. To get the most value out of this data, customers had […]

Handling data erasure requests in your data lake with Amazon S3 Find and Forget

Post Syndicated from Chris Deigan original https://aws.amazon.com/blogs/big-data/handling-data-erasure-requests-in-your-data-lake-with-amazon-s3-find-and-forget/

Data lakes are a popular choice for organizations to store data around their business activities. Best practice design of data lakes impose that data is immutable once stored, but new regulations such as the European General Data Protection Regulation (GDPR), California Consumer Privacy Act (CCPA), and others have created new obligations that operators now need to be able to erase private data from their data lake when requested.

When asked to erase an individual’s private data, as a data lake operator you have to find all the objects in your Amazon Simple Storage Service (Amazon S3) buckets that contain data relating to that individual. This can be complex because data lakes contain many S3 objects (each of which may contain multiple rows), as shown in the following diagram. You often can’t predict which objects contain data relating to an individual, so you need to check each object. For example, if the user mary34 asks to be removed, you need to check each object to determine if it contains data relating to mary34. This is the first challenge operators face: identifying which objects contain data of interest.

After you identify objects containing data of interest, you face a second challenge: you need to retrieve the object from the S3 bucket, remove relevant rows from the file, put a new version of the object into S3, and make sure you delete any older versions.

Locating and removing data manually can be time-consuming and prone to mistakes, considering the large number of objects typically in data lakes.

Amazon S3 Find and Forget solves these challenges with ready-to-use automations. It allows you to remove records from data lakes of any size that are in AWS Glue Data Catalog. The solution includes a web user interface that you can use and an API that you can use to integrate with your own applications.

Solution overview

Amazon S3 Find and Forget enables you to find and delete records automatically in data lakes on Amazon S3. Using the solution, you can:

  • Define which tables from your AWS Glue Data Catalog contain data you want to erase
  • Manage a queue of identifiers (such as unique customer identifiers) to erase
  • Erase rows from your data lake matching the queued record identifiers
  • Access a log of all actions taken by the solution

You can use Amazon S3 Find and Forget to work with data lakes stored on Amazon S3 in a supported file format.

The solution is developed and distributed as open-source software that you deploy and run inside your own AWS account. When deploying this solution, you only pay for the AWS services consumed to run it. We recommend reviewing the Cost Estimate guide and creating Amazon CloudWatch Billing Alarms to monitor charges before deploying the solution in your own account.

When you handle requests to remove data, you add the identifiers through the web interface or API to a Deletion Queue. The identifiers remain in the queue until you start a Deletion Job. The Deletion Job processes the queue and removes matching rows from objects in your data lake.

Where your requirements allow it, batching deletions can provide significant cost savings by minimizing the number of times the data lake needs to be re-scanned and processed. For example, you could start a Deletion Job once a week to process all requests received in the preceding week.

Solution demonstration

This section provides a demonstration of using Amazon S3 Find and Forget’s main features. To deploy the solution in your own account, refer to the User Guide.

For this demonstration, I have prepared in advance:

The first step is to deploy the solution using AWS CloudFormation by following the instructions in the User Guide. The CloudFormation stack can take 20-30 minutes to deploy depending on the options chosen when deploying.

Once deployed, I visit the web user interface by going to the address in the WebUIUrl CloudFormation stack output. Using a temporary password emailed to the address I provided in my CloudFormation parameters, I login and set a password for future use. I then see a dashboard with some base metrics for my Amazon S3 Find and Forget deployment:

I now need to create a Data Mapper so that Amazon S3 Find and Forget can find my data lake. To do this, I select Data Mappers, then Create Data Mapper:

On this screen, I give my Data Mapper a name, choose the AWS Glue database and table in my account that I want to operate on, and the columns that I want my deletions to match. In this demonstration, I’m using a copy of the Amazon Customer Reviews Dataset that I copied to my own S3 bucket. I’ll be using the customer_id column to remove data. In the dataset, this field contains a unique identifier for each customer who has created a product review.

I then specify the IAM role to be used when modifying the objects in S3. I also choose whether I want the old S3 object versions to be deleted for me. I can turn this off if I want to implement my own strategy to manage deleting old object versions, such as by using S3 lifecycle policies.

After choosing Create Data Mapper the Data Mapper is created, and I am prompted to grant permissions for S3 Find and Forget to operate in my bucket. In the Data Mapper list, I select my new Data Mapper, then choose Generate Access Policies. The interface displays a sample bucket policy that I copy and paste into the bucket policy for my S3 bucket in the AWS Management Console.

With the Data Mapper set up, I’m now able to add the customers who have requested to have their data deleted to the Deletion Queue. Using their Customer IDs, I go to the Deletion Queue section and select Add Match to the Deletion Queue.

I’ve chosen to delete from all the available Data Mappers, but I can also choose specific ones. Once I’ve added my matches, I can see a list of them on Deletion Queue page:

I can now run a deletion job that will cause the matches to be deleted from the data lake. To do this, I select Deletion Jobs then Start a Deletion Job.

After a few minutes the Deletion Job completes, and I can see metrics collected during the job including that the job took just over two-and-a-half minutes:

There is an Export to JSON option that includes all the metrics shown, more granular information about the Deletion Job, and which S3 objects were modified.

At this point the Deletion Queue is empty, and ready for me to use for future requests.

Solution design

This section includes a brief introduction to how the solution works. More comprehensive design documentation is available in the Amazon S3 Find and Forget GitHub repository.

The following diagram illustrates the architecture of this solution.

Amazon S3 Find and Forget uses AWS Serverless services to optimize for cost and scalability. The user interface and API are built using Amazon S3, Amazon Cognito, AWS Lambda, Amazon DynamoDB, and Amazon API Gateway, which automatically scale down when not in use so that there is no expensive baseline cost just for having the solution installed. These AWS services are always available and scale in concert with when the solution is used with a pay-for-what-you-use price model.

The Deletion Job workflow is coordinated using AWS Step Functions, Lambda, and Amazon Simple Queue Service (Amazon SQS). The solution uses Step Functions for high-level coordination and state tracking in the workflow, Lambda functions for discrete computation tasks, and Amazon SQS to store queues of repetitive work.

A deletion job has two phases: Find and Forget. In the Find phase, the solution uses Amazon Athena to scan the data lake for objects containing rows matching the identifiers in the deletion queue. For this to work at scale, we built a query planner Lambda function that uses the partition list in the AWS Glue Data Catalog for each data mapper to run an Athena query on each partition, returning the path to S3 objects that contain matches with the identifiers in the Deletion Queue. The object keys are then added to an SQS queue that we refer to as the Object Deletion Queue.

In the Forget phase, deletion workers are started as a service running on AWS Fargate. These workers process each object in the Object Deletion Queue by downloading the objects from the S3 bucket into memory, deleting the rows that contain matched identifiers, then putting a new version of the object to the S3 bucket using the same key. By default, older versions of the object are then deleted from the S3 bucket to make the deletion irreversible. You can alternatively disable this feature to implement your own strategy for deleting old object versions, such as by using an S3 Lifecycle policy.

Note that during the Forget phase, affected S3 objects are replaced at the time they are processed and are subject to the Amazon S3 data consistency model. We recommend that you avoid running a Deletion Job in parallel to a workload that reads from the data lake unless it has been designed to handle temporary inconsistencies between objects.

When the object deletion queue is empty, the Forget phase is complete and a final status is determined for the Deletion Job based on whether any errors occurred (for example, due to missing permissions for S3 objects).

Logs are generated for all actions throughout the Deletion Job, which you can use for reporting or troubleshooting. These are stored in DynamoDB, along with other persistent data including the Data Mappers and Deletion Queue.

Conclusion

In this post, we introduced the Amazon S3 Find and Forget solution, which assists data lake operators to handle data erasure requests they may receive pursuant to regulations such as GDPR, CCPA, and others. We then described features of the solution and how to use it for a basic use case.

You can get started today by deploying the solution from the GitHub repository, where you can also find more documentation of how the solution works, its features, and limits. We are continuing to develop the solution and welcome you to send feedback, feature requests, or questions through GitHub Issues.

 


About the Authors

Chris Deigan is an AWS Solution Engineer in London, UK. Chris works with AWS Solution Architects to create standardized tools, code samples, demonstrations, and quick starts.

 

 

 

Matteo Figus is an AWS Solution Engineer based in the UK. Matteo works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. He is passionate about open-source software and in his spare time he likes to cook and play the piano.

 

 

 

Nick Lee is an AWS Solution Engineer based in the UK. Nick works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. In his spare time he enjoys playing football and squash, and binge-watching TV shows.

 

 

 

Adir Sharabi is a Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud. He is also passionate about Data and helping customers to get the most out of it.

 

 

 

Cristina Fuia is a Specialist Solutions Architect for Analytics at AWS. She works with customers across EMEA helping them to solve complex problems, design and build data architectures so that they can get business value from analyzing their data.