Automating bucketing of streaming data using Amazon Athena and AWS Lambda

Post Syndicated from Ahmed Saef Zamzam original https://aws.amazon.com/blogs/big-data/automating-bucketing-of-streaming-data-using-amazon-athena-and-aws-lambda/

In today’s world, data plays a vital role in helping businesses understand and improve their processes and services to reduce cost. You can use several tools to gain insights from your data, such as Amazon Kinesis Data Analytics or open-source frameworks like Structured Streaming and Apache Flink to analyze the data in real time. Alternatively, you can batch analyze the data by ingesting it into a centralized storage known as a data lake. Data lakes allow you to import any amount of data that can come in real time or batch. With Amazon Simple Storage Service (Amazon S3), you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% (11 9s) of durability.

After the data lands in your data lake, you can start processing this data using any Big Data processing tool of your choice. Amazon Athena is a fully managed interactive query service that enables you to analyze data stored in an Amazon S3-based data lake using standard SQL. You can also integrate Athena with Amazon QuickSight for easy visualization of the data.

When working with Athena, you can employ a few best practices to reduce cost and improve performance. Converting to columnar formats, partitioning, and bucketing your data are some of the best practices outlined in Top 10 Performance Tuning Tips for Amazon Athena. Bucketing is a technique that groups data based on specific columns together within a single partition. These columns are known as bucket keys. By grouping related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thus improving query performance and reducing cost. For example, imagine collecting and storing clickstream data. If you frequently filter or aggregate by user ID, then within a single partition it’s better to store all rows for the same user together. If user data isn’t stored together, then Athena has to scan multiple files to retrieve the user’s records. This leads to more files being scanned, and therefore, an increase in query runtime and cost.

Like partitioning, columns that are frequently used to filter the data are good candidates for bucketing. However, unlike partitioning, with bucketing it’s better to use columns with high cardinality as a bucketing key. For example, Year and Month columns are good candidates for partition keys, whereas userID and sensorID are good examples of bucket keys. By doing this, you make sure that all buckets have a similar number of rows. For more information, see Bucketing vs Partitioning.

For real-time data (such as data coming from sensors or clickstream data), streaming tools like Amazon Kinesis Data Firehose can convert the data to columnar formats and partition it while writing to Amazon S3. With Kafka, you can do the same thing with connectors. But what about bucketing? This post shows how to continuously bucket streaming data using AWS Lambda and Athena.

Overview of solution

The following diagram shows the high-level architecture of the solution.

The architecture includes the following steps:

  1. We use the Amazon Kinesis Data Generator (KDG) to simulate streaming data. Data is then written into Kinesis Data Firehose; a fully managed service that enables you to load streaming data to an Amazon S3-based data lake.
  2. Kinesis Data Firehose partitions the data by hour and writes new JSON files into the current partition in a /raw Each new partition looks like /raw/dt=<YYYY-MM-dd-HH>. Every hour, a new partition is created.
  3. Two Lambda functions are triggered on an hourly basis based on Amazon CloudWatch Events.
    • Function 1 (LoadPartition) runs every hour to load new /raw partitions to Athena SourceTable, which points to the /raw prefix.
    • Function 2 (Bucketing) runs the Athena CREATE TABLE AS SELECT (CTAS) query.
  4. The CTAS query copies the previous hour’s data from /raw to /curated and buckets the data while doing so. It loads the new data as a new partition to TargetTable, which points to the /curated prefix.

Overview of walkthrough

In this post, we cover the following high-level steps:

  1. Install and configure the KDG.
  2. Create a Kinesis Data Firehose delivery stream.
  3. Create the database and tables in Athena.
  4. Create the Lambda functions and schedule them.
  5. Test the solution.
  6. Create view that the combines data from both tables.
  7. Clean up.

Installing and configuring the KDG

First, we need to install and configure the KDG in our AWS account. To do this, we use the following AWS CloudFormation template.

For more information about installing the KDG, see the KDG Guide in GitHub.

To configure the KDG, complete the following steps:

  1. On the AWS CloudFormation console, locate the stack you just created.
  2. On the Outputs tab, record the value for KinesisDataGeneratorUrl.
  3. Log in to the KDG main page using the credentials created when you deployed the CloudFormation template.
  4. In the Record template section, enter the following template. Each record has three fields: sensorID, currentTemperature, and status.
    {
        "sensorId": {{random.number(4000)}},
        "currentTemperature": {{random.number(
            {
                "min":10,
                "max":50
            }
        )}},
        "status": "{{random.arrayElement(
            ["OK","FAIL","WARN"]
        )}}"
    }
    

  5. Choose Test template.

