Build a real-time GDPR-aligned Apache Iceberg data lake


Data lakes are a popular choice for organizations to store the data generated by their business activities. As a best practice of data lake design, data should be immutable once stored. But regulations such as the General Data Protection Regulation (GDPR) have created obligations for data operators, who must be able to erase or update personal data in their data lake when requested.

A data lake built on AWS uses Amazon Simple Storage Service (Amazon S3) as its primary storage environment. When a customer asks to erase or update their private data, the data lake operator needs to find the S3 objects that contain that data and take steps to erase or update it. This can be a complex process for the following reasons:

  • Data lakes may contain many S3 objects (each of which may contain multiple rows), and it's often difficult to find the objects that contain the exact data to be erased or the personally identifiable information (PII) to be updated for a given request
  • By nature, S3 objects are immutable, so applying direct row-based transactions like DELETE or UPDATE isn't possible

To handle these situations, a transactional capability on S3 objects is required, and table formats such as Apache Hudi or Apache Iceberg provide transactional features, including upserts and deletes, for data stored in Amazon S3.

AWS contributed the Apache Iceberg integration with the AWS Glue Data Catalog, which enables you to use open-source data computation engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support for Iceberg, enabling transactional queries on S3 objects.

In this post, we show you how to stream real-time data to an Iceberg table in Amazon S3 using AWS Glue streaming and perform deletes and updates as transactions using Amazon Athena. We use a serverless approach for this implementation, which requires minimal operational overhead to manage and fine-tune various configuration parameters, and enables you to extend the use case to ACID operations beyond the GDPR.

Solution overview

We used the Amazon Kinesis Data Generator (KDG) to produce synthetic streaming data in Amazon Kinesis Data Streams and then processed the streaming input data using AWS Glue streaming to store the data in Amazon S3 in Iceberg table format. To fulfill customer requests, we ran delete and update statements using Athena with its Iceberg support.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  1. Streaming data is generated in JSON format using the KDG template and inserted into Kinesis Data Streams.
  2. An AWS Glue streaming job is connected to Kinesis Data Streams to process the data using the Iceberg connector.
  3. The streaming job output is stored in Amazon S3 in Iceberg table format.
  4. Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in Iceberg format.
  5. Athena interacts with the Data Catalog tables in Iceberg format for transactional queries required for GDPR.

The codebase required for this post is available in the GitHub repository.

Prerequisites

Before starting the implementation, make sure the following prerequisites are met:

Deploy resources using AWS CloudFormation

Complete the following steps to deploy your solution resources:

  1. After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack:
  2. For Stack name, enter a name.
  3. For Username, enter the user name for the KDG.
  4. For Password, enter the password for the KDG (this must be at least six alphanumeric characters, and contain at least one number).
  5. For IAMGlueStreamingJobRoleName, enter a name for the IAM role used for the AWS Glue streaming job.
  6. Choose Next and create your stack.

This CloudFormation template configures the following resources in your account:

  • An S3 bucket named streamingicebergdemo-XX (note that the XX part is a random unique number to make the S3 bucket name unique)
  • An IAM policy and role
  • The KDG URL used for creating synthetic data
  7. After you complete the setup, go to the Outputs tab of the CloudFormation stack to get the S3 bucket name, AWS Glue job execution role (as per your input), and KDG URL.
  8. Before proceeding with the demo, create a folder named custdata under the created S3 bucket.

Create a Kinesis data stream

We use Kinesis Data Streams, a serverless streaming data service built to handle millions of events with low latency. The following steps guide you through creating the data stream in the us-east-1 Region:

  1. Log in to the AWS Management Console.
  2. Navigate to Kinesis console (make sure the Region is us-east-1).
  3. Select Kinesis Data Streams and choose Create data stream.
  4. For Data stream name, enter demo-data-stream.
  5. For this post, we select On-demand as the Kinesis data stream capacity mode.

On-demand mode eliminates the need to provision and manage capacity for streaming data. However, you can also implement this solution with Kinesis Data Streams in provisioned mode.

  6. Choose Create data stream.
  7. Wait for successful creation of demo-data-stream and for it to be in Active status.
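
If you prefer to script this step instead of using the console, the following minimal boto3 sketch creates the same on-demand stream and waits for it to become active. It assumes AWS credentials with Kinesis permissions are already configured for us-east-1.

# Minimal sketch: create demo-data-stream in on-demand capacity mode with boto3.
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

kinesis.create_stream(
    StreamName="demo-data-stream",
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# Block until the stream reaches Active status.
waiter = kinesis.get_waiter("stream_exists")
waiter.wait(StreamName="demo-data-stream")
print("demo-data-stream is active")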

Set up the Kinesis Data Generator

To create a sample streaming dataset, we use the KDG URL generated on the CloudFormation stack Outputs tab and log in with the credentials used in the parameters for the CloudFormation template. For this post, we use the following template to generate sample data in the demo-data-stream Kinesis data stream.

  1. Log in to the KDG URL with the user name and password you supplied during stack creation.
  2. Change the Region to us-east-1.
  3. Select the Kinesis data stream demo-data-stream.
  4. For Records per second, choose Constant and enter 100 (it can be another number, depending on the rate of record creation).
  5. On the Template 1 tab, enter the KDG data generation template:
{
    "year": "{{random.number({"min":2000,"max":2022})}}",
    "month": "{{random.number({"min":1,"max":12})}}",
    "day": "{{random.number({"min":1,"max":30})}}",
    "hour": "{{random.number({"min":0,"max":24})}}",
    "minute": "{{random.number({"min":0,"max":60})}}",
    "customerid": {{random.number({"min":5023,"max":59874})}},
    "firstname" : "{{name.firstName}}",
    "lastname" : "{{name.lastName}}",
    "dateofbirth" : "{{date.past(70)}}",
    "city" : "{{address.city}}",
    "buildingnumber" : {{random.number({"min":63,"max":947})}},
    "streetaddress" : "{{address.streetAddress}}",
    "state" : "{{address.state}}",
    "zipcode" : "{{address.zipCode}}",
    "country" : "{{address.country}}",
    "countrycode" : "{{address.countryCode}}",
    "phonenumber" : "{{phone.phoneNumber}}",
    "productname" : "{{commerce.productName}}",
    "transactionamount": {{random.number({"min":10,"max":150})}}
}
  6. Choose Test template to test the sample records.
  7. When the testing is correct, choose Send data.

This will start sending 100 records per second in the Kinesis data stream. (To stop sending data, choose Stop Sending Data to Kinesis.)
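
If you want to push a few test records without the KDG, a small boto3 sketch like the following does the same thing on a smaller scale. Only a subset of the template fields is shown here for brevity; a real record should include all the fields from the template above so it matches the table schema.

# Minimal sketch: send a few sample JSON records to demo-data-stream with boto3.
import json
import random
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

records = [
    {
        # Hardcoded sample values; a production record should carry every template field.
        "Data": json.dumps(
            {
                "customerid": random.randint(5023, 59874),
                "firstname": "Jane",
                "lastname": "Doe",
                "phonenumber": "555-0100",
                "productname": "Sample Product",
                "transactionamount": random.randint(10, 150),
            }
        ).encode("utf-8"),
        "PartitionKey": str(random.randint(1, 100)),
    }
    for _ in range(10)
]

response = kinesis.put_records(StreamName="demo-data-stream", Records=records)
print("Failed records:", response["FailedRecordCount"])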

Integrate Iceberg with AWS Glue

To add the Apache Iceberg Connector for AWS Glue, complete the following steps. The connector is free to use and supports AWS Glue 1.0, 2.0, and 3.0.

  1. On the AWS Glue console, choose AWS Glue Studio in the navigation pane.
  2. In the navigation pane, navigate to AWS Marketplace.
  3. Search for and choose Apache Iceberg Connector for AWS Glue.
  4. Choose Accept Terms and Continue to Subscribe.
  5. Choose Continue to Configuration.
  6. For Fulfillment option, choose your AWS Glue version.
  7. For Software version, choose the latest software version.
  8. Choose Continue to Launch.
  9. Under Usage Instructions, choose the link to activate the connector.
  10. Enter a name for the connection, then choose Create connection and activate the connector.
  11. Verify the new connector and connection on the AWS Glue Studio Connectors page.

Create the AWS Glue Data Catalog database

The AWS Glue Data Catalog contains references to data that is used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The AWS Glue Data Catalog is an index to the location and schema of your data. You use the information in the Data Catalog to create and monitor your ETL jobs.

For this post, we create a Data Catalog database named icebergdemodb containing the metadata information of a table named customer, which will be queried through Athena.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. For Database name, enter icebergdemodb.

This creates an AWS Glue database for metadata storage.
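
The same database can also be created programmatically. The following minimal boto3 sketch (assuming Glue permissions in us-east-1) is equivalent to the console steps above:

# Minimal sketch: create the icebergdemodb database in the AWS Glue Data Catalog.
import boto3

glue = boto3.client("glue", region_name="us-east-1")

glue.create_database(
    DatabaseInput={
        "Name": "icebergdemodb",
        "Description": "Metadata for the Iceberg customer table",
    }
)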

Create a Data Catalog table in Iceberg format

In this step, we create a Data Catalog table in Iceberg table format.

  1. On the Athena console, create an Athena workgroup named demoworkgroup for SQL queries.
  2. Choose Athena engine version 3 for Query engine version.

For more information about Athena versions, refer to Changing Athena engine versions.

  3. Enter the S3 bucket location for Query result configuration under Additional configurations.
  4. Open the Athena query editor and choose demoworkgroup.
  5. Choose the database icebergdemodb.
  6. Enter and run the following DDL to create a table pointing to the Data Catalog database icebergdemodb. Note that the TBLPROPERTIES section specifies ICEBERG as the table type and LOCATION points to the S3 folder (custdata) URI created in earlier steps. This DDL command is available on the GitHub repo.
CREATE TABLE icebergdemodb.customer(
year string,
month string,
day string,
hour string,
minute string,
customerid string,
firstname string,
lastname string,
dateofbirth string,
city string,
buildingnumber string,
streetaddress string,
state string,
zipcode string,
country string,
countrycode string,
phonenumber string,
productname string,
transactionamount int)
LOCATION '<S3 Location URI>'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912',
'optimize_rewrite_delete_file_threshold'='10'
);

After you run the command successfully, you can see the table customer in the Data Catalog.
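
If you'd rather submit the DDL (and the later GDPR queries) through the Athena API instead of the query editor, a helper along the following lines works. The name run_athena_query is hypothetical, and the polling loop is a simple sketch rather than production-ready error handling; this helper is reused in the GDPR and maintenance sketches later in this post.

# Hypothetical helper: submit a query to Athena in demoworkgroup and wait for it to finish.
import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

def run_athena_query(query, database="icebergdemodb", workgroup="demoworkgroup"):
    """Start a query in the given workgroup and poll until it completes."""
    execution = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        WorkGroup=workgroup,
    )
    query_id = execution["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return query_id, state
        time.sleep(2)

# Example: run the CREATE TABLE DDL shown above (stored in a string named create_table_ddl).
# query_id, state = run_athena_query(create_table_ddl)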

Create an AWS Glue streaming job

In this section, we create the AWS Glue streaming job using the Spark script editor. The job fetches records from the Kinesis data stream and writes them to the Iceberg table in Amazon S3.

  1. On the AWS Glue console, choose Jobs (new) in the navigation pane.
  2. For Create job, select Spark script editor.
  3. For Options, select Create a new script with boilerplate code.
  4. Choose Create.
  5. Enter the code available in the GitHub repo in the editor.

The sample code keeps appending data in the target location by fetching records from the Kinesis data stream.
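
The full script is in the GitHub repo; the following is a condensed sketch of the pattern it follows. The Iceberg catalog name glue_catalog, the checkpoint prefix, and the explicit Spark configuration are assumptions based on a typical Iceberg-on-Glue setup; the job parameters it reads are described later in this section.

# Condensed sketch of the streaming pattern (see the GitHub repo for the full script).
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "iceberg_job_catalog_warehouse", "output_path", "kinesis_arn"]
)

# Register an Iceberg catalog (assumed name: glue_catalog) backed by the AWS Glue Data Catalog and Amazon S3.
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", args["iceberg_job_catalog_warehouse"])
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

sc = SparkContext(conf=conf)
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read the Kinesis data stream as a streaming DataFrame of JSON records.
kinesis_df = glue_context.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "streamARN": args["kinesis_arn"],
        "classification": "json",
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "true",
    },
    transformation_ctx="kinesis_df",
)

def process_batch(data_frame, batch_id):
    # Append each non-empty micro-batch to the Iceberg table registered in the Data Catalog.
    # (In practice, select and cast the columns explicitly so they match the table schema.)
    if data_frame.count() > 0:
        data_frame.createOrReplaceTempView("incoming")
        spark.sql("INSERT INTO glue_catalog.icebergdemodb.customer SELECT * FROM incoming")

# Process micro-batches on a 60-second window and checkpoint progress to Amazon S3.
glue_context.forEachBatch(
    frame=kinesis_df,
    batch_function=process_batch,
    options={
        "windowSize": "60 seconds",
        "checkpointLocation": args["output_path"] + "/checkpoint/",
    },
)
job.commit()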

  6. Choose the Job details tab in the query editor.
  7. For Name, enter Demo_Job.
  8. For IAM role, choose demojobrole.
  9. For Type, choose Spark Streaming.
  10. For Glue Version, choose Glue 3.0.
  11. For Language, choose Python 3.
  12. For Worker type, choose G 0.25X.
  13. Select Automatically scale the number of workers.
  14. For Maximum number of workers, enter 5.
  15. Under Advanced properties, select Use Glue Data Catalog as the Hive metastore.
  16. For Connections, choose the connection you created.
  17. For Job parameters, enter the following key-value pairs (provide your S3 bucket and account ID):
      • --iceberg_job_catalog_warehouse: s3://streamingicebergdemo-XX/custdata/
      • --output_path: s3://streamingicebergdemo-XX
      • --kinesis_arn: arn:aws:kinesis:us-east-1:<AWS Account ID>:stream/demo-data-stream
      • --user-jars-first: True

  18. Choose Run to start the AWS Glue streaming job.
  19. To monitor the job, choose Monitoring in the navigation pane.
  20. Select Demo_Job and choose View run details to check the job run details and Amazon CloudWatch logs.
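
You can also start the same run programmatically. The following minimal boto3 sketch passes the same job parameters set on the console (substitute your bucket name and account ID, as in the list above):

# Minimal sketch: start Demo_Job with the job parameters described above.
import boto3

glue = boto3.client("glue", region_name="us-east-1")

response = glue.start_job_run(
    JobName="Demo_Job",
    Arguments={
        "--iceberg_job_catalog_warehouse": "s3://streamingicebergdemo-XX/custdata/",
        "--output_path": "s3://streamingicebergdemo-XX",
        "--kinesis_arn": "arn:aws:kinesis:us-east-1:<AWS Account ID>:stream/demo-data-stream",
        "--user-jars-first": "True",
    },
)
print("Job run ID:", response["JobRunId"])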

Run GDPR use cases on Athena

In this section, we demonstrate a few use cases that are relevant to GDPR alignment with the user data that’s stored in Iceberg format in the Amazon S3-based data lake as implemented in the previous steps. For this, let’s consider that the following requests are being initiated in the workflow to comply with the regulations:

  • Delete the records for the input customerid (for example, 59289)
  • Update phonenumber for the customerid (for example, 51936)

The IDs used in this example are samples only, because the data was generated through the KDG template used earlier. You can find IDs in your own implementation by querying the table through the Athena query editor. The steps remain the same.

Delete data by customer ID

Complete the following steps to fulfill the first use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query using a customer ID and choose Run:
SELECT count(*)
FROM icebergdemodb.customer
WHERE customerid = '59289';

This query gives the count of records for the input customerid before delete.

  4. Enter the following query with the same customer ID and choose Run:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '59289') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN DELETE;

This query deletes the data for the input customerid, as requested in the workflow.

  5. Test if there is data with the customer ID using a count query.

The count should be 0.
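
To run this erasure as a repeatable step (for example, from a request-handling workflow), you can wrap the MERGE statement with the hypothetical run_athena_query helper sketched earlier in this post:

# Sketch: erase all rows for a given customer ID.
# Reuses the hypothetical run_athena_query helper from the Athena table section.

def erase_customer(customerid):
    delete_query = f"""
        MERGE INTO icebergdemodb.customer trg
        USING (SELECT customerid
               FROM icebergdemodb.customer
               WHERE customerid = '{customerid}') src
        ON (trg.customerid = src.customerid)
        WHEN MATCHED THEN DELETE
    """
    query_id, state = run_athena_query(delete_query)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Erasure for customer {customerid} ended in state {state}")

# Example: erase_customer("59289")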

Update data by customer ID

Complete the following steps to test the second use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query with a customer ID and choose Run.
SELECT customerid, phonenumber
FROM icebergdemodb.customer
WHERE customerid = '51936';

This query gives the value for phonenumber before update.

  4. Run the following query to update the required columns:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '51936') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN UPDATE SET phonenumber = '000';

This query updates the data to a dummy value.

  5. Run the SELECT query to check the update.

You can see the data is updated correctly.
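
If you're scripting these checks, the verification query's result can also be read back through the Athena API. The following small sketch uses the hypothetical run_athena_query helper from earlier together with the get_query_results call:

# Sketch: read back the updated row to confirm the phone number change.
import boto3

athena = boto3.client("athena", region_name="us-east-1")

query_id, state = run_athena_query(
    "SELECT customerid, phonenumber FROM icebergdemodb.customer WHERE customerid = '51936'"
)
if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    # The first row holds the column headers; subsequent rows hold the data values.
    for row in rows[1:]:
        print([col.get("VarCharValue") for col in row["Data"]])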

Vacuum table

Because operations like INSERT, UPDATE, DELETE, and MERGE continually create new snapshots and data files on the Iceberg table, a good practice is to run the VACUUM command on the table periodically. VACUUM expires old snapshots and removes data files that are no longer referenced, which is also what physically removes deleted or overwritten rows from Amazon S3. See the following code:

VACUUM icebergdemodb.customer;
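
You can schedule this maintenance with the same hypothetical run_athena_query helper sketched earlier. The sketch below also runs OPTIMIZE ... REWRITE DATA USING BIN_PACK, which Athena supports for Iceberg tables to compact the many small files a streaming job tends to produce.

# Sketch: run periodic table maintenance, reusing the hypothetical run_athena_query helper.

def run_maintenance(table="icebergdemodb.customer"):
    # Compact small data files produced by the streaming job.
    run_athena_query(f"OPTIMIZE {table} REWRITE DATA USING BIN_PACK")
    # Expire old snapshots and remove files that are no longer referenced.
    run_athena_query(f"VACUUM {table}")

# Example: call run_maintenance() from a scheduled job, such as a cron-triggered script.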

Considerations

The following are a few considerations to keep in mind for this implementation:

Clean up

Complete the following steps to clean up the resources you created for this post:

    1. Delete the custdata folder in the S3 bucket.
    2. Delete the CloudFormation stack.
    3. Delete the Kinesis data stream.
    4. Delete the S3 bucket storing the data.
    5. Delete the AWS Glue job and Iceberg connector.
    6. Delete the AWS Glue Data Catalog database and table.
    7. Delete the Athena workgroup.
    8. Delete the IAM roles and policies.

Conclusion

This post explained how you can use the Iceberg table format on Athena to implement GDPR use cases like data deletion and data upserts as required, when streaming data is being generated and ingested through AWS Glue streaming jobs in Amazon S3.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the data operations that Iceberg supports. Refer to the Apache Iceberg documentation for details on various operations.


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Rajdip Chaudhuri is a Solutions Architect with Amazon Web Services, specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer.