All posts by Subhro Bose

Automate schema evolution at scale with Apache Hudi in AWS Glue

Post Syndicated from Subhro Bose original https://aws.amazon.com/blogs/big-data/automate-schema-evolution-at-scale-with-apache-hudi-in-aws-glue/

In the data analytics space, organizations often deal with many tables in different databases and file formats that hold data for different business functions. Business needs often drive changes to table structure, known as schema evolution (the addition of new columns, removal of existing columns, renaming of columns, and so on). When one business function changes some of its tables, other business functions often need to replicate the same changes. This post focuses on such schema changes in file-based tables and shows how to automatically replicate the schema evolution of structured data from database tables to tables stored as files, in a cost-effective way.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In this post, we show how to use Apache Hudi, a self-managing database layer on file-based data lakes, in AWS Glue to represent data in relational form and manage its schema evolution at scale. The solution uses Amazon Simple Storage Service (Amazon S3), AWS Database Migration Service (AWS DMS), AWS Lambda, AWS Glue, Amazon DynamoDB, Amazon Aurora, and Amazon Athena to automatically identify schema evolution and apply it while managing data loads at petabyte scale.

Apache Hudi supports ACID transactions and CRUD operations on a data lake. This lays the foundation of a data lake architecture by enabling transaction support and schema evolution and management, decoupling storage from compute, and ensuring support for accessibility through business intelligence (BI) tools. In this post, we implement an architecture for a transactional data lake built on the aforementioned Hudi features.

Solution overview

This post assumes a scenario where multiple tables are present in a source database, and we want to replicate any schema changes in any of those tables in Apache Hudi tables in the data lake. It uses the native support for Apache Hudi on AWS Glue for Apache Spark.

In this post, the schema evolution of source tables in the Aurora database is captured via the AWS DMS incremental load or change data capture (CDC) mechanism, and the same schema evolution is replicated in Apache Hudi tables stored in Amazon S3. Apache Hudi tables are discovered by the AWS Glue Data Catalog and queried by Athena. An AWS Glue job, supported by an orchestration pipeline using Lambda and a DynamoDB table, takes care of the automated replication of schema evolution in the Apache Hudi tables.

We use Aurora as a sample data source, but any data source that supports Create, Read, Update, and Delete (CRUD) operations can replace Aurora in your use case.

The following diagram illustrates our solution architecture.

The flow of the solution is as follows:

  1. Aurora, as a sample data source, contains an RDBMS table with multiple rows, and AWS DMS does the full load of that data to an S3 bucket (which we call the raw bucket). We expect that you may have multiple source tables, but for demonstration purposes, we only use one source table in this post.
  2. We trigger a Lambda function with the source table name as an event so that the corresponding parameters of the source table are read from DynamoDB. To run this operation at specific time intervals, we schedule an Amazon EventBridge rule to trigger the Lambda function with the table name as a parameter.
  3. There are many tables in the source database, and we want to run one AWS Glue job for each source table for simplicity in operations. Because each AWS Glue job updates one Apache Hudi table, this post uses a DynamoDB table to hold the configuration parameters used by each AWS Glue job for each Apache Hudi table. The DynamoDB table contains each Apache Hudi table name, corresponding AWS Glue job name, AWS Glue job status, load status (full or delta), partition key, record key, and schema to pass to the corresponding table’s AWS Glue job. The values in the DynamoDB table are static.
  4. Lambda passes the Apache Hudi table-specific parameters read from DynamoDB to each AWS Glue job (10 G.1X DPUs) and triggers the jobs in parallel. Each job runs Apache Hudi-specific code to insert data into the corresponding Hudi table. The source data comes from tables in the Aurora source database via AWS DMS with full load and incremental load or CDC.
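The orchestration in steps 2 to 4 can be sketched roughly as follows. This is a minimal illustration and not the code deployed by the stack: the DynamoDB table name (hudi_config), its key and attribute names, the event field table_name, and the Glue argument names are all hypothetical. Only the boto3 calls (get_item and start_job_run) are real APIs.

```python
from typing import Any, Dict


def build_glue_arguments(config_item: Dict[str, Any]) -> Dict[str, str]:
    """Turn one DynamoDB configuration row into AWS Glue job arguments.

    The attribute names are assumptions for illustration; adapt them to the
    attributes your configuration table actually holds.
    """
    return {
        "--hudi_table_name": str(config_item["hudi_table_name"]),
        "--load_status": str(config_item["load_status"]),  # "full" or "delta"
        "--partition_key": str(config_item["partition_key"]),
        "--record_key": str(config_item["record_key"]),
        "--schema": str(config_item["schema"]),
    }


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, str]:
    # boto3 is imported lazily so the helper above can be used without AWS access.
    import boto3

    # Hypothetical table and key names.
    table = boto3.resource("dynamodb").Table("hudi_config")
    item = table.get_item(Key={"source_table": event["table_name"]})["Item"]

    # Start the Glue job that owns this Hudi table, passing its parameters.
    run = boto3.client("glue").start_job_run(
        JobName=item["glue_job_name"],
        Arguments=build_glue_arguments(item),
    )
    return {"JobRunId": run["JobRunId"]}
```

Keeping the argument mapping in a separate function makes it possible to check the configuration-to-argument translation without calling AWS.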

Create resources with AWS CloudFormation

We provide an AWS CloudFormation template to create the following resources:

  • Lambda and DynamoDB as the data load management orchestrators
  • S3 buckets for the raw zone, the refined zone, and the assets that hold the code for schema evolution
  • An AWS Glue job to update the Hudi tables and perform schema evolution, both forward- and backward-compatible

The Aurora table and AWS DMS replication instance are not provisioned via this stack. For instructions to set up Aurora, refer to Creating an Amazon Aurora DB cluster.

Launch the following stack and provide your stack name.

eu-west-1

Schema evolution

To access your Aurora database, refer to How do I connect to my Amazon RDS for MySQL instance by using MySQL Workbench. Then complete the following steps:

  1. Run the following queries in the Aurora database to create a table named object. We change its schema in later steps so that we can see the schema evolution reflected at the data lake level:
create database db;
create table db.object (
object_name varchar(255),
object_description varchar(255),
new_column_add varchar(255),
new_field_1 varchar(255),
object_id int);
insert into db.object
values("obj1","Object-1","","",1);

After you create the stacks, some manual steps are needed to prepare the solution end to end.

  1. Create an AWS DMS instance, AWS DMS endpoints, and AWS DMS task with the following configurations:
    • Add dataFormat as Parquet in the target endpoint.
    • Point the target endpoint of AWS DMS to the raw bucket, which is named raw-bucket-<account_number>-<region_name>, and set the folder name to POC.
  2. Start the AWS DMS task.
  3. Create a test event in the HudiLambda Lambda function with POC.db as the content of the event JSON, and save it.
  4. Run the Lambda function.
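If you prefer to script the target endpoint from step 1 instead of using the console, the settings could look roughly like the following. This is a sketch under assumptions: the helper names and the role ARN are hypothetical, while BucketName, BucketFolder, DataFormat, and ServiceAccessRoleArn are fields of the S3Settings structure accepted by the AWS DMS create_endpoint API.

```python
from typing import Dict


def raw_bucket_name(account_number: str, region_name: str) -> str:
    """Build the raw bucket name using the naming pattern from this post."""
    return f"raw-bucket-{account_number}-{region_name}"


def dms_s3_target_settings(account_number: str, region_name: str,
                           role_arn: str) -> Dict[str, str]:
    """S3Settings for an AWS DMS target endpoint that writes Parquet files."""
    return {
        "BucketName": raw_bucket_name(account_number, region_name),
        "BucketFolder": "POC",            # folder name used in this post
        "DataFormat": "parquet",          # dataFormat set to Parquet
        "ServiceAccessRoleArn": role_arn,  # hypothetical role with S3 write access
    }


# Possible usage (requires AWS credentials and boto3):
#   import boto3
#   boto3.client("dms").create_endpoint(
#       EndpointIdentifier="raw-bucket-target",   # hypothetical identifier
#       EndpointType="target",
#       EngineName="s3",
#       S3Settings=dms_s3_target_settings("123456789012", "eu-west-1", role_arn),
#   )
```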

In this post, the schema evolution is reflected through Hudi Hive sync in AWS Glue. You don’t need to run ALTER queries separately in the data lake.
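To make the role of Hive sync more concrete, here is one way the write options of such a Glue job might be assembled. It is a sketch, not the job code shipped with the stack: the function name, its parameters, and the choice of bulk_insert for the full load are assumptions. The option keys themselves are standard Apache Hudi configuration names.

```python
from typing import Dict


def hudi_write_options(database: str, table: str, record_key: str,
                       partition_key: str, precombine_key: str,
                       full_load: bool) -> Dict[str, str]:
    """Assemble Hudi datasource options with Hive sync enabled."""
    return {
        "hoodie.table.name": table,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_key,
        "hoodie.datasource.write.precombine.field": precombine_key,
        # Assumption: bulk_insert for the initial full load, upsert for CDC data.
        "hoodie.datasource.write.operation": "bulk_insert" if full_load else "upsert",
        # Hive sync registers the table and its schema changes in the catalog,
        # so no separate ALTER statements are needed in the data lake.
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": database,
        "hoodie.datasource.hive_sync.table": table,
        "hoodie.datasource.hive_sync.partition_fields": partition_key,
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }


# Possible usage inside a Spark job, with df as a Spark DataFrame:
#   df.write.format("hudi").options(**opts).mode("append").save(target_path)
```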

Now we complete the following steps to change the schema at the source. Trigger the Lambda function after each step to generate a file in the POC/db/object folder within the raw bucket. AWS DMS picks up the schema changes almost instantly and writes them to the raw bucket.

  1. Add a column called test_column to the source table object in your Aurora database:
alter table db.object add column test_column int after object_name;
insert into db.object
values("obj2",22,"test-2","","",2);
  2. Rename the column new_field_1 to new_field_2 in the source table object:
alter table db.object change new_field_1 new_field_2 varchar(10);
insert into db.object
values("obj3",33,"test-3","","new-3",3);

The column new_field_1 is expected to stay in the Hudi table, but no new values are populated in it from this point on.

  3. Delete the column new_field_2 from the source table object:
alter table db.object drop column new_field_2;
insert into db.object
values("obj4",44,"test-4","",4);

Similar to the previous operation, the column new_field_2 is expected to stay in the Hudi table, but no new values are populated in it from this point on.
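The outcome of the three changes can be modeled with a small, purely illustrative schema merge. This is not Hudi's internal algorithm; it only mimics the behavior described here, where columns are added to the target table but never removed. The function names and the simplified type names are assumptions.

```python
from typing import Any, Dict


def evolve_schema(table_schema: Dict[str, str],
                  incoming: Dict[str, str]) -> Dict[str, str]:
    """Keep every existing column and append columns seen for the first time."""
    evolved = dict(table_schema)
    for column, dtype in incoming.items():
        evolved.setdefault(column, dtype)
    return evolved


def align_record(record: Dict[str, Any], schema: Dict[str, str]) -> Dict[str, Any]:
    """Fill columns that the source no longer sends with None (null)."""
    return {column: record.get(column) for column in schema}


# Initial table, then the three source-side changes from the steps above.
schema = {"object_name": "string", "object_description": "string",
          "new_column_add": "string", "new_field_1": "string", "object_id": "int"}

after_add = evolve_schema(schema, {**schema, "test_column": "int"})
after_rename = evolve_schema(after_add, {
    "object_name": "string", "test_column": "int", "object_description": "string",
    "new_column_add": "string", "new_field_2": "string", "object_id": "int"})
after_drop = evolve_schema(after_rename, {
    "object_name": "string", "test_column": "int", "object_description": "string",
    "new_column_add": "string", "object_id": "int"})

# The row inserted after the drop has nulls for both retired columns.
obj4 = align_record({"object_name": "obj4", "test_column": 44,
                     "object_description": "test-4", "new_column_add": "",
                     "object_id": 4}, after_drop)
```

In this model, after_drop still contains both new_field_1 and new_field_2, and obj4 carries None for each of them, which matches the expectation stated for the rename and delete steps.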

If you already have AWS Lake Formation data permissions set up in your account, you may encounter permission issues. In that case, grant full permission (Super) to the default database (before triggering the Lambda function) and all tables in the POC.db database (after the load is complete).

Review the results

When the AWS Glue job runs after each schema change, it generates the following results in the refined bucket. We can view the Apache Hudi tables and their contents in Athena. To set up Athena, refer to Getting started.

The table and the database are available in the AWS Glue Data Catalog and ready for browsing the schema.

Before the schema change, the Athena results look like the following screenshot.

After you add the column test_column and insert a value in the test_column field in the object table in the Aurora database, the new column (test_column) is reflected in its corresponding Apache Hudi table in the data lake.

The following screenshot shows the results in Athena.

After you rename the column new_field_1 to new_field_2 and insert a value in the new_field_2 field in the object table, the renamed column (new_field_2) is reflected in its corresponding Apache Hudi table in the data lake. The column new_field_1 remains in the schema, but no new values are populated in it.

The following screenshot shows the results in Athena.

After you delete the column new_field_2 in the object table and insert or update values in any column of the object table, the deleted column (new_field_2) remains in the corresponding Apache Hudi table schema, but no new values are populated in it.

The following screenshot shows the results in Athena.

Clean up

When you’re done with this solution, delete the sample data in the raw and refined S3 buckets and delete the buckets.

Also, delete the CloudFormation stack to remove all the service resources used in this solution.

Conclusion

This post showed how to implement schema evolution with an open-source solution using Apache Hudi in an AWS environment with an orchestration pipeline.

You can explore the different configurations of AWS Glue to change the AWS Glue job structures and adapt them for your data analytics and other use cases.


About the Authors

Subhro Bose is a Senior Data Architect in Emergent Technologies and Intelligence Platform at Amazon. He loves solving science problems with emergent technologies such as AI/ML, big data, quantum, and more to help businesses across different industry verticals succeed within their innovation journey. In his spare time, he enjoys playing table tennis, learning theories of environmental economics, and exploring the best muffins across the city.

Ketan Karalkar is a Big Data Solutions Consultant at AWS. He has nearly two decades of experience helping customers design and build data analytics and database solutions. He believes in using technology as an enabler to solve real-life business problems.

Eva Fang is a Data Scientist within Professional Services at AWS. She is passionate about using technology to provide value to customers and achieve business outcomes. She is based in London. In her spare time, she likes to watch movies and musicals.

Automate the archival and deletion of sensitive data using Amazon Macie

Post Syndicated from Subhro Bose original https://aws.amazon.com/blogs/big-data/automate-the-archival-and-deletion-of-sensitive-data-using-amazon-macie/

Customers are looking for ways to securely and cost-efficiently manage the archival and deletion of large volumes of sensitive data in their data lakes while complying with regulations and data protection and privacy laws, such as GDPR, POPIA, and LGPD. This post describes a way to automatically identify sensitive data stored in your data lake within AWS, tag the data according to its sensitivity level, and apply appropriate lifecycle policies in a secure and cost-effective way.

Amazon Macie is a managed data security and data privacy service that uses machine learning (ML) and pattern matching to discover and protect your sensitive data stored in Amazon Simple Storage Service (Amazon S3). In this post, we show you how to develop a solution using Macie, Amazon Kinesis Data Firehose, Amazon S3, Amazon EventBridge, and AWS Lambda to identify sensitive data across a large number of S3 buckets, tag the data, and apply lifecycle policies for transition and deletion.

Solution overview

The following diagram illustrates the architecture of our solution.

The flow of the solution is as follows:

  1. An Amazon S3 bucket contains multiple objects with different sensitivities of data.
  2. A Macie job analyzes the S3 bucket to identify the different sensitivities.
    1. An EventBridge rule is triggered for each finding that Macie generates from the job.
    2. The rule copies the results created by the Macie job to a Kinesis Data Firehose delivery stream.
    3. The delivery stream copies the results to an S3 bucket as a JSON file for future audit requirements.
  3. The arrival of the results triggers a Lambda function that parses the sensitivity metadata from the JSON file.
  4. The function tags the objects in the bucket mentioned in Step 1 according to the sensitivity level of each object and creates the corresponding S3 Lifecycle policy, overwriting any S3 Lifecycle policy that already applies to existing objects.
  5. The S3 Lifecycle policy moves data to different classes and deletes data based on the configured rules. For example, we implement the following rules:
    1. Archive objects with high sensitivity, tagged as High, after 700 days.
    2. Delete objects tagged as High after 3,000 days.
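The two example rules in step 5 could be expressed as a single tag-filtered S3 Lifecycle rule. The following is a sketch rather than the code in the Lambda function: the function name, the rule ID, and the choice of GLACIER as the archive storage class are assumptions. The dictionary layout follows the LifecycleConfiguration structure accepted by the S3 put_bucket_lifecycle_configuration API.

```python
from typing import Any, Dict


def lifecycle_configuration(tag_key: str = "Sensitivity",
                            tag_value: str = "High",
                            archive_after_days: int = 700,
                            delete_after_days: int = 3000,
                            storage_class: str = "GLACIER") -> Dict[str, Any]:
    """Build a lifecycle configuration that archives, then deletes, tagged objects."""
    if archive_after_days >= delete_after_days:
        raise ValueError("objects must be archived before they are deleted")
    return {
        "Rules": [{
            "ID": f"archive-and-expire-{tag_value.lower()}",  # hypothetical rule ID
            "Status": "Enabled",
            # Apply the rule only to objects that carry this tag.
            "Filter": {"Tag": {"Key": tag_key, "Value": tag_value}},
            "Transitions": [{"Days": archive_after_days,
                             "StorageClass": storage_class}],
            "Expiration": {"Days": delete_after_days},
        }]
    }


# Possible usage (requires AWS credentials and boto3):
#   import boto3
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket=bucket_name, LifecycleConfiguration=lifecycle_configuration())
```

Note that put_bucket_lifecycle_configuration replaces the whole lifecycle configuration of the bucket, so any rules you want to keep need to be included in the same call.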

Create resources with AWS CloudFormation

We provide an AWS CloudFormation template to create the following resources:

  • An S3 bucket named archival-blog-<account_number>-<region_name> as a sample subject bucket as described above.
  • An S3 bucket named archival-blog-results-<account_number>-<region_name> to store the results generated by the Macie job.
  • A Firehose delivery stream to send the results of the Macie job to the S3 bucket.
  • An EventBridge rule that matches the incoming event of a result generated by the Macie job and routes the result to the Firehose delivery stream.
  • A Lambda function to apply tags and S3 Lifecycle policies on the data objects of the subject S3 bucket based on the result generated by the Macie job.
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions.

Launch the following stack, providing your stack name:

After the CloudFormation stack is deployed, copy sample_pii.xlsx and recipe.xlsx into archival-blog-<account_number>-<region_name> as sample data for Macie to scan for sensitive data.

Next, we scan the subject S3 bucket for sensitive data to tag and attach the appropriate lifecycle policies.

Configure a Macie job

Macie uses ML and pattern matching to discover and protect your sensitive data in AWS. To configure a Macie job, complete the following steps:

  1. On the Macie console, create a new job by choosing Create Job.
  2. Select the bucket that you want to analyze and choose Next.
  3. Select Scheduled job if you want to run the job on a schedule, or One-time job if you want to run the job one time. For this post, we select One-time job. Scheduled jobs are useful for periodic runs in production, but they are out of the scope of this post.
  4. Choose Next.
  5. Enter a name for the job and choose Next.
  6. Review the job details and confirm they’re correct before choosing Submit.

The job starts immediately after you submit it.

Review the results

Whenever the Macie job runs, it generates the following results.

Secondly, the Lambda function tags each sensitive object with Sensitivity : High.
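As a rough sketch of this tagging step, the following helper turns one Macie finding into the arguments of an S3 put_object_tagging call. It assumes that the tag value is taken from the finding's severity description and that each stored JSON document holds a single finding, either bare or wrapped in an EventBridge event under detail. Neither assumption is confirmed by this post, and the function name is hypothetical.

```python
import json
from typing import Any, Dict


def tagging_request(finding_json: str) -> Dict[str, Any]:
    """Build put_object_tagging arguments from one Macie finding."""
    finding = json.loads(finding_json)
    # EventBridge wraps the finding in "detail"; accept a bare finding too.
    detail = finding.get("detail", finding)
    affected = detail["resourcesAffected"]
    return {
        "Bucket": affected["s3Bucket"]["name"],
        "Key": affected["s3Object"]["key"],
        "Tagging": {"TagSet": [
            # Assumption: the severity description (for example, High)
            # is used as the sensitivity level.
            {"Key": "Sensitivity", "Value": detail["severity"]["description"]},
        ]},
    }


# Possible usage (requires AWS credentials and boto3):
#   import boto3
#   boto3.client("s3").put_object_tagging(**tagging_request(body))
```

Keep in mind that put_object_tagging replaces the full tag set of an object, so any existing tags that should survive need to be read first and merged into the TagSet.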

Thirdly, the function creates a corresponding S3 Lifecycle policy.

Clean up

When you’re done with this solution, delete the sample data from the subject S3 bucket, delete all the data objects that are stored as Macie results in the S3 bucket for this post, and then delete the CloudFormation stack to remove all the service resources used in the solution.

Conclusion

In this post, we highlighted how you can use Macie to scan all your data stored on Amazon S3 and how to store your security findings in Amazon S3 using EventBridge and Kinesis Data Firehose. We also explored how you can use Lambda to tag the relevant objects in your subject S3 bucket using the Macie job’s security findings. An S3 Lifecycle transition policy moves tagged objects across different storage classes to help save cost, and an S3 Lifecycle expiration policy deletes objects that have reached the end of their lifecycle.

According to privacy laws like GDPR and POPIA, personal data should be retained only for as long as it needs to be retained for legal purposes or needs to be processed. In this post, we provide a mechanism to archive sensitive data that you might not want to delete immediately, which reduces the related storage costs, and to delete it after a certain period when that data is no longer required. The data archival and deletion periods used above are sample numbers that can be customized within the Lambda function based on your requirements.

Additionally, explore the different types of findings that Macie generates. You can use these findings to build capabilities such as sending notifications or searching AWS CloudTrail S3 object logs to see who is accessing specific objects.

If you have any questions, comments, or concerns, please reach out to AWS Support. If you have feedback about this post, submit it in the comments section.


About the Authors

Subhro Bose is a Data Architect in Emergent Technologies and Intelligence Platform at Amazon. He loves working on ways for emergent technologies such as AI/ML, big data, quantum, and more to help businesses across different industry verticals succeed within their innovation journey.

Akshay Chandiramani is a Data Analytics Consultant for AWS Professional Services. He is passionate about solving complex big data and MLOps problems for customers to create tangible business outcomes. In his spare time, he enjoys playing snooker and table tennis, and capturing trends on the stock market.

Ikenna Uzoh is a Senior Manager, Business Intelligence (BI) and Analytics at AWS. Before his current role, he was a Senior Big Data and Analytics Architect with AWS Professional Services. He is passionate about building data platforms (data lakes, BI, machine learning) that help organizations achieve their desired outcomes.