The result should look like the following screenshot.

We don’t start sending data now; we do this after creating all other resources.

Creating a Kinesis Data Firehose delivery stream

Next, we create the Kinesis Data Firehose delivery stream that is used to load the data to the S3 bucket.

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose.
  2. Choose Create delivery stream.
  3. For Delivery stream name, enter a name, such as AutoBucketingKDF.
  4. For Source, select Direct PUT or other sources.
  5. Leave all other settings at their default and choose Next.
  6. On Process Records page, leave everything at its default and choose Next.
  7. Choose Amazon S3 as the destination and choose your S3 bucket from the drop-down menu (or create a new one). For this post, I already have a bucket created.
  8. For S3 Prefix, enter the following prefix:
    raw/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}-!{timestamp:HH}/

We use custom prefixes to tell Kinesis Data Firehose to create a new partition every hour. Each partition looks like this: dt=YYYY-MM-dd-HH. This partition-naming convention conforms to the Hive partition-naming convention, <PartitionKey>=<PartitionKey>. In this case, <PartitionKey> is dt and <PartitionValue> is YYYY-MM-dd-HH. By doing this, we implement a flat partitioning model instead of hierarchical (year=YYYY/month=MM/day=dd/hour=HH) partitions. This model can be much simpler for end-users to work with, and you can use a single column (dt) to filter the data. For more information on flat vs. hierarchal partitions, see Data Lake Storage Foundation on GitHub.

  1. For S3 error prefix, enter the following code:
    myFirehoseFailures/!{firehose:error-output-type}/

  2. On the Settings page, leave everything at its default.
  3. Choose Create delivery stream.

Creating an Athena database and tables

In this solution, the Athena database has two tables: SourceTable and TargetTable. Both tables have identical schemas and will have the same data eventually. However, each table points to a different S3 location. Moreover, because data is stored in different formats, Athena uses a different SerDe for each table to parse the data. SourceTable uses JSON SerDe and TargetTable uses Parquet SerDe. One other difference is that SourceTable’s data isn’t bucketed, whereas TargetTable’s data is bucketed.

In this step, we create both tables and the database that groups them.

  1. On the Athena console, create a new database by running the following statement:
    CREATE DATABASE mydatabase

  2. Choose the database that was created and run the following query to create SourceTable. Replace <s3_bucket_name> with the bucket name you used when creating the Kinesis Data Firehose delivery stream.
    CREATE EXTERNAL TABLE mydatabase.SourceTable(
      sensorid string, 
      currenttemperature int, 
      status string)
    PARTITIONED BY ( 
      dt string)
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://<s3_bucket_name>/raw/'
    

  3. Run the following CTAS statement to create TargetTable:
    CREATE TABLE TargetTable
    WITH (
          format = 'PARQUET', 
          external_location = 's3://<s3_bucket_name>/curated/', 
          partitioned_by = ARRAY['dt'], 
          bucketed_by = ARRAY['sensorID'], 
          bucket_count = 3) 
    AS SELECT *
    FROM SourceTable

SourceTable doesn’t have any data yet. However, the preceding query creates the table definition in the Data Catalog. We configured this data to be bucketed by sensorID (bucketing key) with a bucket count of 3. Ideally, the number of buckets should be so that the files are of optimal size.

Creating Lambda functions

The solution has two Lambda functions: LoadPartiton and Bucketing. We use an AWS Serverless Application Model (AWS SAM) template to create, deploy, and schedule both functions.

Follow the instructions in the GitHub repo to deploy the template. When deploying the template, it asks you for some parameters. You can use the default parameters, but you have to change S3BucketName and AthenaResultLocation. For more information, see Parameter Details in the GitHub repo.

LoadPartition function

The LoadPartiton function is scheduled to run the first minute of every hour. Every time Kinesis Data Firehose creates a new partition in the /raw folder, this function loads the new partition to the SourceTable. This is crucial because the second function (Bucketing) reads this partition the following hour to copy the data to /curated.

Bucketing function

The Bucketing function is scheduled to run the first minute of every hour. It copies the last hour’s data from SourceTable to TargetTable. It does so by creating a tempTable using a CTAS query. This tempTable points to the new date-hour folder under /curated; this folder is then added as a single partition to TargetTable.

To implement this, the function runs three queries sequentially. The queries use two parameters:

  • <s3_bucket_name> – Defined by an AWS SAM parameter and should be the same bucket used throughout this solution
  • <last_hour_partition> – Is calculated by the function depending on which hour it’s running

