Kinesis Data Firehose now supports dynamic partitioning to Amazon S3

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/kinesis-data-firehose-now-supports-dynamic-partitioning-to-amazon-s3/

Amazon Kinesis Data Firehose provides a convenient way to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service, generic HTTP endpoints, and service providers like Datadog, New Relic, MongoDB, and Splunk. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt your data streams before loading, which minimizes the amount of storage used and increases security.

Customers who use Amazon Kinesis Data Firehose often want to partition their incoming data dynamically based on information that is contained within each record, before sending the data to a destination for analysis. An example of this would be segmenting incoming Internet of Things (IoT) data based on what type of device generated it: Android, iOS, FireTV, and so on. Previously, customers would need to run an entirely separate job to repartition their data after it lands in Amazon S3 to achieve this functionality.

Kinesis Data Firehose data partitioning simplifies the ingestion of streaming data into Amazon S3 data lakes, by automatically partitioning data in transit before it’s delivered to Amazon S3. This makes the datasets immediately available for analytics tools to run their queries efficiently and enhances fine-grained access control for data. For example, marketing automation customers can partition data on the fly by customer ID, which allows customer-specific queries to query smaller datasets and deliver results faster. IT operations or security monitoring customers can create groupings based on event timestamps that are embedded in logs, so they can query smaller datasets and get results faster.

In this post, we’ll discuss the new Kinesis Data Firehose dynamic partitioning feature, how to create a dynamic partitioning delivery stream, and walk through a real-world scenario where dynamically partitioning data that is delivered into Amazon S3 could improve the performance and scalability of the overall system. We’ll then discuss some best practices around what makes a good partition key, how to handle nested fields, and integrating with Lambda for preprocessing and error handling. Finally, we’ll cover the limits and quotas of Kinesis Data Firehose dynamic partitioning, and some pricing scenarios.

Data partitioning with Kinesis Data Firehose

First, let’s discuss why you might want to use dynamic partitioning instead of Kinesis Data Firehose’s standard timestamp-based data partitioning. Consider a scenario where your analytical data lake in Amazon S3 needs to be filtered according to a specific field, such as a customer identification—customer_id. Using the standard timestamp-based strategy, your data will look something like this, where <DOC-EXAMPLE-BUCKET> stands for your bucket name.

s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=01/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=01/part-0001.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=02/part-0002.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=02/part-0003.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=03/part-0004.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=03/part-0005.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=04/part-0006.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=04/part-0007.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=05/part-0008.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=05/part-0009.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=06/part-0010.parquet

The difficulty in identifying particular customers within this array of data is that a full file scan will be required to locate any individual customer. Now consider the data partitioned by the identifying field, customer_id.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-000/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-001/part-0001.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-002/part-0002.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-003/part-0003.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-004/part-0004.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-005/part-0005.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-006/part-0006.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-007/part-0007.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-008/part-0008.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-009/part-0009.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-010/part-0010.parquet

In this data partitioning scheme, you only need to scan one folder to find data related to a particular customer. This is how analytics query engines like Amazon Athena, Amazon Redshift Spectrum, or Presto are designed to work—they prune unneeded partitioning during query execution, thereby reducing the amount of data that is scanned and transferred. Partitioning data like this will result in less data scanned overall.

Key features

With the launch of Kinesis Data Firehose Dynamic Partitioning, you can now enable data partitioning to be dynamic, based on data content within the AWS Management Console, AWS Command Line Interface (AWS CLI), or AWS SDK when you create or update an existing Kinesis Data Firehose delivery stream.

At a high level, Kinesis Data Firehose Dynamic Partitioning allows for easy extraction of keys from incoming records into your delivery stream by allowing you to select and extract JSON data fields in an easy-to-use query engine.

Kinesis Data Firehose Dynamic Partitioning and key extraction will result in larger file sizes landing in Amazon S3, in addition to allowing for columnar data formats, like Apache Parquet, that query engines prefer.

With Kinesis Data Firehose Dynamic Partitioning, you have the ability to specify delimiters to detect or add on to your incoming records. This makes it possible to clean and organize data in a way that a query engine like Amazon Athena or AWS Glue would expect. This not only saves time but also cuts down on additional processes after the fact, potentially reducing costs in the process.

Kinesis Data Firehose has built-in support for extracting the keys for partitioning records that are in JSON format. You can select and extract the JSON data fields to be used in partitioning by using JSONPath syntax. These fields will then dictate how the data is partitioned when it’s delivered to Amazon S3. As we’ll discuss in the walkthrough later in this post, extracting a well-distributed set of partition keys is critical to optimizing your Kinesis Data Firehose delivery stream that uses  dynamic partitioning.

If the incoming data is compressed, encrypted, or in any other file format, you can include in the PutRecord or PutRecords API calls the data fields for partitioning. You can also use the integrated Lambda function with your own custom code to decompress, decrypt, or transform the records to extract and return the data fields that are needed for partitioning. This is an expansion of the existing transform Lambda function that is available today with Kinesis Data Firehose. You can transform, parse, and return the data fields by using the same Lambda function.

In order to achieve larger file sizes when it sinks data to Amazon S3, Kinesis Data Firehose buffers incoming streaming data to a specified size or time period before it delivers to Amazon S3. Buffer sizes range from 1 MB to 4 GB when data is delivered to Amazon S3, and the buffering interval ranges from 1 minute to 1 hour.

Example of a partitioning structure

Consider the following clickstream event.

{
    "type": {
        "device": "mobile",
        "event": "user_clicked_submit_button"
    },
    "customer_id": "1234",
    "event_timestamp":1630442383,
    "geo": "US"
}

You can now partition your data by customer_id, so Kinesis Data Firehose will automatically group all events with the same customer_id and deliver them to separate folders in your S3 destination bucket. The new folders will be created dynamically—you only specify which JSON field will act as dynamic partition key.

Assume that you want to have the following folder structure in your S3 data lake.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/part-0001.parquet

s3://<DOC-EXAMPLE-BUCKET>/customer_id=4567/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=4567/part-0001.parquet 

The Kinesis Data Firehose configuration for the preceding example will look like the one shown in the following screenshot.

Kinesis Data Firehose evaluates the prefix expression at runtime. It groups records that match the same evaluated S3 prefix expression into a single dataset. Kinesis Data Firehose then delivers each dataset to the evaluated S3 prefix. The frequency of dataset delivery to S3 is determined by the delivery stream buffer setting.

You can do even more with the jq JSON processor, including accessing nested fields and create complex queries to identify specific keys among the data.

In the following example, I decide to store events in a way that will allow me to scan events from the mobile devices of a particular customer.

{
    "type": {
        "device": "mobile",
        "event": "user_clicked_submit_button"
    },
    "customer_id": "1234",
    "event_timestamp": 1630442383
    "geo": "US"
}

Given the same event, I’ll use both the device and customer_id fields in the Kinesis Data Firehose prefix expression, as shown in the following screenshot. Notice that the device is a nested JSON field.

The generated S3 folder structure will be as follows, where <DOC-EXAMPLE-BUCKET> is your bucket name.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=mobile/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=mobile/part-0001.parquet

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=desktop/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=desktop/part-0001.parquet

Now assume that you want to partition your data based on the time when the event actually was sent, as opposed to using Kinesis Data Firehose native support for ApproximateArrivalTimestamp, which represents the time in UTC when the record was successfully received and stored in the stream. The time in the event_timestamp field might be in a different time zone.

With Kinesis Data Firehose Dynamic Partitioning, you can extract and transform field values on the fly. I’ll use the event_timestamp field to partition the events by year, month, and day, as shown in the following screenshot.

The preceding expression will produce the following S3 folder structure, where <DOC-EXAMPLE-BUCKET> is your bucket name.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=21/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=21/part-0001.parquet

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=22/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=22/part-0001.parquet

Create a dynamically partitioned delivery stream

To begin delivering dynamically partitioned data into Amazon S3, navigate to the Amazon Kinesis console page by searching for or selecting Kinesis.

From there, choose Create Delivery Stream, and then select your source and sink.

For this example, you will receive data from a Kinesis Data Stream, but you can also choose Direct PUT or other sources as the source of your delivery stream.

For the destination, choose Amazon S3.

Next, choose the Kinesis Data Stream source to read from. If you have a Kinesis Data Stream previously created, simply choose Browse and select from the list. If not, follow this guide on how to create a Kinesis Data Stream.

Give your delivery stream a name and continue on to the Transform and convert records section of the create wizard.