The function first creates TempTable as the result of a SELECT statement from SourceTable. It stores the results in a new folder under /curated. The results are bucketed and stored in Parquet format. See the following code:

CREATE TABLE TempTable
    WITH (
      format = 'PARQUET', 
      external_location = 's3://<s3_bucket_name>/curated/dt=<last_hour_partition>/', 
      bucketed_by = ARRAY['sensorID'], 
      bucket_count = 3) 
    AS SELECT *
    FROM SourceTable
    WHERE dt='<last_hour_partiton>';

We create a new subfolder in /curated, which is new partition for TargetTable. So, after the TempTable creation is complete, we load the new partition to TargetTable:

ALTER TABLE TargetTable
                ADD IF NOT EXISTS
                PARTITION ('<last_hour_partiton>');

Finally, we delete tempTable from the Data Catalog:

DROP TABLE TempTable

Testing the solution

Now that we have created all resources, it’s time to test the solution. We start by generating data from the KDG and waiting for an hour to start querying data in TargetTable (the bucketed table).

  1. Log in to the KDG. You should find the template you created earlier. For the configuration, choose the following:
    1. The Region used.
    2. For the delivery stream, choose the Kinesis Data Firehose you created earlier.
    3. For records/sec, enter 3000.
  2. Choose Send data.

The KDG starts sending simulated data to Kinesis Data Firehose. After 1 minute, a new partition should be created in Amazon S3.

The Lambda function that loads the partition to SourceTable runs on the first minute of the hour. If you started sending data after the first minute, this partition is missed because the next run loads the next hour’s partition, not this one. To mitigate this, run MSCK REPAIR TABLE SourceTable only for the first hour.

  1. To benchmark the performance between both tables, wait for an hour so that the data is available for querying in TargetTable.
  2. When the data is available, choose one sensorID and run the following query on SourceTable and TargetTable.
    SELECT sensorID, avg(currenttemperature) as AverageTempreture 
    FROM <TableName>
    WHERE dt='<YYYY-MM-dd-HH>' AND sensorID ='<sensorID_selected>'
    GROUP BY 1

The following screenshot shows the query results for SourceTable. It shows the runtime in seconds and amount of data scanned.

The following screenshot shows the query results for TargetTable.

If you look at these results, you don’t see a huge difference in runtime for this specific query and dataset; for other datasets, this difference should be more significant. However, from a data scanning perspective, after bucketing the data, we reduced the data scanned by approximately 98%. Therefore, for this specific use case, bucketing the data lead to a 98% reduction in Athena costs because you’re charged based on the amount of data scanned by each query.

Querying the current hour’s data

Data for the current hour isn’t available immediately in TargetTable. It’s available for querying after the first minute of the following hour. To query this data immediately, we have to create a view that UNIONS the previous hour’s data from TargetTable with the current hour’s data from SourceTable. If data is required for analysis after an hour of its arrival, then you don’t need to create this view.

To create this view, run the following query in Athena:

CREATE OR REPLACE VIEW combined AS

SELECT *, "$path" AS file
FROM SourceTable
WHERE dt >= date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

UNION ALL 

SELECT *, "$path" AS file
FROM TargetTable
WHERE dt < date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

Cleaning up

Delete the resources you created if you no longer need them.

  1. Delete the Kinesis Data Firehose delivery stream.
  2. In Athena, run the following statements
    1. DROP DATABASE mydatabase
    2. DROP TABLE SourceTable
    3. DROP TABLE TargetTable
  3. Delete the AWS SAM template to delete the Lambda functions.
  4. Delete the CloudFormation stack for the KDG. For more information, see Deleting a stack on the AWS CloudFormation console.

Conclusion

Bucketing is a powerful technique and can significantly improve performance and reduce Athena costs. In this post, we saw how to continuously bucket streaming data using Lambda and Athena. We used a simulated dataset generated by Kinesis Data Generator. The same solution can apply to any production data, with the following changes:

  • DDL statements
  • Functions used can work with data that is partitioned by hour with the partition key ‘dt’ and partition value <YYYY-MM-dd-HH>. If your data is partitioned in a different way, edit the Lambda functions accordingly.
  • Frequency of Lambda triggers.

About the Author

Ahmed Zamzam is a Solutions Architect with Amazon Web Services. He supports SMB customers in the UK in their digital transformation and their cloud journey to AWS, and specializes in Data Analytics. Outside of work, he loves traveling, hiking, and cycling.