In order to transform your source records with AWS Lambda, you can enable data transformation. This process will be covered in the next section, and we’ll leave both the AWS Lambda transformation as well as the record format conversion disabled for simplicity.

For your S3 destination, select or create an S3 bucket that your delivery stream has permissions to.

Below that setting, you can now enable dynamic partitioning on the incoming data in order to deliver data to different S3 bucket prefixes based on your specified JSONPath query.

You now have the option to enable the following features, shown in the screenshot below:

  • Multi record deaggregation – Separate records that enter your delivery stream based on valid JSON criteria or a new line delimiter such as \n. This can be useful for data that comes in to your delivery stream in a specific format, but needs to be reformatted according to the downstream analysis engine.
  • New line delimiter – Configure your delivery stream to add a new line delimiter between records within objects delivered to Amazon S3, such as \n.
  • Inline parsing for JSON – Specify data record parameters to be used as dynamic partitioning keys, and provide a value for each key.

You can simply add your key/value pairs, then choose Apply dynamic partitioning keys to apply the partitioning scheme to your S3 bucket prefix. Keep in mind that you will also need to supply an error prefix for your S3 bucket before continuing.

Set your S3 buffering conditions appropriately for your use case. In my example, I’ve lowered the buffering to the minimum of 1 MiB of data delivered, or 60 seconds before delivering data to Amazon S3.

Keep the defaults for the remaining settings, and then choose Create delivery stream.

After data begins flowing through your pipeline, within the buffer interval you will see data appear in S3, partitioned according to the configurations within your Kinesis Data Firehose.

For delivery streams without the dynamic partitioning feature enabled, there will be one buffer across all incoming data. When data partitioning is enabled, Kinesis Data Firehose will have a buffer per partition, based on incoming records. The delivery stream will deliver each buffer of data as a single object when the size or interval limit has been reached, independent of other data partitions.

Lambda transformation of non-JSON records

If the data flowing through a Kinesis Data Firehose is compressed, encrypted, or in any non-JSON file format, the dynamic partitioning feature won’t be able to parse individual fields by using the JSONPath syntax specified previously. To use the dynamic partitioning feature with non-JSON records, use the integrated Lambda function with Kinesis Data Firehose to transform and extract the fields needed to properly partition the data by using JSONPath.

The following Lambda function will decode a user payload, extract the necessary fields for the Kinesis Data Firehose dynamic partitioning keys, and return a proper Kinesis Data Firehose file, with the partition keys encapsulated in the outer payload.

# This is an Amazon Kinesis Data Firehose stream processing Lambda function that  
# replays every read record from input to output  
# and extracts partition keys from the records.  
  
from __future__ import print_function  
import base64  
import json  
import datetime  
  
# Signature for all Lambda functions that user must implement  
def lambda_handler(firehose_records_input, context):  
      
    # Create return value.  
    firehose_records_output = {}  
    # Create result object.  
    firehose_records_output['records'] = []  
  
    print("\n")  
    # Go through records and process them.  
    for firehose_record_input in firehose_records_input['records']:  
  
        # Get user payload.  
        payload = base64.b64decode(firehose_record_input['data'])  
        jsonVal = json.loads(payload)  
          
        print("Record that was received")  
        print(jsonVal)  
        print("\n")  
        # Create output Firehose record and add modified payload and record ID to it.  
        firehose_record_output = {}  
        eventTimestamp = datetime.datetime.fromtimestamp(jsonVal['eventTimestamp'])  
        partitionKeys = {}  
        partitionKeys["customerId"] = jsonVal['customerId']  
        partitionKeys["year"] = eventTimestamp.strftime('%Y')  
        partitionKeys["month"] = eventTimestamp.strftime('%m')  
        partitionKeys["date"] = eventTimestamp.strftime('%d')  
        partitionKeys["hour"] = eventTimestamp.strftime('%H')  
        partitionKeys["minute"] = eventTimestamp.strftime('%M')  
  
          
        # Must set proper record ID.  
        firehose_record_output['recordId'] = firehose_record_input['recordId']  
        firehose_record_output['data'] = firehose_record_input['data']  
        firehose_record_output['result'] =  'Ok'  
        firehose_record_output['partitionKeys'] =  partitionKeys  
  
        # Add the record to the list of output records.  
        firehose_records_output['records'].append(firehose_record_output)  
  
    # At the end return processed records.  
    return firehose_records_output  

Using Lambda to extract the necessary fields for dynamic partitioning provides both the benefit of encrypted and compressed data and the benefit of dynamically partitioning data based on record fields.

Limits and quotas

Kinesis Data Firehose Dynamic Partitioning has a limit of 500 active partitions per delivery stream while it is actively buffering data—in other words, how many active partitions exist in the delivery stream during the configured buffering hints. This limit is adjustable, and if you want to increase it, you’ll need to submit a support ticket for a limit increase.

Each new value that is determined by the JSONPath select query will result in a new partition in the Kinesis Data Firehose delivery stream. The partition has an associated buffer of data that will be delivered to Amazon S3 in the evaluated partition prefix. Upon delivery to Amazon S3, the buffer that previously held that data and the associated partition will be deleted and deducted from the active partitions count in Kinesis Data Firehose.

Consider the following records that were ingested to my delivery stream.

{"customer_id": "123"}
{"customer_id": "124"}
{"customer_id": "125"}

If I decide to use customer_id for my dynamic data partitioning and deliver records to different prefixes, I’ll have three active partitions if the records keep ingesting for all of my customers. When there are no more records for "customer_id": "123", Kinesis Data Firehose will delete the buffer and will keep only two active partitions.

If you exceed the maximum number of active partitions, the rest of the records in the delivery stream will be delivered to the S3 error prefix. For more information, see the Error Handling section of this blog post.

A maximum throughput of 25 MB per second is supported for each active partition. This limit is not adjustable. You can monitor the throughput with the new metric called PerPartitionThroughput.

Best practices

The right partitioning can help you to save costs related to the amount of data that is scanned by analytics services like Amazon Athena. On the other hand, over-partitioning may lead to the creation of smaller objects and wipe out the initial benefit of cost and performance. See Top 10 Performance Tuning Tips for Amazon Athena.

We advise you to align your partitioning keys with your analysis queries downstream to promote compatibility between the two systems. At the same time, take into consideration how high cardinality can impact the dynamic partitioning active partition limit.

When you decide which fields to use for the dynamic data partitioning, it’s a fine balance between picking fields that match your business case and taking into consideration the partition count limits. You can monitor the number of active partitions with the new metric PartitionCount, as well as the number of partitions that have exceeded the limit with the metric PartitionCountExceeded.

Another way to optimize cost is to aggregate your events into a single PutRecord and PutRecordBatch API call. Because Kinesis Data Firehose is billed per GB of data ingested, which is calculated as the number of data records you send to the service, times the size of each record rounded up to the nearest 5 KB, you can put more data per each ingestion call.

Data partition functionality is run after data is de-aggregated, so each event will be sent to the corresponding Amazon S3 prefix based on the partitionKey field within each event.

Error handling

Imagine that the following record enters your Kinesis Data Firehose delivery stream.

{“customerID”: 1000}

When your dynamic partition query scans over this record, it will be unable to locate the specified key of customer_id, and therefore will result in an error. In this scenario, we suggest using S3 error prefix when you create or modify your Kinesis Data Firehose stream.

All failed records will be delivered to the error prefix. The records you might find there are the events without the field you specified as your partition key.

Cost and pricing examples

Kinesis Data Firehose Dynamic Partitioning is billed per GB of partitioned data delivered to S3, per object, and optionally per jq processing hour for data parsing. The cost can vary based on the AWS Region where you decide to create your stream.

For more information, see our pricing page.

Summary

In this post, we discussed the Kinesis Data Firehose Dynamic Partitioning feature, and explored the use cases where this feature can help improve pipeline performance. We also covered how to develop and optimize a Kinesis Data Firehose pipeline by using dynamic partitioning and the best practices around building a reliable delivery stream.

Kinesis Data Firehose dynamic partitioning will be available in all Regions at launch, and we urge you to try the new feature to see how it can simplify your delivery stream and query engine performance. Be sure to provide us with any feedback you have about the new feature.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 5 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data per day and process complex machine learning algorithms in real time. At AWS, he is a Solutions Architect Streaming Specialist, supporting both Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Kinesis.

Michael Greenshtein’s career started in software development and shifted to DevOps. Michael worked with AWS services to build complex data projects involving real-time, ETLs, and batch processing. Now he works in AWS as Solutions Architect in the Europe, Middle East, and Africa (EMEA) region, supporting a variety of customer use cases.