Tag Archives: AWS Glue

Compose your ETL jobs for MongoDB Atlas with AWS Glue

Post Syndicated from Igor Alekseev original https://aws.amazon.com/blogs/big-data/compose-your-etl-jobs-for-mongodb-atlas-with-aws-glue/

In today’s data-driven business environment, organizations face the challenge of efficiently preparing and transforming large amounts of data for analytics and data science purposes. Businesses need to build data warehouses and data lakes based on operational data. This is driven by the need to centralize and integrate data coming from disparate sources.

At the same time, operational data often originates from applications backed by legacy data stores. Modernizing applications requires a microservice architecture, which in turn necessitates the consolidation of data from multiple sources to construct an operational data store. Without modernization, legacy applications may incur increasing maintenance costs. Modernizing applications involves changing the underlying database engine to a modern document-based database like MongoDB.

These two tasks (building data lakes or data warehouses and application modernization) involve data movement, which uses an extract, transform, and load (ETL) process. The ETL job is a key functionality to having a well-structured process in order to succeed.

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. MongoDB Atlas is an integrated suite of cloud database and data services that combines transactional processing, relevance-based search, real-time analytics, and mobile-to-cloud data synchronization in an elegant and integrated architecture.

By using AWS Glue with MongoDB Atlas, organizations can streamline their ETL processes. With its fully managed, scalable, and secure database solution, MongoDB Atlas provides a flexible and reliable environment for storing and managing operational data. Together, AWS Glue ETL and MongoDB Atlas are a powerful solution for organizations looking to optimize how they build data lakes and data warehouses, and to modernize their applications, in order to improve business performance, reduce costs, and drive growth and success.

In this post, we demonstrate how to migrate data from Amazon Simple Storage Service (Amazon S3) buckets to MongoDB Atlas using AWS Glue ETL, and how to extract data from MongoDB Atlas into an Amazon S3-based data lake.

Solution overview

In this post, we explore the following use cases:

  • Extracting data from MongoDB – MongoDB is a popular database used by thousands of customers to store application data at scale. Enterprise customers can centralize and integrate data coming from multiple data stores by building data lakes and data warehouses. This process involves extracting data from the operational data stores. When the data is in one place, customers can quickly use it for business intelligence needs or for ML.
  • Ingesting data into MongoDB – MongoDB also serves as a no-SQL database to store application data and build operational data stores. Modernizing applications often involves migration of the operational store to MongoDB. Customers would need to extract existing data from relational databases or from flat files. Mobile and web apps often require data engineers to build data pipelines to create a single view of data in Atlas while ingesting data from multiple siloed sources. During this migration, they would need to join different databases to create documents. This complex join operation would need significant, one-time compute power. Developers would also need to build this quickly to migrate the data.

AWS Glue comes handy in these cases with the pay-as-you-go model and its ability to run complex transformations across huge datasets. Developers can use AWS Glue Studio to efficiently create such data pipelines.

The following diagram shows the data extraction workflow from MongoDB Atlas into an S3 bucket using the AWS Glue Studio.

Extracting Data from MongoDB Atlas into Amazon S3

In order to implement this architecture, you will need a MongoDB Atlas cluster, an S3 bucket, and an AWS Identity and Access Management (IAM) role for AWS Glue. To configure these resources, refer to the prerequisite steps in the following GitHub repo.

The following figure shows the data load workflow from an S3 bucket into MongoDB Atlas using AWS Glue.

Loading Data from Amazon S3 into MongoDB Atlas

The same prerequisites are needed here: an S3 bucket, IAM role, and a MongoDB Atlas cluster.

Load data from Amazon S3 to MongoDB Atlas using AWS Glue

The following steps describe how to load data from the S3 bucket into MongoDB Atlas using an AWS Glue job. The extraction process from MongoDB Atlas to Amazon S3 is very similar, with the exception of the script being used. We call out the differences between the two processes.

  1. Create a free cluster in MongoDB Atlas.
  2. Upload the sample JSON file to your S3 bucket.
  3. Create a new AWS Glue Studio job with the Spark script editor option.

Glue Studio Job Creation UI

  1. Depending on whether you want to load or extract data from the MongoDB Atlas cluster, enter the load script or extract script in the AWS Glue Studio script editor.

The following screenshot shows a code snippet for loading data into the MongoDB Atlas cluster.

Code snippet for loading data into MongoDB Atlas

The code uses AWS Secrets Manager to retrieve the MongoDB Atlas cluster name, user name, and password. Then, it creates a DynamicFrame for the S3 bucket and file name passed to the script as parameters. The code retrieves the database and collection names from the job parameters configuration. Finally, the code writes the DynamicFrame to the MongoDB Atlas cluster using the retrieved parameters.

  1. Create an IAM role with the permissions as shown in the following screenshot.

For more details, refer to Configure an IAM role for your ETL job.

IAM Role permissions

  1. Give the job a name and supply the IAM role created in the previous step on the Job details tab.
  2. You can leave the rest of the parameters as default, as shown in the following screenshots.
    Job DetailsJob details continued
  3. Next, define the job parameters that the script uses and supply the default values.
    Job input parameters
  4. Save the job and run it.
  5. To confirm a successful run, observe the contents of the MongoDB Atlas database collection if loading the data, or the S3 bucket if you were performing an extract.

The following screenshot shows the results of a successful data load from an Amazon S3 bucket into the MongoDB Atlas cluster. The data is now available for queries in the MongoDB Atlas UI.
Data Loaded into MongoDB Atlas Cluster

  1. To troubleshoot your runs, review the Amazon CloudWatch logs using the link on the job’s Run tab.

The following screenshot shows that the job ran successfully, with additional details such as links to the CloudWatch logs.

Successful job run details

Conclusion

In this post, we described how to extract and ingest data to MongoDB Atlas using AWS Glue.

With AWS Glue ETL jobs, we can now transfer the data from MongoDB Atlas to AWS Glue-compatible sources, and vice versa. You can also extend the solution to build analytics using AWS AI and ML services.

To learn more, refer to the GitHub repository for step-by-step instructions and sample code. You can procure MongoDB Atlas on AWS Marketplace.


About the Authors

Igor Alekseev is a Senior Partner Solution Architect at AWS in Data and Analytics domain. In his role Igor is working with strategic partners helping them build complex, AWS-optimized architectures. Prior joining AWS, as a Data/Solution Architect he implemented many projects in Big Data domain, including several data lakes in Hadoop ecosystem. As a Data Engineer he was involved in applying AI/ML to fraud detection and office automation.


Babu Srinivasan
is a Senior Partner Solutions Architect at MongoDB. In his current role, he is working with AWS to build the technical integrations and reference architectures for the AWS and MongoDB solutions. He has more than two decades of experience in Database and Cloud technologies . He is passionate about providing technical solutions to customers working with multiple Global System Integrators(GSIs) across multiple geographies.

Monitor and optimize cost on AWS Glue for Apache Spark

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/monitor-optimize-cost-glue-spark/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

One of the most common questions we get from customers is how to effectively monitor and optimize costs on AWS Glue for Spark. The diversity of features and pricing options for AWS Glue offers the flexibility to effectively manage the cost of your data workloads and still keep the performance and capacity as per your business needs. Although the fundamental process of cost optimization for AWS Glue workloads remains the same, you can monitor job runs and analyze the costs and usage to find savings and take action to implement improvements to the code or configurations.

In this post, we demonstrate a tactical approach to help you manage and reduce cost through monitoring and optimization techniques on top of your AWS Glue workloads.

Monitor overall costs on AWS Glue for Apache Spark

AWS Glue for Apache Spark charges an hourly rate in 1-second increments with a minimum of 1 minute based on the number of data processing units (DPUs). Learn more in AWS Glue Pricing. This section describes a way to monitor overall costs on AWS Glue for Apache Spark.

AWS Cost Explorer

In AWS Cost Explorer, you can see overall trends of DPU hours. Complete the following steps:

  1. On the Cost Explorer console, create a new cost and usage report.
  2. For Service, choose Glue.
  3. For Usage type, choose the following options:
    1. Choose <Region>-ETL-DPU-Hour (DPU-Hour) for standard jobs.
    2. Choose <Region>-ETL-Flex-DPU-Hour (DPU-Hour) for Flex jobs.
    3. Choose <Region>-GlueInteractiveSession-DPU-Hour (DPU-Hour) for interactive sessions.
  4. Choose Apply.

Cost Explorer for Glue usage

Learn more in Analyzing your costs with AWS Cost Explorer.

Monitor individual job run costs

This section describes a way to monitor individual job run costs on AWS Glue for Apache Spark. There are two options to achieve this.

AWS Glue Studio Monitoring page

On the Monitoring page in AWS Glue Studio, you can monitor the DPU hours you spent on a specific job run. The following screenshot shows three job runs that processed the same dataset; the first job run spent 0.66 DPU hours, and the second spent 0.44 DPU hours. The third one with Flex spent only 0.33 DPU hours.

Glue Studio Job Run Monitoring

GetJobRun and GetJobRuns APIs

The DPU hour values per job run can be retrieved through AWS APIs.

For auto scaling jobs and Flex jobs, the field DPUSeconds is available in GetJobRun and GetJobRuns API responses:

$ aws glue get-job-run --job-name ghcn --run-id jr_ccf6c31cc32184cea60b63b15c72035e31e62296846bad11cd1894d785f671f4
{
    "JobRun": {
        "Id": "jr_ccf6c31cc32184cea60b63b15c72035e31e62296846bad11cd1894d785f671f4",
        "Attempt": 0,
        "JobName": "ghcn",
        "StartedOn": "2023-02-08T19:14:53.821000+09:00",
        "LastModifiedOn": "2023-02-08T19:19:35.995000+09:00",
        "CompletedOn": "2023-02-08T19:19:35.995000+09:00",
        "JobRunState": "SUCCEEDED",
        "PredecessorRuns": [],
        "AllocatedCapacity": 10,
        "ExecutionTime": 274,
        "Timeout": 2880,
        "MaxCapacity": 10.0,
        "WorkerType": "G.1X",
        "NumberOfWorkers": 10,
        "LogGroupName": "/aws-glue/jobs",
        "GlueVersion": "3.0",
        "ExecutionClass": "FLEX",
        "DPUSeconds": 1137.0
    }
}

The field DPUSeconds returns 1137.0. This means 0.32 DPU hours which can be calculated in 1137.0/(60*60)=0.32.

For the other standard jobs without auto scaling, the field DPUSeconds is not available:

$ aws glue get-job-run --job-name ghcn --run-id jr_10dfa93fcbfdd997dd9492187584b07d305275531ff87b10b47f92c0c3bd6264
{
    "JobRun": {
        "Id": "jr_10dfa93fcbfdd997dd9492187584b07d305275531ff87b10b47f92c0c3bd6264",
        "Attempt": 0,
        "JobName": "ghcn",
        "StartedOn": "2023-02-07T16:38:05.155000+09:00",
        "LastModifiedOn": "2023-02-07T16:40:48.575000+09:00",
        "CompletedOn": "2023-02-07T16:40:48.575000+09:00",
        "JobRunState": "SUCCEEDED",
        "PredecessorRuns": [],
        "AllocatedCapacity": 10,
        "ExecutionTime": 157,
        "Timeout": 2880,
        "MaxCapacity": 10.0,
        "WorkerType": "G.1X",
        "NumberOfWorkers": 10,
        "LogGroupName": "/aws-glue/jobs",
        "GlueVersion": "3.0",
        "ExecutionClass": "STANDARD"
    }
}

For these jobs, you can calculate DPU hours by ExecutionTime*MaxCapacity/(60*60). Then you get 0.44 DPU hour by 157*10/(60*60)=0.44. Note that AWS Glue versions 2.0 and later have a 1-minute minimum billing.

AWS CloudFormation template

Because DPU hours can be retrieved through the GetJobRun and GetJobRuns APIs, you can integrate this with other services like Amazon CloudWatch to monitor trends of consumed DPU hours over time. For example, you can configure an Amazon EventBridge rule to invoke an AWS Lambda function to publish CloudWatch metrics every time AWS Glue jobs finish.

To help you configure that quickly, we provide an AWS CloudFormation template. You can review and customize it to suit your needs. Some of the resources this stack deploys incur costs when in use.

The CloudFormation template generates the following resources:

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. Choose Next.
  5. On the next page, choose Next.
  6. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  7. Choose Create stack.

Stack creation can take up to 3 minutes.

After you complete the stack creation, when AWS Glue jobs finish, the following DPUHours metrics are published under the Glue namespace in CloudWatch:

  • Aggregated metrics – Dimension=[JobType, GlueVersion, ExecutionClass]
  • Per-job metrics – Dimension=[JobName, JobRunId=ALL]
  • Per-job run metrics – Dimension=[JobName, JobRunId]

Aggregated metrics and per-job metrics are shown as in the following screenshot.

CloudWatch DPUHours Metrics

Each datapoint represents DPUHours per individual job run, so valid statistics for the CloudWatch metrics is SUM. With the CloudWatch metrics, you can have a granular view on DPU hours.

Options to optimize cost

This section describes key options to optimize costs on AWS Glue for Apache Spark:

  • Upgrade to the latest version
  • Auto scaling
  • Flex
  • Set the job’s timeout period appropriately
  • Interactive sessions
  • Smaller worker type for streaming jobs

We dive deep to the individual options.

Upgrade to the latest version

Having AWS Glue jobs running on the latest version enables you to take advantage of the latest functionalities and improvements offered by AWS Glue and the upgraded version of the supported engines such as Apache Spark. For example, AWS Glue 4.0 includes the new optimized Apache Spark 3.3.0 runtime and adds support for built-in pandas APIs as well as native support for Apache Hudi, Apache Iceberg, and Delta Lake formats, giving you more options for analyzing and storing your data. It also includes a new highly performant Amazon Redshift connector that is 10 times faster on TPC-DS benchmarking.

Auto scaling

One of the most common challenges to reduce cost is to identify the right amount of resources to run jobs. Users tend to overprovision workers in order to avoid resource-related problems, but part of those DPUs are not used, which increases costs unnecessarily. Starting with AWS Glue version 3.0, AWS Glue auto scaling helps you dynamically scale resources up and down based on the workload, for both batch and streaming jobs. Auto scaling reduces the need to optimize the number of workers to avoid over-provisioning resources for jobs, or paying for idle workers.

To enable auto scaling on AWS Glue Studio, go to the Job Details tab of your AWS Glue job and select Automatically scale number of workers.

Glue Auto Scaling

You can learn more in Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark.

Flex

For non-urgent data integration workloads that don’t require fast job start times or can afford to rerun the jobs in case of a failure, Flex could be a good option. The start times and runtimes of jobs using Flex vary because spare compute resources aren’t always available instantly and may be reclaimed during the run of a job. Flex-based jobs offer the same capabilities, including access to custom connectors, a visual job authoring experience, and a job scheduling system. With the Flex option, you can optimize the costs of your data integration workloads by up to 34%.

To enable Flex on AWS Glue Studio, go to the Job Details tab of your job and select Flex execution.

Glue Flex

You can learn more in Introducing AWS Glue Flex jobs: Cost savings on ETL workloads.

Interactive sessions

One common practice among developers that create AWS Glue jobs is to run the same job several times every time a modification is made to the code. However, this may not be cost-effective depending of the number of workers assigned to the job and the number of times that it’s run. Also, this approach may slow down the development time because you have to wait until every job run is complete. To address this issue, in 2022 we released AWS Glue interactive sessions. This feature let developers process data interactively using a Jupyter-based notebook or IDE of their choice. Sessions start in seconds and have built-in cost management. As with AWS Glue jobs, you pay for only the resources you use. Interactive sessions allow developers to test their code line by line without needing to run the entire job to test any changes made to the code.

Set the job’s timeout period appropriately

Due to configuration issues, script coding errors, or data anomalies, sometimes AWS Glue jobs can take an exceptionally long time or struggle to process the data, and it can cause unexpected charges. AWS Glue gives you the ability to set a timeout value on any jobs. By default, an AWS Glue job is configured with 48 hours as the timeout value, but you can specify any timeout. We recommend identifying the average runtime of your job, and based on that, set an appropriate timeout period. This way, you can control cost per job run, prevent unexpected charges, and detect any problems related to the job earlier.

To change the timeout value on AWS Glue Studio, go to the Job Details tab of your job and enter a value for Job timeout.

Glue job timeout

Interactive sessions also have the same ability to set an idle timeout value on sessions. The default idle timeout value for Spark ETL sessions is 2880 minutes (48 hours). To change the timeout value, you can use %idle_timeout magic.

Smaller worker type for streaming jobs

Processing data in real time is a common use case for customers, but sometimes these streams have sporadic and low data volumes. G.1X and G.2X worker types could be too big for these workloads, especially if we consider streaming jobs may need to run 24/7. To help you reduce costs, in 2022 we released G.025X, a new quarter DPU worker type for streaming ETL jobs. With this new worker type, you can process low data volume streams at one-fourth of the cost.

To select the G.025X worker type on AWS Glue Studio, go to the Job Details tab of your job. For Type, choose Spark Streaming, then choose G 0.25X for Worker type.

Glue smaller worker

You can learn more in Best practices to optimize cost and performance for AWS Glue streaming ETL jobs.

Performance tuning to optimize cost

Performance tuning plays an important role in reducing cost. The first action for performance tuning is to identify the bottlenecks. Without measuring the performance and identifying bottlenecks, it’s not realistic to optimize cost-effectively. CloudWatch metrics provide a simple view for quick analysis, and the Spark UI provides deeper view for performance tuning. It’s highly recommended to enable Spark UI for your jobs and then view the UI to identify the bottleneck.

The following are high-level strategies to optimize costs:

  • Scale cluster capacity
  • Reduce the amount of data scanned
  • Parallelize tasks
  • Optimize shuffles
  • Overcome data skew
  • Accelerate query planning

For this post, we discuss the techniques for reducing the amount of data scanned and parallelizing tasks.

Reduce the amount of data scanned: Enable job bookmarks

AWS Glue job bookmarks are a capability to process data incrementally when running a job multiple times on a scheduled interval. If your use case is an incremental data load, you can enable job bookmarks to avoid a full scan for all job runs and process only the delta from the last job run. This reduces the amount of data scanned and accelerates individual job runs.

Reduce the amount of data scanned: Partition pruning

If your input data is partitioned in advance, you can reduce the amount of data scan by pruning partitions.

For AWS Glue DynamicFrame, set push_down_predicate (and catalogPartitionPredicate), as shown in the following code. Learn more in Managing partitions for ETL output in AWS Glue.

# DynamicFrame
dyf = Glue_context.create_dynamic_frame.from_catalog(
    database=src_database_name,
    table_name=src_table_name,
    push_down_predicate = "year='2023' and month ='03'",
)

For Spark DataFrame (or Spark SQL), set a where or filter clause to prune partitions:

# DataFrame
df = spark.read.format("json").load("s3://<YourBucket>/year=2023/month=03/*/*.gz")
 
# SparkSQL 
df = spark.sql("SELECT * FROM <Table> WHERE year= '2023' and month = '03'")

Parallelize tasks: Parallelize JDBC reads

The number of concurrent reads from the JDBC source is determined by configuration. Note that by default, a single JDBC connection will read all the data from the source through a SELECT query.

Both AWS Glue DynamicFrame and Spark DataFrame support parallelize data scans across multiple tasks by splitting the dataset.

For AWS Glue DynamicFrame, set hashfield or hashexpression and hashpartition. Learn more in Reading from JDBC tables in parallel.

For Spark DataFrame, set numPartitions, partitionColumn, lowerBound, and upperBound. Learn more in JDBC To Other Databases.

Conclusion

In this post, we discussed methodologies for monitoring and optimizing cost on AWS Glue for Apache Spark. With these techniques, you can effectively monitor and optimize costs on AWS Glue for Spark.

If you have comments or feedback, please leave them in the comments.


About the Authors

Leonardo Gómez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

How the BMW Group analyses semiconductor demand with AWS Glue

Post Syndicated from Göksel SARIKAYA original https://aws.amazon.com/blogs/big-data/how-the-bmw-group-analyses-semiconductor-demand-with-aws-glue/

This is a guest post co-written by Maik Leuthold and Nick Harmening from BMW Group.

The BMW Group is headquartered in Munich, Germany, where the company oversees 149,000 employees and manufactures cars and motorcycles in over 30 production sites across 15 countries. This multinational production strategy follows an even more international and extensive supplier network.

Like many automobile companies across the world, the BMW Group has been facing challenges in its supply chain due to the worldwide semiconductor shortage. Creating transparency about BMW Group’s current and future demand of semiconductors is one key strategic aspect to resolve shortages together with suppliers and semiconductor manufacturers. The manufacturers need to know BMW Group’s exact current and future semiconductor volume information, which will effectively help steer the available worldwide supply.

The main requirement is to have an automated, transparent, and long-term semiconductor demand forecast. Additionally, this forecasting system needs to provide data enrichment steps including byproducts, serve as the master data around the semiconductor management, and enable further use cases at the BMW Group.

To enable this use case, we used the BMW Group’s cloud-native data platform called the Cloud Data Hub. In 2019, the BMW Group decided to re-architect and move its on-premises data lake to the AWS Cloud to enable data-driven innovation while scaling with the dynamic needs of the organization. The Cloud Data Hub processes and combines anonymized data from vehicle sensors and other sources across the enterprise to make it easily accessible for internal teams creating customer-facing and internal applications. To learn more about the Cloud Data Hub, refer to BMW Group Uses AWS-Based Data Lake to Unlock the Power of Data.

In this post, we share how the BMW Group analyzes semiconductor demand using AWS Glue.

Logic and systems behind the demand forecast

The first step towards the demand forecast is the identification of semiconductor-relevant components of a vehicle type. Each component is described by a unique part number, which serves as a key in all systems to identify this component. A component can be a headlight or a steering wheel, for example.

For historic reasons, the required data for this aggregation step is siloed and represented differently in diverse systems. Because each source system and data type have its own schema and format, it’s particularly difficult to perform analytics based on this data. Some source systems are already available in the Cloud Data Hub (for example, part master data), therefore it’s straightforward to consume from our AWS account. To access the remaining data sources, we need to build specific ingest jobs that read data from the respective system.

The following diagram illustrates the approach.

The data enrichment starts with an Oracle Database (Software Parts) that contains part numbers that are related to software. This can be the control unit of a headlight or a camera system for automated driving. Because semiconductors are the basis for running software, this database builds the foundation of our data processing.

In the next step, we use REST APIs (Part Relations) to enrich the data with further attributes. This includes how parts are related (for example, a specific control unit that will be installed into a headlight) and over which timespan a part number will be built into a vehicle. The knowledge about the part relations is essential to understand how a specific semiconductor, in this case the control unit, is relevant for a more general part, the headlight. The temporal information about the use of part numbers allows us to filter out outdated part numbers, which will not be used in the future and therefore have no relevance in the forecast.

The data (Part Master Data) can directly be consumed from the Cloud Data Hub. This database includes attributes about the status and material types of a part number. This information is required to filter out part numbers that we gathered in the previous steps but have no relevance for semiconductors. With the information that was gathered from the APIs, this data is also queried to extract further part numbers that weren’t ingested in the previous steps.

After data enrichment and filtering, a third-party system reads the filtered part data and enriches the semiconductor information. Subsequently, it adds the volume information of the components. Finally, it provides the overall semiconductor demand forecast centrally to the Cloud Data Hub.

Applied services

Our solution uses the serverless services AWS Glue and Amazon Simple Storage Service (Amazon S3) to run ETL (extract, transform, and load) workflows without managing an infrastructure. It also reduces the costs by paying only for the time jobs are running. The serverless approach fits our workflow’s schedule very well because we run the workload only once a week.

Because we’re using diverse data source systems as well as complex processing and aggregation, it’s important to decouple ETL jobs. This allows us to process each data source independently. We also split the data transformation into several modules (Data Aggregation, Data Filtering, and Data Preparation) to make the system more transparent and easier to maintain. This approach also helps in case of extending or modifying existing jobs.

Although each module is specific to a data source or a particular data transformation, we utilize reusable blocks inside of every job. This allows us to unify each type of operation and simplifies the procedure of adding new data sources and transformation steps in the future.

In our setup, we follow the security best practice of the least privilege principle, to ensure the information is protected from accidental or unnecessary access. Therefore, each module has AWS Identity and Access Management (IAM) roles with only the necessary permissions, namely access to only data sources and buckets the job deals with. For more information regarding security best practices, refer to Security best practices in IAM.

Solution overview

The following diagram shows the overall workflow where several AWS Glue jobs are interacting with each other sequentially.

As we mentioned earlier, we used the Cloud Data Hub, Oracle DB, and other data sources that we communicate with via the REST API. The first step of the solution is the Data Source Ingest module, which ingests the data from different data sources. For that purpose, AWS Glue jobs read information from different data sources and writes into the S3 source buckets. Ingested data is stored in the encrypted buckets, and keys are managed by AWS Key Management Service (AWS KMS).

After the Data Source Ingest step, intermediate jobs aggregate and enrich the tables with other data sources like components version and categories, model manufacture dates, and so on. Then they write them into the intermediate buckets in the Data Aggregation module, creating comprehensive and abundant data representation. Additionally, according to the business logic workflow, the Data Filtering and Data Preparation modules create the final master data table with only actual and production-relevant information.

The AWS Glue workflow manages all these ingestion jobs and filtering jobs end to end. An AWS Glue workflow schedule is configured weekly to run the workflow on Wednesdays. While the workflow is running, each job writes execution logs (info or error) into Amazon Simple Notification Service (Amazon SNS) and Amazon CloudWatch for monitoring purposes. Amazon SNS forwards the execution results to the monitoring tools, such as Mail, Teams, or Slack channels. In case of any error in the jobs, Amazon SNS also alerts the listeners about the job execution result to take action.

As the last step of the solution, the third-party system reads the master table from the prepared data bucket via Amazon Athena. After further data engineering steps like semiconductor information enrichment and volume information integration, the final master data asset is written into the Cloud Data Hub. With the data now provided in the Cloud Data Hub, other use cases can use this semiconductor master data without building several interfaces to different source systems.

Business outcome

The project results provide the BMW Group a substantial transparency about their semiconductor demand for their entire vehicle portfolio in the present and in the future. The creation of a database with that magnitude enables the BMW Group to establish even further use cases to the benefit of more supply chain transparency and clearer and deeper exchange with first-tier suppliers and semiconductor manufacturers. It helps not only to resolve the current demanding market situation, but also to be more resilient in the future. Therefore, it’s one major step to a digital, transparent supply chain.

Conclusion

This post describes how to analyze semiconductor demand from many data sources with big data jobs in an AWS Glue workflow. A serverless architecture with minimal diversity of services makes the code base and architecture simple to understand and maintain. To learn more about how to use AWS Glue workflows and jobs for serverless orchestration, visit the AWS Glue service page.


About the authors

Maik Leuthold is a Project Lead at the BMW Group for advanced analytics in the business field of supply chain and procurement, and leads the digitalization strategy for the semiconductor management.

Nick Harmening is an IT Project Lead at the BMW Group and an AWS certified Solutions Architect. He builds and operates cloud-native applications with a focus on data engineering and machine learning.

Göksel Sarikaya is a Senior Cloud Application Architect at AWS Professional Services. He enables customers to design scalable, cost-effective, and competitive applications through the innovative production of the AWS platform. He helps them to accelerate customer and partner business outcomes during their digital transformation journey.

Alexander Tselikov is a Data Architect at AWS Professional Services who is passionate about helping customers to build scalable data, analytics and ML solutions to enable timely insights and make critical business decisions.

Rahul Shaurya is a Senior Big Data Architect at Amazon Web Services. He helps and works closely with customers building data platforms and analytical applications on AWS. Outside of work, Rahul loves taking long walks with his dog Barney.

Cross-account integration between SaaS platforms using Amazon AppFlow

Post Syndicated from Ramakant Joshi original https://aws.amazon.com/blogs/big-data/cross-account-integration-between-saas-platforms-using-amazon-appflow/

Implementing an effective data sharing strategy that satisfies compliance and regulatory requirements is complex. Customers often need to share data between disparate software as a service (SaaS) platforms within their organization or across organizations. On many occasions, they need to apply business logic to the data received from the source SaaS platform before pushing it to the target SaaS platform.

Let’s take an example. AnyCompany’s marketing team hosted an event at the Anaheim Convention Center, CA. The marketing team created leads based on the event in Adobe Marketo. An automated process downloaded the leads from Marketo in the marketing AWS account. These leads are then pushed to the sales AWS account. A business process picks up those leads, filters them based on a “Do Not Call” criteria, and creates entries in the Salesforce system. Now, the sales team can pursue those leads and continue to track the opportunities in Salesforce.

In this post, we show how to share your data across SaaS platforms in a cross-account structure using fully managed, low-code AWS services such as Amazon AppFlow, Amazon EventBridge, AWS Step Functions, and AWS Glue.

Solution overview

Considering our example of AnyCompany, let’s look at the data flow. AnyCompany’s Marketo instance is integrated with the producer AWS account. As the leads from Marketo land in the producer AWS account, they’re pushed to the consumer AWS account, which is integrated to Salesforce. Business logic is applied to the leads data in the consumer AWS account, and then the curated data is loaded into Salesforce.

We have used a serverless architecture to implement this use case. The following AWS services are used for data ingestion, processing, and load:

  • Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS applications like Salesforce, SAP, Marketo, Slack, and ServiceNow, and AWS services like Amazon S3 and Amazon Redshift, in just a few clicks. With AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event, or on demand. You can configure data transformation capabilities like filtering and validation to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow is used to download leads data from Marketo and upload the curated leads data into Salesforce.
  • Amazon EventBridge is a serverless event bus that lets you receive, filter, transform, route, and deliver events. EventBridge is used to track the events like receiving the leads data in the producer or consumer AWS accounts and then triggering a workflow.
  • AWS Step Functions is a visual workflow service that helps developers use AWS services to build distributed applications, automate processes, orchestrate microservices, and create data and machine learning (ML) pipelines. Step Functions is used to orchestrate the data processing.
  • AWS Glue is a serverless data preparation service that makes it easy to run extract, transform, and load (ETL) jobs. An AWS Glue job encapsulates a script that reads, processes, and then writes data to a new schema. This solution uses Python 3.6 AWS Glue jobs for data filtration and processing.
  • Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. Amazon S3 is used to store the leads data.

Let’s review the architecture in detail. The following diagram shows a visual representation of how this integration works.

The following steps outline the process for transferring and processing leads data using Amazon AppFlow, Amazon S3, EventBridge, Step Functions, AWS Glue, and Salesforce:

  1. Amazon AppFlow runs on a daily schedule and retrieves any new leads created within the last 24 hours (incremental changes) from Marketo.
  2. The leads are saved as Parquet format files in an S3 bucket in the producer account.
  3. When the daily flow is complete, Amazon AppFlow emits events to EventBridge.
  4. EventBridge triggers Step Functions.
  5. Step Functions copies the Parquet format files containing the leads from the producer account’s S3 bucket to the consumer account’s S3 bucket.
  6. Upon a successful file transfer, Step Functions publishes an event in the consumer account’s EventBridge.
  7. An EventBridge rule intercepts this event and triggers Step Functions in the consumer account.
  8. Step Functions calls an AWS Glue crawler, which scans the leads Parquet files and creates a table in the AWS Glue Data Catalog.
  9. The AWS Glue job is called, which selects records with the Do Not Call field set to false from the leads files, and creates a new set of curated Parquet files. We have used an AWS Glue job for the ETL pipeline to showcase how you can use purpose-built analytics service for complex ETL needs. However, for simple filtering requirements like Do Not Call, you can use the existing filtering feature of Amazon AppFlow.
  10. Step Functions then calls Amazon AppFlow.
  11. Finally, Amazon AppFlow populates the Salesforce leads based on the data in the curated Parquet files.

We have provided artifacts in this post to deploy the AWS services in your account and try out the solution.

Prerequisites

To follow the deployment walkthrough, you need two AWS accounts, one for the producer and other for the consumer. Use us-east-1 or us-west-2 as your AWS Region.

Consumer account setup:

Stage the data

To prepare the data, complete the following steps:

  1. Download the zipped archive file to use for this solution and unzip the files locally.

The AWS Glue job uses the glue-job.py script to perform ETL and populates the curated table in the Data Catalog.

  1. Create an S3 bucket called consumer-configbucket-<ACCOUNT_ID> via the Amazon S3 console in the consumer account, where ACCOUNT_ID is your AWS account ID.
  2. Upload the script to this location.

Create a connection to Salesforce

Follow the connection setup steps outlined in here. Please make a note of the Salesforce connector name.

Create a connection to Salesforce in the consumer account

Follow the connection setup steps outlined in Create Opportunity Object Flow.

Set up resources with AWS CloudFormation

We provided two AWS CloudFormation templates to create resources: one for the producer account, and one for the consumer account.

Amazon S3 now applies server-side encryption with Amazon S3 managed keys (SSE-S3) as the base level of encryption for every bucket in Amazon S3. Starting January 5, 2023, all new object uploads to Amazon S3 are automatically encrypted at no additional cost and with no impact on performance. We use this default encryption for both producer and consumer S3 buckets. If you choose to bring your own keys with AWS Key Management Service (AWS KMS), we recommend referring to Replicating objects created with server-side encryption (SSE-C, SSE-S3, SSE-KMS) for cross-account replication.

Launch the CloudFormation stack in the consumer account

Let’s start with creating resources in the consumer account. There are a few dependencies on the consumer account resources from the producer account. To launch the CloudFormation stack in the consumer account, complete the following steps:

  1. Sign in to the consumer account’s AWS CloudFormation console in the target Region.
  2. Choose Launch Stack.
    BDB-2063-launch-cloudformation-stack
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-appflow-consumer.
  5. Enter the parameters for the connector name, object, and producer (source) account ID.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Stack creation takes approximately 5 minutes to complete. It will create the following resources. You can find them on the Outputs tab of the CloudFormation stack.

  • ConsumerS3Bucketconsumer-databucket-<consumer account id>
  • Consumer S3 Target Foldermarketo-leads-source
  • ConsumerEventBusArnarn:aws:events:<region>:<consumer account id>:event-bus/consumer-custom-event-bus
  • ConsumerEventRuleArnarn:aws:events:<region>:<consumer account id>:rule/consumer-custom-event-bus/consumer-custom-event-bus-rule
  • ConsumerStepFunctionarn:aws:states:<region>:<consumer account id>:stateMachine:consumer-state-machine
  • ConsumerGlueCrawlerconsumer-glue-crawler
  • ConsumerGlueJobconsumer-glue-job
  • ConsumerGlueDatabaseconsumer-glue-database
  • ConsumerAppFlowarn:aws:appflow:<region>:<consumer account id>:flow/consumer-appflow

Producer account setup:

Create a connection to Marketo

Follow the connection setup steps outlined in here. Please make a note of the Marketo connector name.

Launch the CloudFormation stack in the producer account

Now let’s create resources in the producer account. Complete the following steps:

  1. Sign in to the producer account’s AWS CloudFormation console in the source Region.
  2. Choose Launch Stack.
    BDB-2063-launch-cloudformation-stack
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-appflow-producer.
  5. Enter the following parameters and leave the rest as default:
    • AppFlowMarketoConnectorName: name of the Marketo connector, created above
    • ConsumerAccountBucket: consumer-databucket-<consumer account id>
    • ConsumerAccountBucketTargetFolder: marketo-leads-source
    • ConsumerAccountEventBusArn: arn:aws:events:<region>:<consumer account id>:event-bus/consumer-custom-event-bus
    • DefaultEventBusArn: arn:aws:events:<region>:<producer account id>:event-bus/default


  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Stack creation takes approximately 5 minutes to complete. It will create the following resources. You can find them on the Outputs tab of the CloudFormation stack.

  • Producer AppFlowproducer-flow
  • Producer Bucketarn:aws:s3:::producer-bucket.<region>.<producer account id>
  • Producer Flow Completion Rulearn:aws:events:<region>:<producer account id>:rule/producer-appflow-completion-event
  • Producer Step Functionarn:aws:states:<region>:<producer account id>:stateMachine:ProducerStateMachine-xxxx
  • Producer Step Function Rolearn:aws:iam::<producer account id>:role/service-role/producer-stepfunction-role
  1. After successful creation of the resources, go to the consumer account S3 bucket, consumer-databucket-<consumer account id>, and update the bucket policy as follows:
{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "AllowAppFlowDestinationActions",
            "Effect": "Allow",
            "Principal": {"Service": "appflow.amazonaws.com"},
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>",
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>/*"
            ]
        }, {
            "Sid": "Producer-stepfunction-role",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<producer-account-id>:role/service-role/producer-stepfunction-role"
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>",
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>/*"
            ]
        }
    ]
}

Validate the workflow

Let’s walk through the flow:

  1. Review the Marketo and Salesforce connection setup in the producer and consumer account respectively.

In the architecture section, we suggested scheduling the AppFlow (producer-flow) in the producer account. However, for quick testing purposes, we demonstrate how to manually run the flow on demand.

  1. Go to the AppFlow (producer-flow) in the producer account. On the Filters tab of the flow, choose Edit filters.
  2. Choose the Created At date range for which you have data.
  3. Save the range and choose Run flow.
  4. Review the producer S3 bucket.

AppFlow generates the files in the producer-flow prefix within this bucket. The files are temporarily located in the producer S3 bucket under s3://<producer-bucket>.<region>.<account-id>/producer-flow.

  1. Review the EventBridge rule and Step Functions state machine in the producer account.

The Amazon AppFlow job completion triggers an EventBridge rule (arn:aws:events:<region>:<producer account id>:rule/producer-appflow-completion-event, as noted in the Outputs tab of the CloudFromation stack in the Producer Account), which triggers the Step Functions state machine (arn:aws:states:<region>:<producer account id>:stateMachine:ProducerStateMachine-xxxx) in the producer account. The state machine copies the files to the consumer S3 bucket from the producer-flow prefix in the producer S3 bucket. Once file copy is complete, the state machine moves the files from the producer-flow prefix to the archive prefix in the producer S3 bucket. You can find the files in s3://<producer-bucket>.<region>.<account-id>/archive.

  1. Review the consumer S3 bucket.

The Step Functions state machine in the producer account copies the files to the consumer S3 bucket and sends an event to EventBridge in the consumer account. The files are located in the consumer S3 bucket under s3://consumer-databucket-<account-id>/marketo-leads-source/.

  1. Review the EventBridge rule (arn:aws:events:<region>:<consumer account id>:rule/consumer-custom-event-bus/consumer-custom-event-bus-rule) in the consumer account, which should have triggered the Step Function workflow (arn:aws:states:<region>:<consumer account id>:stateMachine:consumer-state-machine).

The AWS Glue crawler (consumer-glue-crawler) runs to update the metadata followed by the AWS Glue job (consumer-glue-job), which curates the data by applying the Do not call filter. The curated files are placed in s3://consumer-databucket-<account-id>/marketo-leads-curated/. After data curation, the flow is started as part of the state machine.

  1. Review the Amazon AppFlow job (arn:aws:appflow:<region>:<consumer account id>:flow/consumer-appflow) run status in the consumer account.

Upon a successful run of the Amazon AppFlow job, the curated data files are moved to the s3://consumer-databucket-<account-id>/marketo-leads-processed/ folder and Salesforce is updated with the leads. Additionally, all the original source files are moved from s3://consumer-databucket-<account-id>/marketo-leads-source/ to s3://consumer-databucket-<account-id>/marketo-leads-archive/.

  1. Review the updated data in Salesforce.

You will see newly created or updated leads created by Amazon AppFlow.

Clean up

To clean up the resources created as part of this post, delete the following resources:

  1. Delete the resources in the producer account:
    • Delete the producer S3 bucket content.
    • Delete the CloudFormation stack.
  2. Delete the resources in the consumer account:
    • Delete the consumer S3 bucket content.
    • Delete the CloudFormation stack.

Summary

In this post, we showed how you can support a cross-account model to exchange data between different partners with different SaaS integrations using Amazon AppFlow. You can expand this idea to support multiple target accounts.

For more information, refer to Simplifying cross-account access with Amazon EventBridge resource policies. To learn more about Amazon AppFlow, visit Amazon AppFlow.


About the authors

Ramakant Joshi is an AWS Solutions Architect, specializing in the analytics and serverless domain. He has a background in software development and hybrid architectures, and is passionate about helping customers modernize their cloud architecture.

Debaprasun Chakraborty is an AWS Solutions Architect, specializing in the analytics domain. He has around 20 years of software development and architecture experience. He is passionate about helping customers in cloud adoption, migration and strategy.

Suraj Subramani Vineet is a Senior Cloud Architect at Amazon Web Services (AWS) Professional Services in Sydney, Australia. He specializes in designing and building scalable and cost-effective data platforms and AI/ML solutions in the cloud. Outside of work, he enjoys playing soccer on weekends.

Build a transactional data lake using Apache Iceberg, AWS Glue, and cross-account data shares using AWS Lake Formation and Amazon Athena

Post Syndicated from Vikram Sahadevan original https://aws.amazon.com/blogs/big-data/build-a-transactional-data-lake-using-apache-iceberg-aws-glue-and-cross-account-data-shares-using-aws-lake-formation-and-amazon-athena/

Building a data lake on Amazon Simple Storage Service (Amazon S3) provides numerous benefits for an organization. It allows you to access diverse data sources, build business intelligence dashboards, build AI and machine learning (ML) models to provide customized customer experiences, and accelerate the curation of new datasets for consumption by adopting a modern data architecture or data mesh architecture.

However, many use cases, like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake, require handling data at a record level. Performing an operation like inserting, updating, and deleting individual records from a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite entire datasets as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance and compaction maintenance.

In 2022, we announced that you can enforce fine-grained access control policies using AWS Lake Formation and query data stored in any supported file format using table formats such as Apache Iceberg, Apache Hudi, and more using Amazon Athena queries. You get the flexibility to choose the table and file format best suited for your use case and get the benefit of centralized data governance to secure data access when using Athena.

In this post, we show you how to configure Lake Formation using Iceberg table formats. We also explain how to upsert and merge in an S3 data lake using an Iceberg framework and apply Lake Formation access control using Athena.

Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon S3. Iceberg also helps guarantee data correctness under concurrent write scenarios.

Solution overview

To explain this setup, we present the following architecture, which integrates Amazon S3 for the data lake (Iceberg table format), Lake Formation for access control, AWS Glue for ETL (extract, transform, and load), and Athena for querying the latest inventory data from the Iceberg tables using standard SQL.

The solution workflow consists of the following steps, including data ingestion (Steps 1–3), data governance (Step 4), and data access (Step 5):

  1. We use AWS Database Migration Service (AWS DMS) or a similar tool to connect to the data source and move incremental data (CDC) to Amazon S3 in CSV format.
  2. An AWS Glue PySpark job reads the incremental data from the S3 input bucket and performs deduplication of the records.
  3. The job then invokes Iceberg’s MERGE statements to merge the data with the target S3 bucket.
  4. We use the AWS Glue Data Catalog as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of S3 buckets to automatically detect the schema. Lake Formation allows you to centrally manage permissions and access control for Data Catalog resources in your S3 data lake. You can use fine-grained access control in Lake Formation to restrict access to data in query results.
  5. We use Athena integrated with Lake Formation to query data from the Iceberg table using standard SQL and validate table- and column-level access on Iceberg tables.

For this solution, we assume that the raw data files are already available in Amazon S3, and focus on processing the data using AWS Glue with Iceberg table format. We use sample item data that has the following attributes:

  • op – This represents the operation on the source record. This shows values I to represent insert operations, U to represent updates, and D to represent deletes. You need to make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3. Make sure you capture this attribute, so that your ETL logic can take appropriate action while merging it.
  • product_id – This is the primary key column in the source data table.
  • category – This column represents the category of an item.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to showcase the functionality.
  • last_update_time – This is the time when the item record was updated at the source data.

We demonstrate implementing the solution with the following steps:

  1. Create an S3 bucket for input and output data.
  2. Create input and output tables using Athena.
  3. Insert the data into the Iceberg table from Athena.
  4. Query the Iceberg table using Athena.
  5. Upload incremental (CDC) data for further processing.
  6. Run the AWS Glue job again to process the incremental files.
  7. Query the Iceberg table again using Athena.
  8. Define Lake Formation policies.

Prerequisites

For Athena queries, we need to configure an Athena workgroup with engine version 3 to support Iceberg table format.

To validate cross-account access through Lake Formation for Iceberg table, in this post we used two accounts (primary and secondary).

Now let’s dive into the implementation steps.

Create an S3 bucket for input and output data

Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process them with AWS Glue PySpark code for the output.

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name asiceberg-blog and leave the remaining fields as default.

S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use the same name while implementing the rest of the implementation steps. Formatting the bucket name as<Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE}might help you get a unique name.

  1. On the bucket details page, choose Create folder.
  2. Create two subfolders. For this post, we createiceberg-blog/raw-csv-input andiceberg-blog/iceberg-output.
  3. Upload theLOAD00000001.csvfile into the raw-csv-input folder.

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena query editor and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_lf_db;

As we explain later in this post, it’s essential to record the data locations when incorporating Lake Formation access controls.

-- Create external table in input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_lf_db.csv_input(
op string,
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
'areColumnsQuoted'='false',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'typeOfData'='file');

-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_lf_db.iceberg_table_lf (
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time timestamp)
PARTITIONED BY (category, bucket(16,product_id))
LOCATION 's3://glue-iceberg-demo/iceberg_blog/iceberg-output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912'
);

-- Validate the input data
SELECT * FROM iceberg_lf_db.csv_input;

SELECT * FROM iceberg_lf_db.iceberg_table_lf;

Alternatively, you can use an AWS Glue crawler to create the table definition for the input files.

Insert the data into the Iceberg table from Athena

Optionally, we can insert data into the Iceberg table through Athena using the following code:

insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (200,'Mobile','Mobile brand 1',25,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (201,'Laptop','Laptop brand 1',20,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (202,'Tablet','Kindle',30,cast('2023-01-19 09:51:41' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (203,'Speaker','Alexa',10,cast('2023-01-19 09:51:42' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (204,'Speaker','Alexa',50,cast('2023-01-19 09:51:43' as timestamp));

For this post, we load the data using an AWS Glue job. Complete the following steps to create the job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Visual with a blank canvas.
  4. Choose Create.
  5. Choose Edit script.
  6. Replace the script with the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max

from pyspark.conf import SparkConf

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
conf = SparkConf()

## spark.sql.catalog.job_catalog.warehouse can be passed as an ## runtime argument with value as the S3 path
## Please make sure to pass runtime argument –
## iceberg_job_catalog_warehouse with value as the S3 path 
conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


## Read Input Table
## glueContext.create_data_frame.from_catalog can be more 
## performant and can be replaced in place of 
## create_dynamic_frame.from_catalog.

IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_lf_db", table_name = "csv_input", transformation_ctx = "IncrementalInputDyF")
IncrementalInputDF = IncrementalInputDyF.toDF()

if not IncrementalInputDF.rdd.isEmpty():
## Apply De-duplication logic on input data, to pickup latest record based on timestamp and operation
IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)

# Add new columns to capture OP value and what is the latest timestamp
inputDFWithTS= IncrementalInputDF.withColumn("max_op_date",max(IncrementalInputDF.last_update_time).over(IDWindowDF))

# Filter out new records that are inserted, then select latest record from existing records and merge both to get deduplicated output
NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

# Register the deduplicated input as temporary table to use in Iceberg Spark SQL statements
finalInputDF.createOrReplaceTempView("incremental_input_data")
finalInputDF.show()

## Perform merge operation on incremental input data with MERGE INTO. This section of the code uses Spark SQL to showcase the expressive SQL approach of Iceberg to perform a Merge operation
IcebergMergeOutputDF = spark.sql("""
MERGE INTO job_catalog.iceberg_lf_db.iceberg_table_lf t
USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
ON t.product_id = s.product_id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
""")

job.commit()
  1. On the Job details tab, specify the job name (iceberg-lf).
  2. For IAM Role, assign an AWS Identity and Access Management (IAM) role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  3. For Glue version, choose Glue 4.0 (Glue 3.0 is also supported).
  4. For Language, choose Python 3.
  5. Make sure Job bookmark has the default value of Enable.
  6. For Job parameters, add the following:
    1. Add the key--datalake-formatswith the valueiceberg.
    2. Add the key--iceberg_job_catalog_warehouse with the value as your S3 path (s3://<bucket-name>/<iceberg-warehouse-path>).
  7. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement.

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_lf_db.iceberg_table_lf limit 10;

The output of the query should match the input, with one difference: the Iceberg output table doesn’t have theopcolumn.

Upload incremental (CDC) data for further processing

After we process the initial full load file, let’s upload an incremental file.

This file includes updated records on two items.

Run the AWS Glue job again to process incremental files

Because the AWS Glue job has bookmarks enabled, the job picks up the new incremental file and performs a MERGE operation on the Iceberg table.

To run the job again, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Select the job and choose Run.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).

Query the Iceberg table using Athena after incremental data processing

When the incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for items 200 and 201.

The following screenshot shows the output.

Define Lake Formation policies

For data governance, we use Lake Formation. Lake Formation is a fully managed service that simplifies data lake setup, supports centralized security management, and provides transactional access on top of your data lake. Moreover, it enables data sharing across accounts and organizations. There are two ways to share data resources in Lake Formation: named resource access control (NRAC) and tag-based access control (TBAC). NRAC uses AWS Resource Access Manager (AWS RAM) to share data resources across accounts using Lake Formation V3. Those are consumed via resource links that are based on created resource shares. Lake Formation tag-based access control (LF-TBAC) is another approach to share data resources in Lake Formation, which defines permissions based on attributes. These attributes are called LF-tags.

In this example, we create databases in the primary account. Our NRAC database is shared with a data domain via AWS RAM. Access to data tables that we register in this database will be handled through NRAC.

Configure access controls in the primary account

In the primary account, complete the following steps to set up access controls using Lake Formation:

  1. On the Lake Formation console, choose Data lake locations in the navigation pane.
  2. Choose Register location.
  3. Update the Iceberg Amazon S3 location path shown in the following screenshot.

Grant access to the database to the secondary account

To grant database access to the external (secondary) account, complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Choose External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the database name.

The first grant should be at database level, and the second grant is at table level.

  1. For Database permissions, specify your permissions (for this post, we select Describe).
  2. Choose Grant.

Now you need to grant permissions at the table level.

  1. Select External accounts and enter the secondary account number.
  2. Select Named data catalog resources.
  3. Verify the table name.
  4. For Table permissions, specify the permissions you want to grant. For this post, we select Select and Describe.
  5. Choose Grant.

If you see the following error, you must revokeIAMAllowedPrincipalsfrom the data lake permissions.

To do so, select IAMAllowedPrincipals and choose Revoke.

Choose Revoke again to confirm.

After you revoke the data permissions, the permissions should appear as shown in the following screenshot.

Add AWS Glue IAM role permissions

Because the IAM principal role was revoked, the AWS Glue IAM role that was used in the AWS Glue job needs to be added exclusively to grant access as shown in the following screenshot.

You need to repeat these steps for the AWS Glue IAM role at table level.

Verify the permissions granted to the AWS Glue IAM role on the Lake Formation console.

Grant access to the Iceberg table to the external account

In the secondary account, complete the following steps to grant access to the Iceberg table to external account.

  1. On the AWS RAM console, choose Resource shares in the navigation pane.
  2. Choose the resource shares invitation sent from the primary account.
  3. Choose Accept resource share.

The resource status should now be active.

Next, you need to create a resource link for the shared Iceberg table and access through Athena.

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Select the Iceberg table (shared from the primary account).
  3. On the Actions menu, choose Create resource link.
  4. For Resource link name, enter a name (for this post,iceberg_table_lf_demo).
  5. For Database, choose your database and verify the shared table and database are automatically populated.
  6. Choose Create.
  7. Select your table and on the Actions menu, choose View data.

You’re redirected to the Athena console, where you can query the data.

Grant column-based access in the primary account

For column-level restricted access, you need to grant access at the column level on the Iceberg table. Complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Select External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the table name.
  6. For Table permissions, choose the permissions you want to grant. For this post, we select Select.
  7. Under Data permissions, choose Column-based access.
  8. Select Include columns and choose your permission filters (for this post, Category and Quantity_available).
  9. Choose Grant.

Data with restricted columns can now be queried through the Athena console.

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. In your secondary account, log in to the Lake Formation console.
  2. Drop the resource share table.
  3. In your primary account, log in to the Lake Formation console.
  4. Revoke the access you configured.
  5. Drop the AWS Glue tables and database.
  6. Delete the AWS Glue job.
  7. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

This post explains how you can use the Iceberg framework with AWS Glue and Lake Formation to define cross-account access controls and query data using Athena. It provides an overview of Iceberg and its features and integration approaches, and explains how you can ingest data, grant cross-account access, and query data through a step-by-step guide.

We hope this gives you a great starting point for using Iceberg to build your data lake platform along with AWS analytics services to implement your solution.


About the Authors

Vikram Sahadevan is a Senior Resident Architect on the AWS Data Lab team. He enjoys efforts that focus around providing prescriptive architectural guidance, sharing best practices, and removing technical roadblocks with joint engineering engagements between customers and AWS technical resources that accelerate data, analytics, artificial intelligence, and machine learning initiatives.

Suvendu Kumar Patra possesses 18 years of experience in infrastructure, database design, and data engineering, and he currently holds the position of Senior Resident Architect at Amazon Web Services. He is a member of the specialized focus group, AWS Data Lab, and his primary duties entail working with executive leadership teams of strategic AWS customers to develop their roadmaps for data, analytics, and AI/ML. Suvendu collaborates closely with customers to implement data engineering, data hub, data lake, data governance, and EDW solutions, as well as enterprise data strategy and data management.

Exploring new ETL and ELT capabilities for Amazon Redshift from the AWS Glue Studio visual editor

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/exploring-new-etl-and-elt-capabilities-for-amazon-redshift-from-the-aws-glue-studio-visual-editor/

In a modern data architecture, unified analytics enable you to access the data you need, whether it’s stored in a data lake or a data warehouse. In particular, we have observed an increasing number of customers who combine and integrate their data into an Amazon Redshift data warehouse to analyze huge data at scale and run complex queries to achieve their business goals.

One of the most common use cases for data preparation on Amazon Redshift is to ingest and transform data from different data stores into an Amazon Redshift data warehouse. This is commonly achieved via AWS Glue, which is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. AWS Glue provides an extensible architecture that enables users with different data processing use cases, and works well with Amazon Redshift. At AWS re:Invent 2022, we announced support for the new Amazon Redshift integration with Apache Spark available in AWS Glue 4.0, which provides enhanced ETL (extract, transform, and load) and ELT capabilities with improved performance.

Today, we are pleased to announce a new and enhanced visual job authoring capabilities for Amazon Redshift ETL and ELT workflows on the AWS Glue Studio visual editor. The new authoring experience gives you the ability to:

  • Get started faster with Amazon Redshift by directly browsing Amazon Redshift schemas and tables from the AWS Glue Studio visual interface
  • Flexible authoring through native Amazon Redshift SQL support as a source or custom preactions and postactions
  • Simplify common data loading operations into Amazon Redshift through new support for INSERT, TRUNCATE, DROP, and MERGE commands

With these enhancements, you can use existing transforms and connectors in AWS Glue Studio to quickly create data pipelines for Amazon Redshift. No-code users can complete end-to-end tasks using only the visual interface, SQL users can reuse their existing Amazon Redshift SQL within AWS Glue, and all users can tune their logic with custom actions on the visual editor.

In this post, we explore the new streamlined user interface and dive deeper into how to use these capabilities. To demonstrate these new capabilities, we showcase the following:

  • Passing a custom SQL JOIN statement to Amazon Redshift
  • Using the results to apply an AWS Glue Studio visual transform
  • Performing an APPEND on the results to load them into a destination table

Set up resources with AWS CloudFormation

To demonstrate the AWS Glue Studio visual editor experience with Amazon Redshift, we provide an AWS CloudFormation template for you to set up baseline resources quickly. The template creates the following resources for you:

  • An Amazon VPC, subnets, route tables, an internet gateway, and NAT gateways
  • An Amazon Redshift cluster
  • An AWS Identity and Access Management (IAM) role associated with the Amazon Redshift cluster
  • An IAM role for running the AWS Glue job
  • An Amazon Simple Storage Service (Amazon S3) bucket to be used as a temporary location for Amazon Redshift ETL
  • An AWS Secrets Manager secret that stores the user name and password for the Amazon Redshift cluster

Note that at the time of writing this post, Amazon Redshift MERGE is in preview, and the cluster created is a preview cluster.

To launch the CloudFormation stack, complete the following steps:

  1. On the AWS CloudFormation console, choose Create stack and then choose With new resources (standard).
  2. For Template source, select Upload a template file, and upload the provided template.
  3. Choose Next.
  4. Enter a name for the CloudFormation stack, then choose Next.
  5. Acknowledge that this stack might create IAM resources for you, then choose Submit.
  6. After the CloudFormation stack is successfully created, follow the steps mentioned at https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-sample-db.html to load sample tickit data into the created Redshift Cluster

Exploring Amazon Redshift reads

In this section, we go over the new read functionality in the AWS Glue Studio visual editor and demonstrate how we can run a custom SQL statement via the new UI.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the Visual with a blank canvas, because we’re authoring a job from scratch, then choose Create.
  3. In the blank canvas, choose the plus sign to add an Amazon Redshift node of type Source.

When you close the node selector, and you should see an Amazon Redshift source node on the canvas along with the data source properties.

You can choose from two methods of accessing your Amazon Redshift data:

  • Direct data connection – This new method allows you to establish a connection to your Amazon Redshift sources without the need to catalog them
  • Glue Data Catalog tables – This method requires you to have already crawled or generated your Amazon Redshift tables in the AWS Glue Data Catalog

For this post, we use the Direct data connection option.

  1. For Redshift access type, select the Direct data connection.
  2. For Redshift connection, choose your AWS Glue Connection redshift-demo-blog-connection created in the CloudFormation stack.

Specifying the connection automatically configures all the network related details along with the name of the database you wish to connect to.

The UI then presents a choice on how you’d like to access the data from within your selected Amazon Redshift cluster’s database:

  • Choose a single table – This option lets you select a single schema, and a single table from your database. You can browse through all of your available schemas and tables right from the AWS Glue Studio visual editor itself, which makes choosing your source table much easier.
  • Enter a custom query If you’re looking to perform your ETL on a subset of data from your Amazon Redshift tables, you can author an Amazon Redshift query from the AWS Glue Studio UI. This query will be passed to the connected Amazon Redshift cluster, and the returned query result will be available in downstream transformations on AWS Glue Studio.

For the purposes of this post, we write our own custom query that joins data from the preloaded event table and venue table.

  1. Select Enter a custom query and enter the following query into the query editor:
select venue.venueid from event, venue where event.venueid = venue.venueid and event.starttime between '2008-01-01 14:00:00' and '2008-01-01 15:00:00' and venue.venueseats = 0

The intent of this query is to gather the venueid of locations that have had an event between 2008-01-01 14:00:00 and 2008-01-01 15:00:00 and have had venueseats = 0. If we run a similar query from the Amazon Redshift Query Editor, we can see that there are actually five such venues within that time frame. We wish to merge this data back into Amazon Redshift without including these rows.

  1. Choose Infer schema, which allows the AWS Glue Studio visual editor to understand the schema from the returned columns from your query.

You can see the schema on the Output schema tab.

  1. Under Performance and security, for S3 staging directory, choose the S3 temporary directory location created by the CloudFormation stack ( RedshiftS3TempPath ).
  2. For IAM role, choose the IAM role specified by RedshiftIamRoleARN in the CloudFormation stack.

Now we’re going to add a transform to drop duplicate rows from our join result. This will ensure that the MERGE operation in the following steps won’t have conflicting keys when performing the operation.

  1. Choose the Drop Duplicates node to view the node properties.
  2. On the Transform tab, for Drop duplicates, select Match specific keys.
  3. For Keys to match rows, choose venueid.

In this section, we defined the steps to read the output of a custom JOIN query. We then dropped the duplicate records from the returned value. In the next section, we explore the write path on the same job.

Exploring Amazon Redshift writes

Now we go over the enhancements for writing to Amazon Redshift as a destination. This section goes over all the simplified options for writing to Amazon Redshift, but highlights the new Amazon Redshift MERGE capabilities for the purposes of this post.

The MERGE operator offers great flexibility for conditionally merging rows from a source into a destination table. MERGE is powerful because it simplifies operations that traditionally were only achievable by using multiple insert, update, or delete statements separately. Within AWS Glue Studio, particularly with the custom MERGE option, you can define a more complex matching condition to handle finding the records to update.

  1. From the canvas page of the job used in the previous section, select Amazon Redshift to add an Amazon Redshift node of type Target.

When you close the selector, you should see your Amazon Redshift target node added on the Amazon Glue Studio canvas, along with possible options.

  1. For Redshift access type, select Direct data connection.

Similar to the Amazon Redshift source node, the Direct data connection method allows you to write directly to your Amazon Redshift tables without needing to have them cataloged within the AWS Glue Data Catalog.

  1. For Redshift connection, choose your AWS Glue connection redshift-demo-blog-connection created in the CloudFormation stack.
  2. For Schema, choose public.
  3. For Table, choose the venue table as the destination Amazon Redshift table where we will store the merged data.
  4. Choose MERGE data into target table.

This selection provides the user with two options:

  • Choose keys and simple actions – This is a user-friendly version of the MERGE operation. You simply specify the matching keys, and choose what happens to the rows that match the key (update them or delete them) or don’t have any matches (insert them).
  • Enter custom MERGE statement – This option provides the most flexibility. You can enter your own custom logic for MERGE.

For this post, we use the simple actions method for performing a MERGE operation.

  1. For Handling of data and target table, select MERGE data into target table, and then select Choose keys and simple actions.
  2. For Matching Keys, select venueid .

This field will become our MERGE condition for checking keys

  1. For When matched, select the Delete record in the table
  2. For When not matched, select Insert source data as a new row into the table

With these selections, we’ve configured the AWS Glue job to run a MERGE statement on Amazon Redshift while inserting our data. Moreover, for performing this MERGE operation, we use the as the key (you can select multiple keys). If there is a key match with the destination table’s record, we delete that record. Otherwise, we insert the record into the destination table.

  1. Navigate to the Job details tab.
  2. For Name, enter a name for the job.
  3. For the IAM Role drop down, select the RedshiftIamRole role that was created via the CloudFormation template.
  4. Choose Save.

  5. Choose Run and wait for the job to finish.

You can track its progress on the Runs tab.

  1. After the run reaches a successful state, navigate back to the Amazon Redshift Query Editor.
  2. Run the same query again to discover that those rows have been deleted in accordance to our MERGE specifications.

In this section, we configured an Amazon Redshift target node to write a MERGE statement to conditionally update records in our destination Amazon Redshift table. We then saved and ran the AWS Glue job, and saw the effect of the MERGE statement on our destination Amazon Redshift table.

Other available write options

In addition to MERGE, the AWS Glue Studio visual editor’s Amazon Redshift destination node also supports a number of other common operations:

  • APPEND – Appending to your target table performs an insert into the selected table without updating any of the existing records (if there are duplicates, both records will be retained). In cases where you want to update existing rows in addition to adding new rows (often referred to an UPSERT operation), you can select the Also update existing records in target table option. Note that both APPEND only and UPSERT (APPEND with UPDATE) are a simpler subset of the MERGE functionality discussed earlier.
  • TRUNCATE – The TRUNCATE option clears all the data in the existing table but retains all the existing table schema, followed by an APPEND of all new data to the empty table. This option is often used when the full dataset needs to be refreshed and downstream services or tools depend on the table schema being consistent. For example, every night an Amazon Redshift table needs to be fully updated with the latest customer information that will be consumed by an Amazon QuickSight dashboard. In this case, the ETL developer would choose TRUNCATE to ensure the data is fully refreshed but the table schema is guaranteed not to change.
  • DROP – This option is used when the full dataset needs to be refreshed and the downstream services or tools that depend on the schema or systems can handle possible schema changes without breaking.

How write operations are being handled on the backend

The Amazon Redshift connector supports two parameters called preactions and postactions. These parameters allow you to run SQL statements that will be passed on to the Amazon Redshift data warehouse before and after the actual write operation is carried out by Spark.

On the Script tab on the AWS Glue Studio page, we can see what SQL statements are being run.

Use a custom implementation for writing data into Amazon Redshift

In the event that the provided presets require more customization, or your use case requires more advanced implementations for writing to Amazon Redshift, AWS Glue Studio also allows you to freely select which preactions and postactions can be run when writing to Amazon Redshift.

To show an example, we create an Amazon Redshift datashare as a preaction, then perform the cleaning up of the same datashare as a postaction via AWS Glue Studio.

NOTE: This section is not executed as part of the above blog and is provided as an example.

  1. Choose the Amazon Redshift data target node.
  2. On the Data target properties tab, expand the Custom Redshift parameters section.
  3. For the parameters, add the following:
    1. Parameter: preactions  with Value BEGIN; CREATE DATASHARE ds1; END
    2. Parameter: postactions with Value BEGIN; DROP DATASHARE ds1; END

As you can see, we can specify multiple Amazon Redshift statements as a part of both the preactions and postactions parameters. Remember that these statements will override any existing preactions or postactions with your specified actions (as you can see in the following generated code).

Cleanup

To avoid additional costs, make sure to delete any unnecessary resources and files:

  • Empty and delete the contents from the S3 temporary bucket
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. Make sure to empty the S3 bucket before you delete the bucket.

Conclusion

In this post, we went over the new AWS Glue Studio visual options for performing reads and writes from Amazon Redshift. We also saw the simplicity with which you can browse your Amazon Redshift tables right from the AWS Glue Studio visual editor UI, and how to run your own custom SQL statements against your Amazon Redshift sources. We then explored how to perform simple ETL loading tasks against Amazon Redshift with just a few clicks, and showcased the new Amazon Redshift MERGE statement.

To dive deeper into the new Amazon Redshift integrations for the AWS Glue Studio visual editor, check out Connecting to Redshift in AWS Glue Studio.


About the Authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team. He works with customers to help improve their big data workloads. In his spare time, he enjoys trying out new food, playing video games, and kickboxing.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Accelerate HiveQL with Oozie to Spark SQL migration on Amazon EMR

Post Syndicated from Vinay Kumar Khambhampati original https://aws.amazon.com/blogs/big-data/accelerate-hiveql-with-oozie-to-spark-sql-migration-on-amazon-emr/

Many customers run big data workloads such as extract, transform, and load (ETL) on Apache Hive to create a data warehouse on Hadoop. Apache Hive has performed pretty well for a long time. But with advancements in infrastructure such as cloud computing and multicore machines with large RAM, Apache Spark started to gain visibility by performing better than Apache Hive.

Customers now want to migrate their Apache Hive workloads to Apache Spark in the cloud to get the benefits of optimized runtime, cost reduction through transient clusters, better scalability by decoupling the storage and compute, and flexibility. However, migration from Apache Hive to Apache Spark needs a lot of manual effort to write migration scripts and maintain different Spark job configurations.

In this post, we walk you through a solution that automates the migration from HiveQL to Spark SQL. The solution was used to migrate Hive with Oozie workloads to Spark SQL and run them on Amazon EMR for a large gaming client. You can also use this solution to develop new jobs with Spark SQL and process them on Amazon EMR. This post assumes that you have a basic understanding of Apache Spark, Hive, and Amazon EMR.

Solution overview

In our example, we use Apache Oozie, which schedules Apache Hive jobs as actions to collect and process data on a daily basis.

We migrate these Oozie workflows with Hive actions by extracting the HQL files, and dynamic and static parameters, and converting them to be Spark compliant. Manual conversion is both time consuming and error prone. To convert the HQL to Spark SQL, you’ll need to sort through existing HQLs, replace the parameters, and change the syntax for a bunch of files.

Instead, we can use automation to speed up the process of migration and reduce heavy lifting tasks, costs, and risks.

We split the solution into two primary components: generating Spark job metadata and running the SQL on Amazon EMR. The first component (metadata setup) consumes existing Hive job configurations and generates metadata such as number of parameters, number of actions (steps), and file formats. The second component consumes the generated metadata from the first component and prepares the run order of Spark SQL within a Spark session. With this solution, we support basic orchestration and scheduling with the help of AWS services like Amazon DynamoDB and Amazon Simple Storage Service (Amazon S3). We can validate the solution by running queries in Amazon Athena.

In the following sections, we walk through these components and how to use these automations in detail.

Generate Spark SQL metadata

Our batch job consists of Hive steps scheduled to run sequentially. For each step, we run HQL scripts that extract, transform, and aggregate input data into one final Hive table, which stores data in HDFS. We use the following Oozie workflow parser script, which takes the input of an existing Hive job and generates configurations artifacts needed for running SQL using PySpark.

Oozie workflow XML parser

We create a Python script to automatically parse the Oozie jobs, including workflow.xml, co-ordinator.xml, job properties, and HQL files. This script can handle many Hive actions in a workflow by organizing the metadata at the step level into separate folders. Each step includes the list of SQLs, SQL paths, and their static parameters, which are input for the solution in the next step.

The process consists of two steps:

  1. The Python parser script takes input of the existing Oozie Hive job and its configuration files.
  2. The script generates a metadata JSON file for each step.

The following diagram outlines these steps and shows sample output.

Prerequisites

You need the following prerequisites:

  • Python 3.8
  • Python packages:
    • sqlparse==0.4.2
    • jproperties==2.1.1
    • defusedxml== 0.7.1

Setup

Complete the following steps:

  1. Install Python 3.8.
  2. Create a virtual environment:
python3 -m venv /path/to/new/virtual/environment
  1. Activate the newly created virtual environment:
source /path/to/new/virtual/environment/bin/activate
  1. Git clone the project:
git clone https://github.com/aws-samples/oozie-job-parser-extract-hive-sql
  1. Install dependent packages:
cd oozie-job-parser-extract-hive-sql
pip install -r requirements.txt

Sample command

We can use the following sample command:

python xml_parser.py --base-folder ./sample_jobs/ --job-name sample_oozie_job_name --job-version V3 --hive-action-version 0.4 --coordinator-action-version 0.4 --workflow-version 0.4 --properties-file-name job.coordinator.properties

The output is as follows:

{'nameNode': 'hdfs://@{{/cluster/${{cluster}}/namenode}}:54310', 'jobTracker': '@{{/cluster/${{cluster}}/jobtracker}}:54311', 'queueName': 'test_queue', 'appName': 'test_app', 'oozie.use.system.libpath': 'true', 'oozie.coord.application.path': '${nameNode}/user/${user.name}/apps/${appName}', 'oozie_app_path': '${oozie.coord.application.path}', 'start': '${{startDate}}', 'end': '${{endDate}}', 'initial_instance': '${{startDate}}', 'job_name': '${appName}', 'timeOut': '-1', 'concurrency': '3', 'execOrder': 'FIFO', 'throttle': '7', 'hiveMetaThrift': '@{{/cluster/${{cluster}}/hivemetastore}}', 'hiveMySQL': '@{{/cluster/${{cluster}}/hivemysql}}', 'zkQuorum': '@{{/cluster/${{cluster}}/zookeeper}}', 'flag': '_done', 'frequency': 'hourly', 'owner': 'who', 'SLA': '2:00', 'job_type': 'coordinator', 'sys_cat_id': '6', 'active': '1', 'data_file': 'hdfs://${nameNode}/hive/warehouse/test_schema/test_dataset', 'upstreamTriggerDir': '/input_trigger/upstream1'}
('./sample_jobs/development/sample_oozie_job_name/step1/step1.json', 'w')

('./sample_jobs/development/sample_oozie_job_name/step2/step2.json', 'w')

Limitations

This method has the following limitations:

  • The Python script parses only HiveQL actions from the Oozie workflow.xml.
  • The Python script generates one file for each SQL statement and uses the sequence ID for file names. It doesn’t name the SQL based on the functionality of the SQL.

Run Spark SQL on Amazon EMR

After we create the SQL metadata files, we use another automation script to run them with Spark SQL on Amazon EMR. This automation script supports custom UDFs by adding JAR files to spark submit. This solution uses DynamoDB for logging the run details of SQLs for support and maintenance.

Architecture overview

The following diagram illustrates the solution architecture.

Prerequisites

You need the following prerequisites:

  • Version:
    • Spark 3.X
    • Python 3.8
    • Amazon EMR 6.1

Setup

Complete the following steps:

  1. Install the AWS Command Line Interface (AWS CLI) on your workspace by following the instructions in Installing or updating the latest version of the AWS CLI. To configure AWS CLI interaction with AWS, refer to Quick setup.
  2. Create two tables in DynamoDB: one to store metadata about jobs and steps, and another to log job runs.
    • Use the following AWS CLI command to create the metadata table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-metadata --attribute-definitions '[ { "AttributeName": "id","AttributeType": "S" } , { "AttributeName": "step_id","AttributeType": "S" }]' --key-schema '[{"AttributeName": "id", "KeyType": "HASH"}, {"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-metadata is successfully created.

The metadata table has the following attributes.

Attributes Type Comments
id String partition_key
step_id String sort_key
step_name String Step description
sql_base_path string Base path
sql_info list List of SQLs in ETL pipeline
. sql_path SQL file name
. sql_active_flag y/n
. sql_load_order Order of SQL
. sql_parameters Parameters in SQL and values
spark_config Map Spark configs
    • Use the following AWS CLI command to create the log table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-pipelinelog --attribute-definitions '[ { "AttributeName":"job_run_id", "AttributeType": "S" } , { "AttributeName":"step_id", "AttributeType": "S" } ]' --key-schema '[{"AttributeName": "job_run_id", "KeyType": "HASH"},{"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-pipelinelog is successfully created.

The log table has the following attributes.

Attributes Type Comments
job_run_id String partition_key
id String sort_key (UUID)
end_time String End time
error_description String Error in case of failure
expire Number Time to Live
sql_seq Number SQL sequence number
start_time String Start time
Status String Status of job
step_id String Job ID SQL belongs

The log table can grow quickly if there are too many jobs or if they are running frequently. We can archive them to Amazon S3 if they are no longer used or use the Time to Live feature of DynamoDB to clean up old records.

  1. Run the first command to set the variable in case you have an existing bucket that can be reused. If not, create a S3 bucket to store the Spark SQL code, which will be run by Amazon EMR.
export s3_bucket_name=unique-code-bucket-name # Change unique-code-bucket-name to a valid bucket name
aws s3api create-bucket --bucket $s3_bucket_name --region us-east-1
  1. Enable secure transfer on the bucket:
aws s3api put-bucket-policy --bucket $s3_bucket_name --policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Deny", "Principal": {"AWS": "*"}, "Action": "s3:*", "Resource": ["arn:aws:s3:::unique-code-bucket-name", "arn:aws:s3:::unique-code-bucket-name/*"], "Condition": {"Bool": {"aws:SecureTransport": "false"} } } ] }' # Change unique-code-bucket-name to a valid bucket name

  1. Clone the project to your workspace:
git clone https://github.com/aws-samples/pyspark-sql-framework.git
  1. Create a ZIP file and upload it to the code bucket created earlier:
cd pyspark-sql-framework/code
zip code.zip -r *
aws s3 cp ./code.zip s3://$s3_bucket_name/framework/code.zip
  1. Upload the ETL driver code to the S3 bucket:
cd $OLDPWD/pyspark-sql-framework
aws s3 cp ./code/etl_driver.py s3://$s3_bucket_name/framework/

  1. Upload sample job SQLs to Amazon S3:
aws s3 cp ./sample_oozie_job_name/ s3://$s3_bucket_name/DW/sample_oozie_job_name/ --recursive

  1. Add a sample step (./sample_oozie_job_name/step1/step1.json) to DynamoDB (for more information, refer to Write data to a table using the console or AWS CLI):
{
  "name": "step1.q",
  "step_name": "step1",
  "sql_info": [
    {
      "sql_load_order": 5,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "5.sql"
    },
    {
      "sql_load_order": 10,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "10.sql"
    }
  ],
  "id": "emr_config",
  "step_id": "sample_oozie_job_name#step1",
  "sql_base_path": "sample_oozie_job_name/step1/",
  "spark_config": {
    "spark.sql.parser.quotedRegexColumnNames": "true"
  }
}

  1. In the Athena query editor, create the database base:
create database base;
  1. Copy the sample data files from the repo to Amazon S3:
    1. Copy us_current.csv:
aws s3 cp ./sample_data/us_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_us_current/;

  1. Copy states_current.csv:
aws s3 cp ./sample_data/states_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_states_current/;

  1. To create the source tables in the base database, run the DDLs present in the repo in the Athena query editor:
    1. Run the ./sample_data/ddl/states_current.q file by modifying the S3 path to the bucket you created.
    1. Run the ./sample_data/ddl/us_current.q file by modifying the S3 path to the bucket you created.

The ETL driver file implements the Spark driver logic. It can be invoked locally or on an EMR instance.

  1. Launch an EMR cluster.
    1. Make sure to select Use for Spark table metadata under AWS Glue Data Catalog settings.

  1. Add the following steps to EMR cluster.
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Type=CUSTOM_JAR,Name="boto3",ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[bash,-c,"sudo pip3 install boto3"]'
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Name="sample_oozie_job_name",Jar="command-runner.jar",Args=[spark-submit,--py-files,s3://unique-code-bucket-name-#####/framework/code.zip,s3://unique-code-bucket-name-#####/framework/etl_driver.py,--step_id,sample_oozie_job_name#step1,--job_run_id,sample_oozie_job_name#step1#2022-01-01-12-00-01,  --code_bucket=s3://unique-code-bucket-name-#####/DW,--metadata_table=dw-etl-metadata,--log_table_name=dw-etl-pipelinelog,--sql_parameters,DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####]' # Change unique-code-bucket-name to a valid bucket name

The following table summarizes the parameters for the spark step.

Step type Spark Application
Name Any Name
Deploy mode Client
Spark-submit options --py-files s3://unique-code-bucket-name-#####/framework/code.zip
Application location s3://unique-code-bucket-name-####/framework/etl_driver.py
Arguments --step_id sample_oozie_job_name#step1 --job_run_id sample_oozie_job_name#step1#2022-01-01-12-00-01 --code_bucket=s3://unique-code-bucket-name-#######/DW --metadata_table=dw-etl-metadata --log_table_name=dw-etl-pipelinelog --sql_parameters DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#######
Action on failure Continue

The following table summarizes the script arguments.

Script Argument Argument Description
deploy-mode Spark deploy mode. Client/Cluster.
name <jobname>#<stepname> Unique name for the Spark job. This can be used to identify the job on the Spark History UI.
py-files <s3 path for code>/code.zip S3 path for the code.
<s3 path for code>/etl_driver.py S3 path for the driver module. This is the entry point for the solution.
step_id <jobname>#<stepname> Unique name for the step. This refers to the step_id in the metadata entered in DynamoDB.
job_run_id <random UUID> Unique ID to identify the log entries in DynamoDB.
log_table_name <DynamoDB Log table name> DynamoDB table for storing the job run details.
code_bucket <s3 bucket> S3 path for the SQL files that are uploaded in the job setup.
metadata_table <DynamoDB Metadata table name> DynamoDB table for storing the job metadata.
sql_parameters DATE=2022-07-04::HOUR=00 Any additional or dynamic parameters expected by the SQL files.

Validation

After completion of EMR step you should have data on S3 bucket for the table base.states_daily. We can validate the data by querying the table base.states_daily in Athena.

Congratulations, you have converted an Oozie Hive job to Spark and run on Amazon EMR successfully.

Solution highlights

This solution has the following benefits:

  • It avoids boilerplate code for any new pipeline and offers less maintenance of code
  • Onboarding any new pipeline only needs the metadata set up—the DynamoDB entries and SQL to be placed in the S3 path
  • Any common modifications or enhancements can be done at the solution level, which will be reflected across all jobs
  • DynamoDB metadata provides insight into all active jobs and their optimized runtime parameters
  • For each run, this solution persists the SQL start time, end time, and status in a log table to identify issues and analyze runtimes
  • It supports Spark SQL and UDF functionality. Custom UDFs can be provided externally to the spark submit command

Limitations

This method has the following limitations:

  • The solution only supports SQL queries on Spark
  • Each SQL should be a Spark action like insert, create, drop, and so on

In this post, we explained the scenario of migrating an existing Oozie job. We can use the PySpark solution independently for any new development by creating DynamoDB entries and SQL files.

Clean up

Delete all the resources created as part of this solution to avoid ongoing charges for the resources:

  1. Delete the DynamoDB tables:
aws dynamodb delete-table --table-name dw-etl-metadata --region us-east-1
aws dynamodb delete-table --table-name dw-etl-pipelinelog --region us-east-1
  1. Delete the S3 bucket:
aws s3 rm s3://$s3_bucket_name --region us-east-1 --recursive
aws s3api create-bucket --bucket $s3_bucket_name --region us-east-1

  1. Stop the EMR cluster if it wasn’t a transient cluster:
aws emr terminate-clusters --cluster-ids <<cluster id created above>> 

Conclusion

In this post, we presented two automated solutions: one for parsing Oozie workflows and HiveQL files to generate metadata, and a PySpark solution for running SQLs using generated metadata. We successfully implemented these solutions to migrate a Hive workload to EMR Spark for a major gaming customer and achieved about 60% effort reduction.

For a Hive with Oozie to Spark migration, these solutions help complete the code conversion quickly so you can focus on performance benchmark and testing. Developing a new pipeline is also quick—you only need to create SQL logic, test it using Spark (shell or notebook), add metadata to DynamoDB, and test via the PySpark SQL solution. Overall, you can use the solution in this post to accelerate Hive to Spark code migration.


About the authors

Vinay Kumar Khambhampati is a Lead Consultant with the AWS ProServe Team, helping customers with cloud adoption. He is passionate about big data and data analytics.

Sandeep Singh is a Lead Consultant at AWS ProServe, focused on analytics, data lake architecture, and implementation. He helps enterprise customers migrate and modernize their data lake and data warehouse using AWS services.

Amol Guldagad is a Data Analytics Consultant based in India. He has worked with customers in different industries like banking and financial services, healthcare, power and utilities, manufacturing, and retail, helping them solve complex challenges with large-scale data platforms. At AWS ProServe, he helps customers accelerate their journey to the cloud and innovate using AWS analytics services.

Reference guide to build inventory management and forecasting solutions on AWS

Post Syndicated from Jason Dalba original https://aws.amazon.com/blogs/big-data/reference-guide-to-build-inventory-management-and-forecasting-solutions-on-aws/

Inventory management is a critical function for any business that deals with physical products. The primary challenge businesses face with inventory management is balancing the cost of holding inventory with the need to ensure that products are available when customers demand them.

The consequences of poor inventory management can be severe. Overstocking can lead to increased holding costs and waste, while understocking can result in lost sales, reduced customer satisfaction, and damage to the business’s reputation. Inefficient inventory management can also tie up valuable resources, including capital and warehouse space, and can impact profitability.

Forecasting is another critical component of effective inventory management. Accurately predicting demand for products allows businesses to optimize inventory levels, minimize stockouts, and reduce holding costs. However, forecasting can be a complex process, and inaccurate predictions can lead to missed opportunities and lost revenue.

To address these challenges, businesses need an inventory management and forecasting solution that can provide real-time insights into inventory levels, demand trends, and customer behavior. Such a solution should use the latest technologies, including Internet of Things (IoT) sensors, cloud computing, and machine learning (ML), to provide accurate, timely, and actionable data. By implementing such a solution, businesses can improve their inventory management processes, reduce holding costs, increase revenue, and enhance customer satisfaction.

In this post, we discuss how to streamline inventory management forecasting systems with AWS managed analytics, AI/ML, and database services.

Solution overview

In today’s highly competitive business landscape, it’s essential for retailers to optimize their inventory management processes to maximize profitability and improve customer satisfaction. With the proliferation of IoT devices and the abundance of data generated by them, it has become possible to collect real-time data on inventory levels, customer behavior, and other key metrics.

To take advantage of this data and build an effective inventory management and forecasting solution, retailers can use a range of AWS services. By collecting data from store sensors using AWS IoT Core, ingesting it using AWS Lambda to Amazon Aurora Serverless, and transforming it using AWS Glue from a database to an Amazon Simple Storage Service (Amazon S3) data lake, retailers can gain deep insights into their inventory and customer behavior.

With Amazon Athena, retailers can analyze this data to identify trends, patterns, and anomalies, and use Amazon ElastiCache for customer-facing applications with reduced latency. Additionally, by building a point of sales application on Amazon QuickSight, retailers can embed customer 360 views into the application to provide personalized shopping experiences and drive customer loyalty.

Finally, we can use Amazon SageMaker to build forecasting models that can predict inventory demand and optimize stock levels.

With these AWS services, retailers can build an end-to-end inventory management and forecasting solution that provides real-time insights into inventory levels and customer behavior, enabling them to make informed decisions that drive business growth and customer satisfaction.

The following diagram illustrates a sample architecture.

With the appropriate AWS services, your inventory management and forecasting system can have optimized collection, storage, processing, and analysis of data from multiple sources. The solution includes the following components.

Data ingestion and storage

Retail businesses have event-driven data that requires action from downstream processes. It’s critical for an inventory management application to handle the data ingestion and storage for changing demands.

The data ingestion process is typically triggered by an event such as an order being placed, kicking off the inventory management workflow, which requires actions from backend services. Developers are responsible for the operational overhead of trying to maintain the data ingestion load from an event driven-application.

The volume and velocity of data can change in the retail industry each day. Events like Black Friday or a new campaign can create volatile demand in what is required to process and store the inventory data. Serverless services designed to scale to businesses’ needs help reduce the architectural and operational challenges that are driven from high-demand retail applications.

Understanding the scaling challenges that occur when inventory demand spikes, we can deploy Lambda, a serverless, event-driven compute service, to trigger the data ingestion process. As inventory events occur like purchases or returns, Lambda automatically scales compute resources to meet the volume of incoming data.

After Lambda responds to the inventory action request, the updated data is stored in Aurora Serverless. Aurora Serverless is a serverless relational database that is designed to scale to the application’s needs. When peak loads hit during events like Black Friday, Aurora Serverless deploys only the database capacity necessary to meet the workload.

Inventory management applications have ever-changing demands. Deploying serverless services to handle the ingestion and storage of data will not only optimize cost but also reduce the operational overhead for developers, freeing up bandwidth for other critical business needs.

Data performance

Customer-facing applications require low latency to maintain positive user experiences with microsecond response times. ElastiCache, a fully managed, in-memory database, delivers high-performance data retrieval to users.

In-memory caching provided by ElastiCache is used to improve latency and throughput for read-heavy applications that online retailers experience. By storing critical pieces of data in-memory like commonly accessed product information, the application performance improves. Product information is an ideal candidate for a cached store due to data staying relatively the same.

Functionality is often added to retail applications to retrieve trending products. Trending products can be cycled through the cache dependent on customer access patterns. ElastiCache manages the real-time application data caching, allowing your customers to experience microsecond response times while supporting high-throughput handling of hundreds of millions of operations per second.

Data transformation

Data transformation is essential in inventory management and forecasting solutions for both data analysis around sales and inventory, as well as ML for forecasting. This is because raw data from various sources can contain inconsistencies, errors, and missing values that may distort the analysis and forecast results.

In the inventory management and forecasting solution, AWS Glue is recommended for data transformation. The tool addresses issues such as cleaning, restructuring, and consolidating data into a standard format that can be easily analyzed. As a result of the transformation, businesses can obtain a more precise understanding of inventory, sales trends, and customer behavior, influencing data-driven decisions to optimize inventory management and sales strategies. Furthermore, high-quality data is crucial for ML algorithms to make accurate forecasts.

By transforming data, organizations can enhance the accuracy and dependability of their forecasting models, ultimately leading to improved inventory management and cost savings.

Data analysis

Data analysis has become increasingly important for businesses because it allows leaders to make informed operational decisions. However, analyzing large volumes of data can be a time-consuming and resource-intensive task. This is where Athena come in. With Athena, businesses can easily query historical sales and inventory data stored in S3 data lakes and combine it with real-time transactional data from Aurora Serverless databases.

The federated capabilities of Athena allow businesses to generate insights by combining datasets without the need to build ETL (extract, transform, and load) pipelines, saving time and resources. This enables businesses to quickly gain a comprehensive understanding of their inventory and sales trends, which can be used to optimize inventory management and forecasting, ultimately improving operations and increasing profitability.

With Athena’s ease of use and powerful capabilities, businesses can quickly analyze their data and gain valuable insights, driving growth and success without the need for complex ETL pipelines.

Forecasting

Inventory forecasting is an important aspect of inventory management for businesses that deal with physical products. Accurately predicting demand for products can help optimize inventory levels, reduce costs, and improve customer satisfaction. ML can help simplify and improve inventory forecasting by making more accurate predictions based on historical data.

SageMaker is a powerful ML platform that you can use to build, train, and deploy ML models for a wide range of applications, including inventory forecasting. In this solution, we use SageMaker to build and train an ML model for inventory forecasting, covering the basic concepts of ML, the data preparation process, model training and evaluation, and deploying the model for use in a production environment.

The solution also introduces the concept of hierarchical forecasting, which involves generating coherent forecasts that maintain the relationships within the hierarchy or reconciling incoherent forecasts. The workshop provides a step-by-step process for using the training capabilities of SageMaker to carry out hierarchical forecasting using synthetic retail data and the scikit-hts package. The FBProphet model was used along with bottom-up and top-down hierarchical aggregation and disaggregation methods. We used Amazon SageMaker Experiments to train multiple models, and the best model was picked out of the four trained models.

Although the approach was demonstrated on a synthetic retail dataset, you can use the provided code with any time series dataset that exhibits a similar hierarchical structure.

Security and authentication

The solution takes advantage of the scalability, reliability, and security of AWS services to provide a comprehensive inventory management and forecasting solution that can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. By incorporating user authentication with Amazon Cognito and Amazon API Gateway, the solution ensures that the system is secure and accessible only by authorized users.

Next steps

The next step to build an inventory management and forecasting solution on AWS would be to go through the Inventory Management workshop. In the workshop, you will get hands-on with AWS managed analytics, AI/ML, and database services to dive deep into an end-to-end inventory management solution. By the end of the workshop, you will have gone through the configuration and deployment of the critical pieces that make up an inventory management system.

Conclusion

In conclusion, building an inventory management and forecasting solution on AWS can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. With AWS services like IoT Core, Lambda, Aurora Serverless, AWS Glue, Athena, ElastiCache, QuickSight, SageMaker, and Amazon Cognito, businesses can use scalable, reliable, and secure technologies to collect, store, process, and analyze data from various sources.

The end-to-end solution is designed for individuals in various roles, such as business users, data engineers, data scientists, and data analysts, who are responsible for comprehending, creating, and overseeing processes related to retail inventory forecasting. Overall, an inventory management and forecasting solution on AWS can provide businesses with the insights and tools they need to make data-driven decisions and stay competitive in a constantly evolving retail landscape.


About the Authors

Jason D’Alba is an AWS Solutions Architect leader focused on databases and enterprise applications, helping customers architect highly available and scalable solutions.

Navnit Shukla is an AWS Specialist Solution Architect, Analytics, and is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Vetri Natarajan is a Specialist Solutions Architect for Amazon QuickSight. Vetri has 15 years of experience implementing enterprise business intelligence (BI) solutions and greenfield data products. Vetri specializes in integration of BI solutions with business applications and enable data-driven decisions.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS platform and specializes in Data Analytics domain.

How Morningstar used tag-based access controls in AWS Lake Formation to manage permissions for an Amazon Redshift data warehouse

Post Syndicated from Don Drake original https://aws.amazon.com/blogs/big-data/how-morningstar-used-tag-based-access-controls-in-aws-lake-formation-to-manage-permissions-for-an-amazon-redshift-data-warehouse/

This post was co-written by Ashish Prabhu, Stephen Johnston, and Colin Ingarfield at Morningstar and Don Drake, at AWS.

With “Empowering Investor Success” as the core motto, Morningstar aims at providing our investors and advisors with the tools and information they need to make informed investment decisions.

In this post, Morningstar’s Data Lake Team Leads discuss how they utilized tag-based access control in their data lake with AWS Lake Formation and enabled similar controls in Amazon Redshift.

The business challenge

At Morningstar, we built a data lake solution that allows our consumers to easily ingest data, make it accessible via the AWS Glue Data Catalog, and grant access to consumers to query the data via Amazon Athena. In this solution, we were required to ensure that the consumers could only query the data to which they had explicit access. To enforce our access permissions, we chose Lake Formation tag-based access control (TBAC). TBAC helps us categorize the data into a simple, broad level or a complex, more granular level using tags and then grant consumers access to those tags based on what group of data they need. Tag-based entitlements allow us to have a flexible and manageable entitlements system that solves our complex entitlements scenarios.

However, our consumers pushed us for better query performance and enhanced analytical capabilities. We realized we needed a data warehouse to cater to all of these consumer requirements, so we evaluated Amazon Redshift. Amazon Redshift provides us with features that we could use to work with our consumers and enable their analytical requirements:

  • Better performance for consumers’ analytical requirements
  • Ability to tune query performance with user-specified sort keys and distribution keys
  • Ability to have different representations of the same data via views and materialized views
  • Consistent query performance regardless of concurrency

Many new Amazon Redshift features helped solve and scale our analytical query requirements, specifically Amazon Redshift Serverless and Amazon Redshift data sharing.

Because our Lake Formation-enforced data lake is a central data repository for all our data, it makes sense for us to flow the data permissions from the data lake into Amazon Redshift. We utilize AWS Identity and Access Management (IAM) authentication and want to centralize the governance of permissions based on IAM roles and groups. For each AWS Glue database and table, we have a corresponding Amazon Redshift schema and table. Our goal was to ensure customers who have access to AWS Glue tables via Lake Formation also have access to the corresponding tables in Amazon Redshift.

However, we faced a problem with user-based entitlements as we moved to Amazon Redshift.

The entitlements problem

Even though we added Amazon Redshift as part of our overall solution, the entitlement requirements and challenges that came with it remained the same for our users consuming via Lake Formation. At the same time, we had to find a way to implement entitlements in our Amazon Redshift data warehouse with the same set of tags that we had already defined in Lake Formation. Amazon Redshift supports resource-based entitlements but doesn’t support tag-based entitlements. The challenge we had to overcome was how to map our existing tag-based entitlements in Lake Formation to the resource-based entitlements in Amazon Redshift.

The data in the AWS Glue Data Catalog needed to be also loaded in the Amazon Redshift data warehouse native tables. This was necessary so that the users get a familiar list of schema and tables that they are accustomed to seeing in the Data Catalog when accessing via Athena. This way, our existing data lake consumers could easily transition to Amazon Redshift.

The following diagram illustrates the structure of the AWS Glue Data Catalog mapped 1:1 with the structure of our Amazon Redshift data warehouse.

Shows mapping of Glue databases and tables to Redshift schemas and tables.

We wanted to utilize the ontology of tags in Lake Formation to also be used on the datasets in Amazon Redshift so that consumers could be granted access to the same datasets in both places. This enabled us to have a single entitlement policy source API that would grant appropriate access to both our Amazon Redshift tables as well as the corresponding Lake Formation tables based on the Lake Formation tag-based policies.

Entitlement Policy Source is used by Lake Formation and Redshift

To solve this problem, we needed to build our own solution to convert the tag-based policies in Lake Formation into grants and revokes in the resource-based entitlements in Amazon Redshift.

Solution overview

To solve this mismatch, we wanted to synchronize our Lake Formation tag ontology and classifications to the Amazon Redshift permission model. To do this, we map Lake Formation tags and grants to Amazon Redshift grants with the following steps:

  1. Map all the resources (databases, schemas, tables, and more) in Lake Formation that are tagged to their equivalent Amazon Redshift tables.
  2. Translate each policy in Lake Formation on a tag expression to a set of Amazon Redshift table grants and revokes.

The net result is that when there is a tag or policy change in Lake Formation, a corresponding set of grants or revokes are made to the equivalent Amazon Redshift tables to keep our entitlements in sync.

Map all tagged resources in Lake Formation to Amazon Redshift equivalents

The tag-based access control of Lake Formation allowed us to apply multiple tags on a single resource (database and table) in the AWS Glue Data Catalog. If visualized in a mapping form, the resource tagging can be displayed as how multiple tags on a single table would be flattened into individual entitlements on Amazon Redshift tables.

Mapping of tags in Lake Formation to Redshift tables

Translate tags to Amazon Redshift grants and revokes

To enable the migration of the tag-based policy enforced in Lake Formation, the permissions can be converted into simple grants and revokes that can be done on a per-group level.

There are two fundamental parts to a tag policy: the principal_id and the tag expression (for example, “Acess Level” = “Public”). Assuming that we have an Amazon Redshift database group for each principal_id, then the resources that represent the tag expression can be permissioned accordingly. We plan on migrating from database groups to database roles in a future implementation.

mapping of tags to Redshift user group

The solution implementation

The implementation of this solution led us to develop two components:

  • The mapper service
  • The Amazon Redshift data configuration

The mapper service can be thought of as a translation service. As the name suggests, it has the core business logic to map the tag and policy information into resource-based grants and revokes in Amazon Redshift. It needs to mimic the behavior of Lake Formation when handling the tag policy translation.

To do this translation, the mapper needs to understand and store the metadata at two levels:

  • Understanding what resource in Amazon Redshift is to be tagged with what value
  • Tracking the grants and revokes already performed so they can be updated with changes in the policy

To do this, we created a config schema in our Amazon Redshift cluster, which currently stores all the configurations.

As part of our implementation, we store the mapped (translated) information in Amazon Redshift. This allows us to incrementally update table grants as Lake Formation tags or policies changed. The following diagram illustrates this schema.

schema of configuration stored in Redshift

Business impact and value

The solution we put together has created key business impacts and values out of the current implementation and allows us greater flexibility in the future.

It allows us to get the data to our users faster with the tag policies applied in Lake Formation and translated directly to permissions in Amazon Redshift with immediate effect. It also allows us to have consistency in permissions applied in both Lake Formation and Amazon Redshift, based on the effective permissions derived from tag policies. And all this happens via a single source that grants and revokes permissions across the board, instead of managing them separately.

If we translate this into the business impact and business value that we generate, the solution improves the time to market of our data, but at the same time provides consistent entitlements across the business-driven categories that we define as tags.

The solution also opens up solutions to add more impact as our product scales both horizontally and vertically. There are potential solutions we could implement in terms of automation, users self-servicing their permissions, auditing, dashboards, and more. As our business scales, we expect to take advantage of these capabilities.

Conclusion

In this post, we shared how Morningstar utilized tag-based access control in our data lake with Lake Formation and enabled similar controls in Amazon Redshift. We developed two components that handle mapping of the tag-based access controls to Amazon Redshift permissions. This solution has improved the time to market for our data and provides consistent entitlements across different business-driven categories.

If you have any questions or comments, please leave them in the comments section.


About the Authors

Ashish Prabhu is a Senior Manager of Software Engineering in Morningstar, Inc. He focuses on the solutioning and delivering the different aspects of Data Lake and Data Warehouse for Morningstar’s Enterprise Data and Platform Team. In his spare time he enjoys playing basketball, painting and spending time with his family.

Stephen Johnston is a Distinguished Software Architect at Morningstar, Inc. His focus is on data lake and data warehousing technologies for Morningstar’s Enterprise Data Platform team.

Colin Ingarfield is a Lead Software Engineer at Morningstar, Inc. Based in Austin, Colin focuses on access control and data entitlements on Morningstar’s growing Data Lake platform.

Don Drake is a Senior Analytics Specialist Solutions Architect at AWS. Based in Chicago, Don helps Financial Services customers migrate workloads to AWS.

Implement column-level encryption to protect sensitive data in Amazon Redshift with AWS Glue and AWS Lambda user-defined functions

Post Syndicated from Aaron Chong original https://aws.amazon.com/blogs/big-data/implement-column-level-encryption-to-protect-sensitive-data-in-amazon-redshift-with-aws-glue-and-aws-lambda-user-defined-functions/

Amazon Redshift is a massively parallel processing (MPP), fully managed petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using existing business intelligence tools.

When businesses are modernizing their data warehousing solutions to Amazon Redshift, implementing additional data protection mechanisms for sensitive data, such as personally identifiable information (PII) or protected health information (PHI), is a common requirement, especially for those in highly regulated industries with strict data security and privacy mandates. Amazon Redshift provides role-based access control, row-level security, column-level security, and dynamic data masking, along with other database security features to enable organizations to enforce fine-grained data security.

Security-sensitive applications often require column-level (or field-level) encryption to enforce fine-grained protection of sensitive data on top of the default server-side encryption (namely data encryption at rest). In other words, sensitive data should be always encrypted on disk and remain encrypted in memory, until users with proper permissions request to decrypt the data. Column-level encryption provides an additional layer of security to protect your sensitive data throughout system processing so that only certain users or applications can access it. This encryption ensures that only authorized principals that need the data, and have the required credentials to decrypt it, are able to do so.

In this post, we demonstrate how you can implement your own column-level encryption mechanism in Amazon Redshift using AWS Glue to encrypt sensitive data before loading data into Amazon Redshift, and using AWS Lambda as a user-defined function (UDF) in Amazon Redshift to decrypt the data using standard SQL statements. Lambda UDFs can be written in any of the programming languages supported by Lambda, such as Java, Go, PowerShell, Node.js, C#, Python, Ruby, or a custom runtime. You can use Lambda UDFs in any SQL statement such as SELECT, UPDATE, INSERT, or DELETE, and in any clause of the SQL statements where scalar functions are allowed.

Solution overview

The following diagram describes the solution architecture.

Architecture Diagram

To illustrate how to set up this architecture, we walk you through the following steps:

  1. We upload a sample data file containing synthetic PII data to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. A sample 256-bit data encryption key is generated and securely stored using AWS Secrets Manager.
  3. An AWS Glue job reads the data file from the S3 bucket, retrieves the data encryption key from Secrets Manager, performs data encryption for the PII columns, and loads the processed dataset into an Amazon Redshift table.
  4. We create a Lambda function to reference the same data encryption key from Secrets Manager, and implement data decryption logic for the received payload data.
  5. The Lambda function is registered as a Lambda UDF with a proper AWS Identity and Access Management (IAM) role that the Amazon Redshift cluster is authorized to assume.
  6. We can validate the data decryption functionality by issuing sample queries using Amazon Redshift Query Editor v2.0. You may optionally choose to test it with your own SQL client or business intelligence tools.

Prerequisites

To deploy the solution, make sure to complete the following prerequisites:

  • Have an AWS account. For this post, you configure the required AWS resources using AWS CloudFormation in the us-east-2 Region.
  • Have an IAM user with permissions to manage AWS resources including Amazon S3, AWS Glue, Amazon Redshift, Secrets Manager, Lambda, and AWS Cloud9.

Deploy the solution using AWS CloudFormation

Provision the required AWS resources using a CloudFormation template by completing the following steps:

  1. Sign in to your AWS account.
  2. Choose Launch Stack:
    Launch Button
  3. Navigate to an AWS Region (for example, us-east-2).
  4. For Stack name, enter a name for the stack or leave as default (aws-blog-redshift-column-level-encryption).
  5. For RedshiftMasterUsername, enter a user name for the admin user account of the Amazon Redshift cluster or leave as default (master).
  6. For RedshiftMasterUserPassword, enter a strong password for the admin user account of the Amazon Redshift cluster.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create stack.
    Create CloudFormation stack

The CloudFormation stack creation process takes around 5–10 minutes to complete.

  1. When the stack creation is complete, on the stack Outputs tab, record the values of the following:
    1. AWSCloud9IDE
    2. AmazonS3BucketForDataUpload
    3. IAMRoleForRedshiftLambdaUDF
    4. LambdaFunctionName

CloudFormation stack output

Upload the sample data file to Amazon S3

To test the column-level encryption capability, you can download the sample synthetic data generated by Mockaroo. The sample dataset contains synthetic PII and sensitive fields such as phone number, email address, and credit card number. In this post, we demonstrate how to encrypt the credit card number field, but you can apply the same method to other PII fields according to your own requirements.

Sample synthetic data

An AWS Cloud9 instance is provisioned for you during the CloudFormation stack setup. You may access the instance from the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.

CloudFormation stack output for AWSCloud9IDE

On the AWS Cloud9 terminal, copy the sample dataset to your S3 bucket by running the following command:

S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2274/pii-sample-dataset.csv s3://$S3_BUCKET/

Upload sample dataset to S3

Generate a secret and secure it using Secrets Manager

We generate a 256-bit secret to be used as the data encryption key. Complete the following steps:

  1. Create a new file in the AWS Cloud9 environment.
    Create new file in Cloud9
  2. Enter the following code snippet. We use the cryptography package to create a secret, and use the AWS SDK for Python (Boto3) to securely store the secret value with Secrets Manager:
    from cryptography.fernet import Fernet
    import boto3
    import base64
    
    key = Fernet.generate_key()
    client = boto3.client('secretsmanager')
    
    response = client.create_secret(
        Name='data-encryption-key',
        SecretBinary=base64.urlsafe_b64decode(key)
    )
    
    print(response['ARN'])

  3. Save the file with the file name generate_secret.py (or any desired name ending with .py).
    Save file in Cloud9
  4. Install the required packages by running the following pip install command in the terminal:
    pip install --user boto3
    pip install --user cryptography

  5. Run the Python script via the following command to generate the secret:
    python generate_secret.py

    Run Python script

Create a target table in Amazon Redshift

A single-node Amazon Redshift cluster is provisioned for you during the CloudFormation stack setup. To create the target table for storing the dataset with encrypted PII columns, complete the following steps:

  1. On the Amazon Redshift console, navigate to the list of provisioned clusters, and choose your cluster.
    Amazon Redshift console
  2. To connect to the cluster, on the Query data drop-down menu, choose Query in query editor v2.
    Connect with Query Editor v2
  3. If this is the first time you’re using the Amazon Redshift Query Editor V2, accept the default setting by choosing Configure account.
    Configure account
  4. To connect to the cluster, choose the cluster name.
    Connect to Amazon Redshift cluster
  5. For Database, enter demodb.
  6. For User name, enter master.
  7. For Password, enter your password.

You may need to change the user name and password according to your CloudFormation settings.

  1. Choose Create connection.
    Create Amazon Redshift connection
  2. In the query editor, run the following DDL command to create a table named pii_table:
    CREATE TABLE pii_table(
      id BIGINT,
      full_name VARCHAR(50),
      gender VARCHAR(10),
      job_title VARCHAR(50),
      spoken_language VARCHAR(50),
      contact_phone_number VARCHAR(20),
      email_address VARCHAR(50),
      registered_credit_card VARCHAR(50)
    );

We recommend using the smallest possible column size as a best practice, and you may need to modify these table definitions per your specific use case. Creating columns much larger than necessary will have an impact on the size of data tables and affect query performance.

Create Amazon Redshift table

Create the source and destination Data Catalog tables in AWS Glue

The CloudFormation stack provisioned two AWS Glue data crawlers: one for the Amazon S3 data source and one for the Amazon Redshift data source. To run the crawlers, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
    AWS Glue Crawlers
  2. Select the crawler named glue-s3-crawler, then choose Run crawler to trigger the crawler job.
    Run Amazon S3 crawler job
  3. Select the crawler named glue-redshift-crawler, then choose Run crawler.
    Run Amazon Redshift crawler job

When the crawlers are complete, navigate to the Tables page to verify your results. You should see two tables registered under the demodb database.

AWS Glue database tables

Author an AWS Glue ETL job to perform data encryption

An AWS Glue job is provisioned for you as part of the CloudFormation stack setup, but the extract, transform, and load (ETL) script has not been created. We create and upload the ETL script to the /glue-script folder under the provisioned S3 bucket in order to run the AWS Glue job.

  1. Return to your AWS Cloud9 environment either via the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.
    CloudFormation stack output for AWSCloud9IDE

We use the Miscreant package for implementing a deterministic encryption using the AES-SIV encryption algorithm, which means that for any given plain text value, the generated encrypted value will be always the same. The benefit of using this encryption approach is to allow for point lookups, equality joins, grouping, and indexing on encrypted columns. However, you should also be aware of the potential security implication when applying deterministic encryption to low-cardinality data, such as gender, boolean values, and status flags.

  1. Create a new file in the AWS Cloud9 environment and enter the following code snippet:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrameCollection
    from awsglue.dynamicframe import DynamicFrame
    
    import boto3
    import base64
    from miscreant.aes.siv import SIV
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import StringType
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "SecretName", "InputTable"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    # retrieve the data encryption key from Secrets Manager
    secret_name = args["SecretName"]
    
    sm_client = boto3.client('secretsmanager')
    get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
    data_encryption_key = get_secret_value_response['SecretBinary']
    siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic
    
    # define the data encryption function
    def pii_encrypt(value):
        if value is None:
            value = ""
        ciphertext = siv.seal(value.encode())
        return base64.b64encode(ciphertext).decode('utf-8')
    
    # register the data encryption function as Spark SQL UDF   
    udf_pii_encrypt = udf(lambda z: pii_encrypt(z), StringType())
    
    # define the Glue Custom Transform function
    def Encrypt_PII (glueContext, dfc) -> DynamicFrameCollection:
        newdf = dfc.select(list(dfc.keys())[0]).toDF()
        
        # PII fields to be encrypted
        pii_col_list = ["registered_credit_card"]
    
        for pii_col_name in pii_col_list:
            newdf = newdf.withColumn(pii_col_name, udf_pii_encrypt(col(pii_col_name)))
    
        encrypteddyc = DynamicFrame.fromDF(newdf, glueContext, "encrypted_data")
        return (DynamicFrameCollection({"CustomTransform0": encrypteddyc}, glueContext))
    
    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="demodb",
        table_name=args["InputTable"],
        transformation_ctx="S3bucket_node1",
    )
    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[
            ("id", "long", "id", "long"),
            ("full_name", "string", "full_name", "string"),
            ("gender", "string", "gender", "string"),
            ("job_title", "string", "job_title", "string"),
            ("spoken_language", "string", "spoken_language", "string"),
            ("contact_phone_number", "string", "contact_phone_number", "string"),
            ("email_address", "string", "email_address", "string"),
            ("registered_credit_card", "long", "registered_credit_card", "string"),
        ],
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Custom Transform
    Customtransform_node = Encrypt_PII(glueContext, DynamicFrameCollection({"ApplyMapping_node2": ApplyMapping_node2}, glueContext))
    
    # Script generated for node Redshift Cluster
    RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_catalog(
        frame=Customtransform_node,
        database="demodb",
        table_name="demodb_public_pii_table",
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="RedshiftCluster_node3",
    )
    
    job.commit()

  2. Save the script with the file name pii-data-encryption.py.
    Save file in Cloud9
  3. Copy the script to the desired S3 bucket location by running the following command:
    S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
    aws s3 cp pii-data-encryption.py s3://$S3_BUCKET/glue-script/pii-data-encryption.py

    Upload AWS Glue script to S3

  4. To verify the script is uploaded successfully, navigate to the Jobs page on the AWS Glue console.You should be able to find a job named pii-data-encryption-job.
    AWS Glue console
  5. Choose Run to trigger the AWS Glue job.It will first read the source data from the S3 bucket registered in the AWS Glue Data Catalog, then apply column mappings to transform data into the expected data types, followed by performing PII fields encryption, and finally loading the encrypted data into the target Redshift table. The whole process should be completed within 5 minutes for this sample dataset.AWS Glue job scriptYou can switch to the Runs tab to monitor the job status.
    Monitor AWS Glue job

Configure a Lambda function to perform data decryption

A Lambda function with the data decryption logic is deployed for you during the CloudFormation stack setup. You can find the function on the Lambda console.

AWS Lambda console

The following is the Python code used in the Lambda function:

import boto3
import os
import json
import base64
import logging
from miscreant.aes.siv import SIV

logger = logging.getLogger()
logger.setLevel(logging.INFO)

secret_name = os.environ['DATA_ENCRYPT_KEY']

sm_client = boto3.client('secretsmanager')
get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
data_encryption_key = get_secret_value_response['SecretBinary']

siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic

# define lambda function logic
def lambda_handler(event, context):
    ret = dict()
    res = []
    for argument in event['arguments']:
        encrypted_value = argument[0]
        try:
            de_val = siv.open(base64.b64decode(encrypted_value)) # perform decryption
        except:
            de_val = encrypted_value
            logger.warning('Decryption for value failed: ' + str(encrypted_value)) 
        res.append(json.dumps(de_val.decode('utf-8')))

    ret['success'] = True
    ret['results'] = res

    return json.dumps(ret) # return decrypted results

If you want to deploy the Lambda function on your own, make sure to include the Miscreant package in your deployment package.

Register a Lambda UDF in Amazon Redshift

You can create Lambda UDFs that use custom functions defined in Lambda as part of your SQL queries. Lambda UDFs are managed in Lambda, and you can control the access privileges to invoke these UDFs in Amazon Redshift.

  1. Navigate back to the Amazon Redshift Query Editor V2 to register the Lambda UDF.
  2. Use the CREATE EXTERNAL FUNCTION command and provide an IAM role that the Amazon Redshift cluster is authorized to assume and make calls to Lambda:
    CREATE OR REPLACE EXTERNAL FUNCTION pii_decrypt (value varchar(max))
    RETURNS varchar STABLE
    LAMBDA '<--Replace-with-your-lambda-function-name-->'
    IAM_ROLE '<--Replace-with-your-redshift-lambda-iam-role-arn-->';

You can find the Lambda name and Amazon Redshift IAM role on the CloudFormation stack Outputs tab:

  • LambdaFunctionName
  • IAMRoleForRedshiftLambdaUDF

CloudFormation stack output
Create External Function in Amazon Redshift

Validate the column-level encryption functionality in Amazon Redshift

By default, permission to run new Lambda UDFs is granted to PUBLIC. To restrict usage of the newly created UDF, revoke the permission from PUBLIC and then grant the privilege to specific users or groups. To learn more about Lambda UDF security and privileges, see Managing Lambda UDF security and privileges.

You must be a superuser or have the sys:secadmin role to run the following SQL statements:

GRANT SELECT ON "demodb"."public"."pii_table" TO PUBLIC;
CREATE USER regular_user WITH PASSWORD '1234Test!';
CREATE USER privileged_user WITH PASSWORD '1234Test!';
REVOKE EXECUTE ON FUNCTION pii_decrypt(varchar) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pii_decrypt(varchar) TO privileged_user;

First, we run a SELECT statement to verify that our highly sensitive data field, in this case the registered_credit_card column, is now encrypted in the Amazon Redshift table:

SELECT * FROM "demodb"."public"."pii_table";

Select statement

For regular database users who have not been granted the permission to use the Lambda UDF, they will see a permission denied error when they try to use the pii_decrypt() function:

SET SESSION AUTHORIZATION regular_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

Permission denied

For privileged database users who have been granted the permission to use the Lambda UDF for decrypting the data, they can issue a SQL statement using the pii_decrypt() function:

SET SESSION AUTHORIZATION privileged_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

The original registered_credit_card values can be successfully retrieved, as shown in the decrypted_credit_card column.

Decrypted results

Cleaning up

To avoid incurring future charges, make sure to clean up all the AWS resources that you created as part of this post.

You can delete the CloudFormation stack on the AWS CloudFormation console or via the AWS Command Line Interface (AWS CLI). The default stack name is aws-blog-redshift-column-level-encryption.

Conclusion

In this post, we demonstrated how to implement a custom column-level encryption solution for Amazon Redshift, which provides an additional layer of protection for sensitive data stored on the cloud data warehouse. The CloudFormation template gives you an easy way to set up the data pipeline, which you can further customize for your specific business scenarios. You can also modify the AWS Glue ETL code to encrypt multiple data fields at the same time, and to use different data encryption keys for different columns for enhanced data security. With this solution, you can limit the occasions where human actors can access sensitive data stored in plain text on the data warehouse.

You can learn more about this solution and the source code by visiting the GitHub repository. To learn more about how to use Amazon Redshift UDFs to solve different business problems, refer to Example uses of user-defined functions (UDFs) and Amazon Redshift UDFs.


About the Author

Aaron ChongAaron Chong is an Enterprise Solutions Architect at Amazon Web Services Hong Kong. He specializes in the data analytics domain, and works with a wide range of customers to build big data analytics platforms, modernize data engineering practices, and advocate AI/ML democratization.

Implement slowly changing dimensions in a data lake using AWS Glue and Delta

Post Syndicated from Nith Govindasivan original https://aws.amazon.com/blogs/big-data/implement-slowly-changing-dimensions-in-a-data-lake-using-aws-glue-and-delta/

In a data warehouse, a dimension is a structure that categorizes facts and measures in order to enable users to answer business questions. To illustrate an example, in a typical sales domain, customer, time or product are dimensions and sales transactions is a fact. Attributes within the dimension can change over time—a customer can change their address, an employee can move from a contractor position to a full-time position, or a product can have multiple revisions to it. A slowly changing dimension (SCD) is a data warehousing concept that contains relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). Change data capture (CDC) is a characteristic of a database that provides an ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.

As organizations across the globe are modernizing their data platforms with data lakes on Amazon Simple Storage Service (Amazon S3), handling SCDs in data lakes can be challenging. It becomes even more challenging when source systems don’t provide a mechanism to identify the changed data for processing within the data lake and makes the data processing highly complex if the data source happens to be semi-structured instead of a database. The key objective while handling Type 2 SCDs is to define the start and end dates to the dataset accurately to track the changes within the data lake, because this provides the point-in-time reporting capability for the consuming applications.

In this post, we focus on demonstrating how to identify the changed data for a semi-structured source (JSON) and capture the full historical data changes (SCD Type 2) and store them in an S3 data lake, using AWS Glue and open data lake format Delta.io. This implementation supports the following use cases:

  • Track Type 2 SCDs with start and end dates to identify the current and full historical records and a flag to identify the deleted records in the data lake (logical deletes)
  • Use consumption tools such as Amazon Athena to query historical records seamlessly

Solution overview

This post demonstrates the solution with an end-to-end use case using a sample employee dataset. The dataset represents employee details such as ID, name, address, phone number, contractor or not, and more. To demonstrate the SCD implementation, consider the following assumptions:

  • The data engineering team receives daily files that are full snapshots of records and don’t contain any mechanism to identify source record changes
  • The team is tasked with implementing SCD Type 2 functionality for identifying new, updated, and deleted records from the source, and to preserve the historical changes in the data lake
  • Because the source systems don’t provide the CDC capability, a mechanism needs to be developed to identify the new, updated, and deleted records and persist them in the data lake layer

The architecture is implemented as follows:

  • Source systems ingest files in the S3 landing bucket (this step is mimicked by generating the sample records using the provided AWS Lambda function into the landing bucket)
  • An AWS Glue job (Delta job) picks the source data file and processes the changed data from the previous file load (new inserts, updates to the existing records, and deleted records from the source) into the S3 data lake (processed layer bucket)
  • The architecture uses the open data lake format (Delta), and builds the S3 data lake as a Delta Lake, which is mutable, because the new changes can be updated, new inserts can be appended, and source deletions can be identified accurately and marked with a delete_flag value
  • An AWS Glue crawler catalogs the data, which can be queried by Athena

The following diagram illustrates our architecture.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments. This template creates the following resources:

  • Two S3 buckets: a landing bucket for storing sample employee data and a processed layer bucket for the mutable data lake (Delta Lake)
  • A Lambda function to generate sample records
  • An AWS Glue extract, transform, and load (ETL) job to process the source data from the landing bucket to the processed bucket

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack:

  1. Enter a stack name.
  2. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to AWS CloudFormation console to note the following resources on the Outputs tab:

  • Data lake resources – The S3 buckets scd-blog-landing-xxxx and scd-blog-processed-xxxx (referred to as scd-blog-landing and scd-blog-processed in the subsequent sections in this post)
  • Sample records generator Lambda functionSampleDataGenaratorLambda-<CloudFormation Stack Name> (referred to as SampleDataGeneratorLambda)
  • AWS Glue Data Catalog databasedeltalake_xxxxxx (referred to as deltalake)
  • AWS Glue Delta job<CloudFormation-Stack-Name>-src-to-processed (referred to as src-to-processed)

Note that deploying the CloudFormation stack in your account incurs AWS usage charges.

Test SCD Type 2 implementation

With the infrastructure in place, you’re ready to test out the overall solution design and query historical records from the employee dataset. This post is designed to be implemented for a real customer use case, where you get full snapshot data on a daily basis. We test the following aspects of SCD implementation:

  • Run an AWS Glue job for the initial load
  • Simulate a scenario where there are no changes to the source
  • Simulate insert, update, and delete scenarios by adding new records, and modifying and deleting existing records
  • Simulate a scenario where the deleted record comes back as a new insert

Generate a sample employee dataset

To test the solution, and before you can start your initial data ingestion, the data source needs to be identified. To simplify that step, a Lambda function has been deployed in the CloudFormation stack you just deployed.

Open the function and configure a test event, with the default hello-world template event JSON as seen in the following screenshot. Provide an event name without any changes to the template and save the test event.

Choose Test to invoke a test event, which invokes the Lambda function to generate the sample records.

When the Lambda function completes its invocation, you will be able to see the following sample employee dataset in the landing bucket.

Run the AWS Glue job

Confirm if you see the employee dataset in the path s3://scd-blog-landing/dataset/employee/. You can download the dataset and open it in a code editor such as VS Code. The following is an example of the dataset:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737\nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717\nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas Springs\nDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011\nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson Tunnel\nMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748\nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert Views\nWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813\nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537\nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189\nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229\nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997\nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050\nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks Estates\nEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500\nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004\nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah Creek\nWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659\nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930\nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732\nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049\nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297\nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes Plain\nHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Download the dataset and keep it ready, because you will modify the dataset for future use cases to simulate the inserts, updates, and deletes. The sample dataset generated for you will be entirely different than what you see in the preceding example.

To run the job, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job src-to-processed.
  3. On the Runs tab, choose Run.

When the AWS Glue job is run for the first time, the job reads the employee dataset from the landing bucket path and ingests the data to the processed bucket as a Delta table.

When the job is complete, you can create a crawler to see the initial data load. The following screenshot shows the database available on the Databases page.

  1. Choose Crawlers in the navigation pane.
  2. Choose Create crawler.

  1. Name your crawler delta-lake-crawler, then choose Next.

  1. Select Not yet for data already mapped to AWS Glue tables.
  2. Choose Add a data source.

  1. On the Data source drop-down menu, choose Delta Lake.
  2. Enter the path to the Delta table.
  3. Select Create Native tables.
  4. Choose Add a Delta Lake data source.

  1. Choose Next.

  1. Choose the role that was created by the CloudFormation template, then choose Next.

  1. Choose the database that was created by the CloudFormation template, then choose Next.

  1. Choose Create crawler.

  1. Select your crawler and choose Run.

Query the data

After the crawler is complete, you can see the table it created.

To query the data, complete the following steps:

  1. Choose the employee table and on the Actions menu, choose View data.

You’re redirected to the Athena console. If you don’t have the latest Athena engine, create a new Athena workgroup with the latest Athena engine.

  1. Under Administration in the navigation pane, choose Workgroups.

  1. Choose Create workgroup.

  1. Provide a name for the workgroup, such as DeltaWorkgroup.
  2. Select Athena SQL as the engine, and choose Athena engine version 3 for Query engine version.

  1. Choose Create workgroup.

  1. After you create the workgroup, select the workgroup (DeltaWorkgroup) on the drop-down menu in the Athena query editor.

  1. Run the following query on the employee table:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Update the correct database name from the CloudFormation outputs before running the above query.

You can observe that the employee table has 25 records. The following screenshot shows the total employee records with some sample records.

The Delta table is stored with an emp_key, which is unique to each and every change and is used to track the changes. The emp_key is created for every insert, update, and delete, and can be used to find all the changes pertaining to a single emp_id.

The emp_key is created using the SHA256 hashing algorithm, as shown in the following code:

df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"),
            col("phone_number"), col("isContractor")), 256))

Perform inserts, updates, and deletes

Before making changes to the dataset, let’s run the same job one more time. Assuming that the current load from the source is the same as the initial load with no changes, the AWS Glue job shouldn’t make any changes to the dataset. After the job is complete, run the previous Select query in the Athena query editor and confirm that there are still 25 active records with the following values:

  • All 25 records with the column isCurrent=true
  • All 25 records with the column end_date=Null
  • All 25 records with the column delete_flag=false

After you confirm the previous job run with these values, let’s modify our initial dataset with the following changes:

  1. Change the isContractor flag to false (change it to true if your dataset already shows false) for emp_id=12.
  2. Delete the entire row where emp_id=8 (make sure to save the record in a text editor, because we use this record in another use case).
  3. Copy the row for emp_id=25 and insert a new row. Change the emp_id to be 26, and make sure to change the values for other columns as well.

After we make these changes, the employee source dataset looks like the following code (for readability, we have only included the changed records as described in the preceding three steps):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes Plain\nHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}
  1. Now, upload the changed fake_emp_data.json file to the same source prefix.

  1. After you upload the changed employee dataset to Amazon S3, navigate to the AWS Glue console and run the job.
  2. When the job is complete, run the following query in the Athena query editor and confirm that there are 27 records in total with the following values:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Update the correct database name from the CloudFormation output before running the above query.

  1. Run another query in the Athena query editor and confirm that there are 4 records returned with the following values:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Update the correct database name from the CloudFormation output before running the above query.

You will see two records for emp_id=12:

  • One emp_id=12 record with the following values (for the record that was ingested as part of the initial load):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • A second emp_id=12 record with the following values (for the record that was ingested as part of the change to the source):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (or empty string)

The record for emp_id=8 that was deleted in the source as part of this run will still exist but with the following changes to the values:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

The new employee record will be inserted with the following values:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (or empty string)
  • delete_flag=false

Note that the emp_key values in your actual table may be different than what is provided here as an example.

  1. For the deletes, we check for the emp_id from the base table along with the new source file and inner join the emp_key.
  2. If the condition evaluates to true, we then check if the employee base table emp_key equals the new updates emp_key, and get the current, undeleted record (isCurrent=true and delete_flag=false).
  3. We merge the delete changes from the new file with the base table for all the matching delete condition rows and update the following:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

See the following code:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true"

base_tbl.alias("employee")\
        .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond)\
        .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false",
                                                        "end_date": current_date(),
                                                        "delete_flag": "true"}).execute()
  1. For both the updates and the inserts, we check for the condition if the base table employee.emp_id is equal to the new changes.emp_id and the employee.emp_key is equal to new changes.emp_key, while only retrieving the current records.
  2. If this condition evaluates to true, we then get the current record (isCurrent=true and delete_flag=false).
  3. We merge the changes by updating the following:
    1. If the second condition evaluates to true:
      1. isCurrent=false
      2. end_date=current_date
    2. Or we insert the entire row as follows if the second condition evaluates to false:
      1. emp_id=new record’s emp_key
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. address=new record’s address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (or empty string)
      10. isCurrent=true
      11. delete_flag=false

See the following code:

upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false"

base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond)\
    .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false",
                                                            "end_date": current_date()
                                                            }) \
    .whenNotMatchedInsert(
    values={
        "isCurrent": "true",
        "emp_id": "employeeUpdates.emp_id",
        "first_name": "employeeUpdates.first_name",
        "last_name": "employeeUpdates.last_name",
        "Address": "employeeUpdates.Address",
        "phone_number": "employeeUpdates.phone_number",
        "isContractor": "employeeUpdates.isContractor",
        "emp_key": "employeeUpdates.emp_key",
        "start_date": current_date(),
        "delete_flag":  "employeeUpdates.delete_flag",
        "end_date": "null"
    })\
    .execute()

As a last step, let’s bring back the deleted record from the previous change to the source dataset and see how it is reinserted into the employee table in the data lake and observe how the complete history is maintained.

Let’s modify our changed dataset from the previous step and make the following changes.

  1. Add the deleted emp_id=8 back to the dataset.

After making these changes, my employee source dataset looks like the following code (for readability, we have only included the added record as described in the preceding step):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  1. Upload the changed employee dataset file to the same source prefix.
  2. After you upload the changed fake_emp_data.json dataset to Amazon S3, navigate to the AWS Glue console and run the job again.
  3. When the job is complete, run the following query in the Athena query editor and confirm that there are 28 records in total with the following values:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Update the correct database name from the CloudFormation output before running the above query.

  1. Run the following query and confirm there are 5 records:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Update the correct database name from the CloudFormation output before running the above query.

You will see two records for emp_id=8:

  • One emp_id=8 record with the following values (the old record that was deleted):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • deleted_flag=true
    • end_date=’2023-03-02
  • Another emp_id=8 record with the following values (the new record that was inserted in the last run):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • deleted_flag=false
    • end_date=NULL (or empty string)

The emp_key values in your actual table may be different than what is provided here as an example. Also note that because this is a same deleted record that was reinserted in the subsequent load without any changes, there will be no change to the emp_key.

End-user sample queries

The following are some sample end-user queries to demonstrate how the employee change data history can be traversed for reporting:

  • Query 1 – Retrieve a list of all the employees who left the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Note: Update the correct database name from the CloudFormation output before running the above query.

The preceding query would return two employee records who left the organization.

  • Query 2 – Retrieve a list of new employees who joined the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Note: Update the correct database name from the CloudFormation output before running the above query.

The preceding query would return 23 active employee records who joined the organization.

  • Query 3 – Find the history of any given employee in the organization (in this case employee 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Note: Update the correct database name from the CloudFormation output before running the above query.

In the preceding query, we can observe that employee 18 had two changes to their employee records before they left the organization.

Note that the data results provided in this example are different than what you will see in your specific records based on the sample data generated by the Lambda function.

Clean up

When you have finished experimenting with this solution, clean up your resources, to prevent AWS charges from being incurred:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to identify the changed data for a semi-structured data source and preserve the historical changes (SCD Type 2) on an S3 Delta Lake, when source systems are unable to provide the change data capture capability, with AWS Glue. You can further extend this solution to enable downstream applications to build additional customizations from CDC data captured in the data lake.

Additionally, you can extend this solution as part of an orchestration using AWS Step Functions or other commonly used orchestrators your organization is familiar with. You can also extend this solution by adding partitions where appropriate. You can also maintain the delta table by compacting the small files.


About the authors

Nith Govindasivan, is a Data Lake Architect with AWS Professional Services, where he helps onboarding customers on their modern data architecture journey through implementing Big Data & Analytics solutions. Outside of work, Nith is an avid Cricket fan, watching almost any cricket during his spare time and enjoys long drives, and traveling internationally.

Vijay Velpula is a Data Architect with AWS Professional Services. He helps customers implement Big Data and Analytics Solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.

AWS Glue crawlers support cross-account crawling to support data mesh architecture

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/aws-glue-crawlers-support-cross-account-crawling-to-support-data-mesh-architecture/

Data lakes have come a long way, and there’s been tremendous innovation in this space. Today’s modern data lakes are cloud native, work with multiple data types, and make this data easily available to diverse stakeholders across the business. As time has gone by, data lakes have grown significantly and have evolved to data meshes as a way to scale. Thoughtworks defines a data mesh as “a shift in a modern distributed architecture that applies platform thinking to create self-serve data infrastructure, treating data as the product.”

Data mesh advocates for decentralized ownership and delivery of enterprise data management systems that benefit several personas. Data producers can use the data mesh platform to create datasets and share them across business teams to ensure data availability, reliability, and interoperability across functions and data subject areas. Data consumers now have better data sharing with data mesh and federation across business units without compromising data security. The data governance team can support distributed data, where all data is accessible to those with the proper authority to access it. With data mesh, data doesn’t have to be consolidated into a single data lake or account and can remain within different databases and data lakes. An essential capability needed in such a data lake architecture is the ability to continuously understand changes in the data lakes in various other domains and make those available to data consumers. Without such a capability, manual work is needed to understand producers’ updates and make them available to consumers and governance.

AWS customers use a modern data architecture to facilitate governance and data sharing across logical or physical governance boundaries to create data domains aligned to lines of business. Each line of business creates and manages their dataset on Amazon Simple Storage Service (Amazon S3) and uses AWS Glue crawlers to discover new datasets and register them to the AWS Glue Data Catalog, add new tables and partitions, and detect schema changes. These datasets are shared with data consumers that access the data using services like Amazon Athena, Amazon Redshift, Amazon EMR, and more.

In the post Introducing AWS Glue crawlers using AWS Lake Formation permission management, we introduced a new set of capabilities in AWS Glue crawlers and AWS Lake Formation that simplifies crawler setup and supports centralized permissions for in-account and cross-account crawling of S3 data lakes. In this post, we demonstrate the same capability for a data mesh architecture in which we establish a central governance layer to catalog the data owned by the data producer and share it with the data consumer for ease of discovery. The AWS Glue crawler cross-account capability allows you to crawl data sources in different producer accounts while still having those changes cataloged in a centralized governance account. Customers prefer the central governance experience over writing bucket policies separately in each bucket owning the account of a data mesh producer. To build a data mesh architecture, now you can author permissions in a single Lake Formation governance to manage access to data locations and crawlers spanning multiple accounts in the data mesh.

According to the Allstate Corporation:

“By leveraging the power of AWS Lake Formation in our modern data architecture, we will be able to further unlock the potential of our data and empower our analytics community to drive innovation and build data-driven applications. The granular data access and collaboration provided by this architecture will enable us to build a truly unified data and analytics experience, bringing us one step closer to realizing our vision of becoming a fully data-driven enterprise.”

– Prashant Mehrotra, Director – Machine Learning and R&D, Allstate

In this post, we walk through the creation of a simplified data mesh architecture that shows how to use an AWS Glue crawler with Lake Formation to automate bringing changes from data producer domains to data consumers while maintaining centralized governance.

Solution overview

In a data mesh architecture, you have several producer accounts that own S3 buckets, several consumer accounts who wants to access shared datasets, and a central governance account to manage data shares between producers and consumers. This central governance account doesn’t own any S3 bucket or actual tables.

The following figure shows a simplified data mesh architecture with a single producer account, a centralized governance account, and a single consumer account. The data mesh producer account hosts the encrypted S3 bucket, which is shared with the central governance account. The central governance account registers the S3 bucket with Lake Formation using an AWS Identity and Access Management (IAM) role, which has permissions to the S3 bucket and AWS Key Management Service (AWS KMS). The central account creates the database for storing the dataset schema and shares it with the producer account. The producer account, as the S3 bucket owner, runs a crawler to crawl the buckets registered with the central account using Lake Formation permissions and populates the database. Now the shared database with new datasets are available to share with consumers in the data mesh. The central governance account can now share the database with a consumer admin, who can delegate access to other personas (such as data analysts) in the consumer account for data access.

shows a simplified data mesh architecture with a single producer account, a centralized governance account, and a single consumer account

In the following sections, we provide AWS CloudFormation templates to set up the resources in each account. Then we provide the steps to configure the crawler, manage permissions and sharing, and validate the solution by running queries with Athena.

Prerequisites

Complete the following steps in each account (producer, central governance, and consumer) to update the Data Catalog settings to use Lake Formation permissions to control catalog resources instead of IAM-based access control:

  1. Sign in to the Lake Formation console as admin.
  2. If this is the first time accessing the Lake Formation console, add yourself as the data lake administrator.
    add yourself as the data lake administrator.
  3. In the navigation pane, under Data catalog, choose Settings.
  4. Uncheck Use only IAM access control for new databases.
  5. Uncheck Use only IAM access control for new tables in new databases.
  6. Keep Version 3 as the current cross-account version.
  7. Choose Save.

Set up resources in the central governance account

The CloudFormation template for the central account creates a CentralDataMeshOwner user assigned as Lake Formation admin. The CentralDataMeshOwner user in the central governance account performs the necessary steps to share the central catalogs with the producer and consumer accounts. The CentralDataMeshOwner user also sets up a custom Lake Formation service role to register the S3 data lake location. Complete the following steps:

  1. Log in to the central governance account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. For DataMeshOwnerUserName, keep the default (CentralDataMeshOwner).
  4. For ProducerAWSAccount, enter the producer account ID.
  5. Create the stack.
  6. After the stack launches, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  7. Note down the value of RegisterLocationServiceRole.
  8. Choose the LFUsersPassword value to navigate to the AWS Secrets Manager console.
  9. In the Secret value section, choose Retrieve secret value.
  10. Note down the secret value for the password for IAM user CentralDataMeshOwner.

Set up resources in the producer account

The CloudFormation template for the producer account creates the following resources:

  • IAM user LOBProducerSteward
  • S3 bucket retail-datalake-<producer account id >-<producer region>
  • KMS key used for bucket encryption
  • Required S3 bucket policies to provide access to the central governance account
  • AWS Glue crawler and crawler IAM role with necessary permissions

Complete the following steps:

  1. Log in to the producer account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. For CentralAccountID, enter the central account ID.
  4. For CentralAccountLFServiceRole, enter the value of RegisterLocationServiceRole from CloudFormation noted earlier.
  5. Create the stack.
  6. When the stack is complete, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  7. Note down the AWSGlueServiceRole value.
  8. Choose the ProducerStewardUserCredentials value to navigate to the Secrets Manager console.
  9. In the Secret value section, choose Retrieve secret value.
  10. Note down the secret value for the password for IAM user LOBProducerSteward.
  11. On the Amazon S3 console, check the bucket policies for retail-datalake-<producer account id >-<producer region> and make sure it is shared with the central governance account IAM role.

This is required for registering the bucket with Lake Formation in the central account so that the account can manage the data sharing.

  1. On the AWS KMS console, check that the bucket is encrypted with the customer managed key and the key is shared with the central governance account.

Set up resources in the consumer account

The CloudFormation template for the consumer account creates the following resources:

  • IAM user ConsumerAdminUser assigned to the data lake admin
  • IAM user LFBusinessAnalyst1
  • S3 bucket for Athena output
  • Athena workgroup

Complete the following steps:

  1. Log in to the consumer account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. Create the stack.
  4. When the stack is complete, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  5. Choose the AllConsumerUsersCredentials value to navigate to the Secrets Manager console.
  6. In the Secret value section, choose Retrieve secret value.
  7. Note down the secret value for the password for the IAM user ConsumerAdminUser.

Now that all the accounts have been set up, we set up cross-account sharing on AWS with a central governance account to manage sharing of permissions across producers and consumers.

Configure the central governance account to manage sharing with the producer account

Sign in to the central governance account as CentralDataMeshOwner using the password noted earlier through the central governance account CloudFormation stack. Then complete the following steps:

  1. On Lake Formation console, choose Data lake locations under Register and ingest in the navigation pane.
  2. For Amazon S3 path, provide the path retail-datalake-<producer account id >-<region>.
  3. For IAM role, choose the IAM role created using the CloudFormation stack.

This role has permissions for the accessing the encrypted S3 bucket and its key. Do not choose the role AWSServiceRoleForLakeFormationDataAccess.

  1. Choose Register location.
  2. In the navigation pane, choose Databases.
  3. Choose Create database.
  4. For Database name¸ enter datameshtestdatabase.
  5. Choose Create database.
  6. In the navigation pane, choose Data locations and choose Grant.
  7. Select External account and provide the producer account for AWS account ID, AWS organization ID, or IAM principal ARN.
  8. For Storage location, provide the data lake bucket path.
  9. Select Grantable, then choose Grant.
  10. Choose Data lake permissions, then choose Grant.
  11. Select External accounts and provide the producer account number.
  12. For Databases, choose datameshtestdatabase.
  13. For Database permissions and Grantable permissions, select Create table, Alter, and Describe.
  14. Choose Grant.

Configure the crawler in the producer account to populate the schema

Sign in to producer account as LOBProducerSteward with the password noted earlier through the producer account CloudFormation stack, then complete the following steps:

  1. On the AWS RAM console, accept the pending resource share from the central account.
  2. On the Lake Formation console, choose Databases under Data catalog in the navigation pane.
  3. Choose datameshtestdatabase, and on the Action menu, choose Create resource link.
  4. For Resource link name, enter datameshtestdatabaselink.
  5. Choose Create.
  6. On the AWS Glue console, choose Crawlers in the navigation pane.
  7. Choose the crawler CrossAccountCrawler-<accountid>.
  8. Choose Edit, then choose Configure security settings.
  9. Select Use Lake Formation credentials for crawling S3 data source.
  10. Select In a different account and provide the account ID of the central governance account.
  11. Choose Next.
  12. Choose datameshtestdatabaselink as the database and choose Update.
  13. In the navigation pane, choose Data locations and choose Grant.
  14. Select My account, and choose the crawler IAM role for IAM users and roles.
  15. For Storage locations, choose the bucket retail-datalake-<accountid>-<region>.
  16. For Registered account location, enter the central account ID.
  17. Choose Grant.
    Alternatively, you can also use the AWS CLI to grant data location permission on bucket registered in central account to the crawler role using below command:

    aws lakeformation grant-permissions 
    --principal DataLakePrincipalIdentifier="<Crawler Role ARN>" 
    --permissions "DATA_LOCATION_ACCESS” 
    --resource ‘{ "DataLocation": {"ResourceArn":"<S3 bucket arn>", "CatalogId": "<Central Account id>"}}'

    For using CLI, refer to Installing or updating the latest version of the AWS CLI.

  18. In the navigation pane, choose Data lake permissions.
  19. Choose the crawler IAM role for the principal account.
  20. Choose datameshtestdatabase for the database.
  21. For Database permissions, select Create, Describe, and Alter.
  22. Choose Grant.
  23. Choose the crawler IAM role for the principal account.
  24. Choose datameshtestdatabaselink for the database.
  25. For Resource link permissions, select Describe.
  26. Choose Grant.
  27. Run the crawler.

The following screenshot shows the details after a successful run.

When the crawler is complete, you can validate the table created under the database datameshtestdatabaselink.

This table is owned by the producer account and available in the central governance account under the shared database datameshtestdatabase. Now the data lake admin in the central governance account can share the database and populated table with the consumer account.

Configure the central governance account to manage sharing of read-only access with the consumer account

Sign in to the central governance account as CentralDataMeshOwner with the password noted earlier through the central governance account CloudFormation stack, then complete the following steps:

  1. Grant database permissions to the consumer account.
  2. For Principals, choose external account and provide <consumer accountID>
  3. For Databases, select datameshtestdatabase.
  4. For Database permissions, select Describe.
  5. For Grantable permissions¸ select Describe.
  6. Choose Grant.

  7. Grant table permissions to the consumer account.
  8. For Principals, choose external account and provide <consumer accountID>.
  9. For Databases, select datameshtestdatabase.
  10. For Tables, select retail_datalake_<accountID>_<region>.
  11. For Table permissions, select Select and Describe.
  12. For Grantable permissions¸ select Select and Describe.
  13. Choose Grant.

Configure the consumer account as the consumer account data lake admin

Sign to the consumer account as ConsumerAdminUser with the password noted earlier through the consumer account CloudFormation stack. (Note that in the consumer account Lake Formation configuration, both ConsumerAdminUser and LFBusinessAnalyst1 have the same password.)

  1. On the AWS RAM console, accept the resource share from the central account.
  2. On the Lake Formation console, validate that the shared database datameshtestdatabase is available and create the resource link datameshtestdatabaselink using the shared database.

The following screenshot shows the details after the resource link is created.

  1. On the Lake Formation console, choose Grant.
  2. Choose LFBusinessAnalyst1 for IAM users and roles.
  3. Choose datameshtestdatabase for the database under Named data catalog resources.
  4. Select Describe for Database permissions.
  5. On the Lake Formation console, choose Grant.
  6. Choose LFBusinessAnalyst1 for IAM users and roles.
  7. Choose datameshtestdatabaselink for the database under Named data catalog resources.
  8. Select Describe for Resource link permissions.
  9. On the Lake Formation console, choose Grant.
  10. Choose LFBusinessAnalyst1 for IAM users and roles.
  11. Choose retail_datalake_<accountid>_<region> for the table under Named data catalog resources.
  12. Select Select and Describe for Table permissions.

Run queries in the consumer account

Sign to the consumer account console as LFBusinessAnalyst1 with the password noted earlier through the consumer account CloudFormation stack, then complete the following steps:

  1. On the Athena console, and choose lfconsumer-workgroup as the Athena workgroup.
  2. Run the following query to validate access:
select * from datameshtestdatabaselink.retail_datalake_<accountid>_<region>

We have successfully registered the dataset and created a Data Catalog in the central governance account. We crawled the data lake that was registered with the central governance account using Lake Formation permissions from the producer account and populated the schema. We granted Lake Formation permission on the database and table from the central account to the consumer user and validated consumer user access to the data using Athena.

Clean up

To avoid unwanted charges to your AWS account, delete the AWS resources:

  1. Sign in to the CloudFormation console as the IAM admin used for creating the CloudFormation stack in all three accounts.
  2. Delete the stacks you created.

Conclusion

In this post, we showed how to set up cross-account crawling using a central governance account with the new AWS Glue crawler capability of Lake Formation integration. This capability allows data producers to set up crawling capabilities in their own domain so that changes are seamlessly available to data governance and data consumers. Implementing a data mesh with AWS Glue crawlers, Lake Formation, Athena, and other analytical services provide a well-understood, performant, scalable, and cost-effective solution to integrate, prepare, and serve data.

If you have questions or suggestions, submit them in the comments section.

For more resources, refer to the following:


About the authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building data mesh solutions and sharing them with the community.

Piyali Kamra is a seasoned enterprise architect and a hands-on technologist who believes that building large scale enterprise systems is not an exact science but more like an art, in which tools and technologies must be carefully selected based on the team’s culture , strengths , weaknesses and risks , in tandem with having a futuristic vision as to how you want to shape your product a few years down the road.

Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 2: AWS Glue Studio Visual Editor

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-glue-studio-visual-editor-introducing-native-support-for-apache-hudi-delta-lake-and-apache-iceberg-on-aws-glue-for-apache-spark/

In the first post of this series, we described how AWS Glue for Apache Spark works with Apache Hudi, Linux Foundation Delta Lake, and Apache Iceberg datasets tables using the native support of those data lake formats. This native support simplifies reading and writing your data for these data lake frameworks so you can more easily build and maintain your data lakes in a transactionally consistent manner. This feature removes the need to install a separate connector and reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark jobs.

These data lake frameworks help you store data more efficiently and enable applications to access your data faster. Unlike simpler data file formats such as Apache Parquet, CSV, and JSON, which can store big data, data lake frameworks organize distributed big data files into tabular structures that enable basic constructs of databases on data lakes.

Expanding on the functionality we announced at AWS re:Invent 2022, AWS Glue now natively supports Hudi, Delta Lake and Iceberg through the AWS Glue Studio visual editor. If you prefer authoring AWS Glue for Apache Spark jobs using a visual tool, you can now choose any of these three data lake frameworks as a source or target through a graphical user interface (GUI) without any custom code.

Even without prior experience using Hudi, Delta Lake or Iceberg, you can easily achieve typical use cases. In this post, we demonstrate how to ingest data stored in Hudi using the AWS Glue Studio visual editor.

Example scenario

To demonstrate the visual editor experience, this post introduces the Global Historical Climatology Network Daily (GHCN-D) dataset. The data is publicly accessible through an Amazon Simple Storage Service (Amazon S3) bucket. For more information, see the Registry of Open Data on AWS. You can also learn more in Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight.

The Amazon S3 location s3://noaa-ghcn-pds/csv/by_year/ has all the observations from 1763 to the present organized in CSV files, one file for each year. The following block shows an example of what the records look like:

ID,DATE,ELEMENT,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
AE000041196,20220101,TAVG,204,H,,S,
AEM00041194,20220101,TAVG,211,H,,S,
AEM00041217,20220101,TAVG,209,H,,S,
AEM00041218,20220101,TAVG,207,H,,S,
AE000041196,20220102,TAVG,226,H,,S,
...
AE000041196,20221231,TMAX,243,,,S,
AE000041196,20221231,PRCP,0,D,,S,
AE000041196,20221231,TAVG,202,H,,S,

The records have fields including ID, DATE, ELEMENT, and more. Each combination of ID, DATE, and ELEMENT represents a unique record in this dataset. For example, the record with ID as AE000041196, ELEMENT as TAVG, and DATE as 20220101 is unique.

In this tutorial, we assume that the files are updated with new records every day, and want to store only the latest record per the primary key (ID and ELEMENT) to make the latest snapshot data queryable. One typical approach is to do an INSERT for all the historical data, and calculate the latest records in queries; however, this can introduce additional overhead in all the queries. When you want to analyze only the latest records, it’s better to do an UPSERT (update and insert) based on the primary key and DATE field rather than just an INSERT in order to avoid duplicates and maintain a single updated row of data.

Prerequisites

To continue this tutorial, you need to create the following AWS resources in advance:

Process a Hudi dataset on the AWS Glue Studio visual editor

Let’s author an AWS Glue job to read daily records in 2022, and write the latest snapshot into the Hudi table on your S3 bucket using UPSERT. Complete following steps:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source and Target, choose Amazon S3, then choose Create.

A new visual job configuration appears. The next step is to configure the data source to read an example dataset:

  1. Under Visual, choose Data source – S3 bucket.
  2. Under Node properties, for S3 source type, select S3 location.
  3. For S3 URL, enter s3://noaa-ghcn-pds/csv/by_year/2022.csv.

The data source is configured.

data-source

The next step is to configure the data target to ingest data in Apache Hudi on your S3 bucket:

  1. Choose Data target – S3 bucket.
  2. Under Data target properties- S3, for Format, choose Apache Hudi.
  3. For Hudi Table Name, enter ghcn.
  4. For Hudi Storage Type, choose Copy on write.
  5. For Hudi Write Operation, choose Upsert.
  6. For Hudi Record Key Fields, choose ID.
  7. For Hudi Precombine Key Field, choose DATE.
  8. For Compression Type, choose GZIP.
  9. For S3 Target location, enter s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_native/ghcn/. (Provide your S3 bucket name and prefix.)

To make it easy to discover the sample data, and also make it queryable from Athena, configure the job to create a table definition on the AWS Glue Data Catalog:

  1. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  2. For Database, choose hudi_native.
  3. For Table name, enter ghcn.
  4. For Partition keys – optional, choose ELEMENT.

Now your data integration job is authored in the visual editor completely. Let’s add one remaining setting about the IAM role, then run the job:

  1. Under Job details, for IAM Role, choose your IAM role.
  2. Choose Save, then choose Run.

data-target

  1. Navigate to the Runs tab to track the job progress and wait for it to complete.

job-run

Query the table with Athena

Now that the job has successfully created the Hudi table, you can query the table through different engines, including Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum, in addition to AWS Glue for Apache Spark.

To query through Athena, complete the following steps:

  1. On the Athena console, open the query editor.
  2. In the query editor, enter the following SQL and choose Run:
SELECT * FROM "hudi_native"."ghcn" limit 10;

The following screenshot shows the query result.
athena-query1

Let’s dive deep into the table to understand how the data is ingested and focus on the records with ID=’AE000041196′.

  1. Run the following query to focus on the very specific example records with ID='AE000041196':
SELECT * FROM "hudi_native"."ghcn" WHERE ID='AE000041196';

The following screenshot shows the query result.
athena-query2

The original source file 2022.csv has historical records for record ID='USW00012894' from 20220101 to 20221231, however the query result shows only four records, one record per ELEMENT at the latest snapshot of the day 20221230 or 20221231. Because we used the UPSERT write option when writing data, we configured the ID field as a Hudi record key field, the DATE field as a Hudi precombine field, and the ELEMENT field as partition key field. When two records have the same key value, Hudi picks the one with the largest value for the precombine field. When the job ingested data, it compared all the values in the DATE field for each pair of ID and ELEMENT, and then picked the record with the largest value in the DATE field.

According to the preceding result, we were able to ingest the latest snapshot from all the 2022 data. Now let’s do an UPSERT of the new 2023 data to overwrite the records on the target Hudi table.

  1. Go back to AWS Glue Studio console, modify the source S3 location to s3://noaa-ghcn-pds/csv/by_year/2023.csv, then save and run the job.

upsert-data-source

  1. Run the same Athena query from the Athena console.

athena-query3
Now you see that the four records have been updated with the new records in 2023.

If you have further future records, this approach works well to upsert new records based on the Hudi record key and Hudi precombine key.

Clean up

Now to the final step, cleaning up the resources:

  1. Delete the AWS Glue database hudi_native.
  2. Delete the AWS Glue table ghcn.
  3. Delete the S3 objects under s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_native/ghcn2022/.

Conclusion

This post demonstrated how to process Hudi datasets using the AWS Glue Studio visual editor. The AWS Glue Studio visual editor enables you to author jobs while taking advantage of data lake formats and without needing expertise in them. If you have comments or feedback, please feel free to leave them in the comments.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Scott Long is a Front End Engineer on the AWS Glue team. He is responsible for implementing new features in AWS Glue Studio. In his spare time, he enjoys socializing with friends and participating in various outdoor activities.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

How Infomedia built a serverless data pipeline with change data capture using AWS Glue and Apache Hudi

Post Syndicated from Gowtham Dandu original https://aws.amazon.com/blogs/big-data/how-infomedia-built-a-serverless-data-pipeline-with-change-data-capture-using-aws-glue-and-apache-hudi/

This is a guest post co-written with Gowtham Dandu from Infomedia.

Infomedia Ltd (ASX:IFM) is a leading global provider of DaaS and SaaS solutions that empowers the data-driven automotive ecosystem. Infomedia’s solutions help OEMs, NSCs, dealerships and 3rd party partners manage the vehicle and customer lifecycle. They are used by over 250,000 industry professionals, across 50 OEM brands and in 186 countries to create a convenient customer journey, drive dealer efficiencies and grow sales.

In this post, we share how Infomedia built a serverless data pipeline with change data capture (CDC) using AWS Glue and Apache Hudi.

Infomedia was looking to build a cloud-based data platform to take advantage of highly scalable data storage with flexible and cloud-native processing tools to ingest, transform, and deliver datasets to their SaaS applications. The team wanted to set up a serverless architecture with scale-out capabilities that would allow them to optimize time, cost, and performance of the data pipelines and eliminate most of the infrastructure management.

To serve data to their end-users, the team wanted to develop an API interface to retrieve various product attributes on demand. Performance and scalability of both the data pipeline and API endpoint were key success criteria. The data pipeline needed to have sufficient performance to allow for fast turnaround in the event that data issues needed to be corrected. Finally, the API endpoint performance was important for end-user experience and customer satisfaction. When designing the data processing pipeline for the attribute API, the Infomedia team wanted to use a flexible and open-source solution for processing data workloads with minimal operational overhead.

They saw an opportunity to use AWS Glue, which offers a popular open-source big data processing framework, and Apache Spark, in a serverless environment for end-to-end pipeline development and deployment.

Solution overview

The solution involved ingesting data from various third-party sources in different formats, processing to create a semantic layer, and then exposing the processed dataset as a REST API to end-users. The API retrieves data at runtime from an Amazon Aurora PostgreSQL-Compatible Edition database for end-user consumption. To populate the database, the Infomedia team developed a data pipeline using Amazon Simple Storage Service (Amazon S3) for data storage, AWS Glue for data transformations, and Apache Hudi for CDC and record-level updates. They wanted to develop a simple incremental data processing pipeline without having to update the entire database each time the pipeline ran. The Apache Hudi framework allowed the Infomedia team to maintain a golden reference dataset and capture changes so that the downstream database could be incrementally updated in a short timeframe.

To implement this modern data processing solution, Infomedia’s team chose a layered architecture with the following steps:

  1. The raw data originates from various third-party sources and is a collection of flat files with a fixed width column structure. The raw input data is stored in Amazon S3 in JSON format (called the bronze dataset layer).
  2. The raw data is converted to an optimized Parquet format using AWS Glue. The Parquet data is stored in a separate Amazon S3 location and serves as the staging area during the CDC process (called the silver dataset layer). The Parquet format results in improved query performance and cost savings for downstream processing.
  3. AWS Glue reads the Parquet file from the staging area and updates Apache Hudi tables stored in Amazon S3 (the golden dataset layer) as part of incremental data processing. This process helps create mutable datasets on Amazon S3 to store the versioned and latest set of records.
  4. Finally, AWS Glue is used to populate Amazon Aurora PostgreSQL-Compatible Edition with the latest version of the records. This dataset is used to serve the API endpoint. The API itself is a Spring Java application deployed as a Docker container in an Amazon Elastic Container Service (Amazon ECS) AWS Fargate environment.

The following diagram illustrates this architecture.

arch diag

AWS Glue and Apache Hudi overview

AWS Glue is a serverless data integration service that makes it easy to prepare and process data at scale from a wide variety of data sources. With AWS Glue, you can ingest data from multiple data sources, extract and infer schema, populate metadata in a centralized data catalog, and prepare and transform data for analytics and machine learning. AWS Glue has a pay-as-you-go model with no upfront costs, and you only pay for resources that you consume.

Apache Hudi is an open-source data management framework used to simplify incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. It allows you to comply with data privacy laws, manage CDC operations, reinstate late-arriving data, and roll back to a particular point in time. You can use AWS Glue to build a serverless Apache Spark-based data pipeline and take advantage of the AWS Glue native connector for Apache Hudi at no cost to manage CDC operations with record-level insert, updates, and deletes.

Solution benefits

Since the start of Infomedia’s journey with AWS Glue, the Infomedia team has experienced several benefits over the self-managed extract, transform, and load (ETL) tooling. With the horizontal scaling of AWS Glue, they were able to seamlessly scale the compute capacity of their data pipeline workloads by a factor of 5. This allowed them to increase both the volume of records and the number of datasets they could process for downstream consumption. They were also able to take advantage of AWS Glue built-in optimizations, such as pre-filtering using pushdown predicates, which allowed the team to save valuable engineering time tuning the performance of data processing jobs.

In addition, Apache Spark-based AWS Glue enabled developers to author jobs using concise Spark SQL and dataset APIs. This allowed for rapid upskilling of developers who are already familiar with database programming. Because developers are working with higher-level constructs across entire datasets, they spend less time solving for low-level technical implementation details.

Also, the AWS Glue platform has been cost-effective when compared against running self-managed Apache Spark infrastructure. The team did an initial analysis that showed an estimated savings of 70% over running a dedicated Spark EC2 infrastructure for their workload. Furthermore, the AWS Glue Studio job monitoring dashboard provides the Infomedia team with detailed job-level visibility that makes it easy to get a summary of the job runs and understand data processing costs.

Conclusion and next steps

Infomedia will continue to modernize their complex data pipelines using the AWS Glue platform and other AWS Analytics services. Through integration with services such as AWS Lake Formation and the AWS Glue Data Catalog, the Infomedia team plans to maintain reference primary datasets and democratize access to high-value datasets, allowing for further innovation.

If you would like to learn more, please visit AWS Glue and AWS Lake Formation to get started on your data integration journey.


About the Authors

Gowtham Dandu is an Engineering Lead at Infomedia Ltd with a passion for building efficient and effective solutions on the cloud, especially involving data, APIs, and modern SaaS applications. He specializes in building microservices and data platforms that are cost-effective and highly scalable.

Praveen Kumar is a Specialist Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-native services. His areas of interests are serverless technology, streaming applications, and modern cloud data warehouses.

Simplify data loading into Type 2 slowly changing dimensions in Amazon Redshift

Post Syndicated from Vaidy Kalpathy original https://aws.amazon.com/blogs/big-data/simplify-data-loading-into-type-2-slowly-changing-dimensions-in-amazon-redshift/

Thousands of customers rely on Amazon Redshift to build data warehouses to accelerate time to insights with fast, simple, and secure analytics at scale and analyze data from terabytes to petabytes by running complex analytical queries. Organizations create data marts, which are subsets of the data warehouse and usually oriented for gaining analytical insights specific to a business unit or team. The star schema is a popular data model for building data marts.

In this post, we show how to simplify data loading into a Type 2 slowly changing dimension in Amazon Redshift.

Star schema and slowly changing dimension overview

A star schema is the simplest type of dimensional model, in which the center of the star can have one fact table and a number of associated dimension tables. A dimension is a structure that captures reference data along with associated hierarchies, while a fact table captures different values and metrics that can be aggregated by dimensions. Dimensions provide answers to exploratory business questions by allowing end-users to slice and dice data in a variety of ways using familiar SQL commands.

Whereas operational source systems contain only the latest version of master data, the star schema enables time travel queries to reproduce dimension attribute values on past dates when the fact transaction or event actually happened. The star schema data model allows analytical users to query historical data tying metrics to corresponding dimensional attribute values over time. Time travel is possible because dimension tables contain the exact version of the associated attributes at different time ranges. Relative to the metrics data that keeps changing on a daily or even hourly basis, the dimension attributes change less frequently. Therefore, dimensions in a star schema that keeps track of changes over time are referred to as slowly changing dimensions (SCDs).

Data loading is one of the key aspects of maintaining a data warehouse. In a star schema data model, the central fact table is dependent on the surrounding dimension tables. This is captured in the form of primary key-foreign key relationships, where the dimension table primary keys are referred by foreign keys in the fact table. In the case of Amazon Redshift, uniqueness, primary key, and foreign key constraints are not enforced. However, declaring them will help the optimizer arrive at optimal query plans, provided that the data loading processes enforce their integrity. As part of data loading, the dimension tables, including SCD tables, get loaded first, followed by the fact tables.

SCD population challenge

Populating an SCD dimension table involves merging data from multiple source tables, which are usually normalized. SCD tables contain a pair of date columns (effective and expiry dates) that represent the record’s validity date range. Changes are inserted as new active records effective from the date of data loading, while simultaneously expiring the current active record on a previous day. During each data load, incoming change records are matched against existing active records, comparing each attribute value to determine whether existing records have changed or were deleted or are new records coming in.

In this post, we demonstrate how to simplify data loading into a dimension table with the following methods:

  • Using Amazon Simple Storage Service (Amazon S3) to host the initial and incremental data files from source system tables
  • Accessing S3 objects using Amazon Redshift Spectrum to carry out data processing to load native tables within Amazon Redshift
  • Creating views with window functions to replicate the source system version of each table within Amazon Redshift
  • Joining source table views to project attributes matching with dimension table schema
  • Applying incremental data to the dimension table, bringing it up to date with source-side changes

Solution overview

In a real-world scenario, records from source system tables are ingested on a periodic basis to an Amazon S3 location before being loaded into star schema tables in Amazon Redshift.

For this demonstration, data from two source tables, customer_master and customer_address, are combined to populate the target dimension table dim_customer, which is the customer dimension table.

The source tables customer_master and customer_address share the same primary key, customer_id, and will be joined on the same to fetch one record per customer_id along with attributes from both tables. row_audit_ts contains the latest timestamp at which the particular source record was inserted or last updated. This column helps identify the change records since the last data extraction.

rec_source_status is an optional column that indicates if the corresponding source record was inserted, updated, or deleted. This is applicable in cases where the source system itself provides the changes and populates rec_source_status appropriately.

The following figure provides the schema of the source and target tables.

Let’s look closer at the schema of the target table, dim_customer. It contains different categories of columns:

  • Keys – It contains two types of keys:
    • customer_sk is the primary key of this table. It is also called the surrogate key and has a unique value that is monotonically increasing.
    • customer_id is the source primary key and provides a reference back to the source system record.
  • SCD2 metadatarec_eff_dt and rec_exp_dt indicate the state of the record. These two columns together define the validity of the record. The value in rec_exp_dt will be set as ‘9999-12-31’ for presently active records.
  • Attributes – Includes first_name, last_name, employer_name, email_id, city, and country.

Data loading into a SCD table involves a first-time bulk data loading, referred to as the initial data load. This is followed by continuous or regular data loading, referred to as an incremental data load, to keep the records up to date with changes in the source tables.

To demonstrate the solution, we walk through the following steps for initial data load (1–7) and incremental data load (8–12):

  1. Land the source data files in an Amazon S3 location, using one subfolder per source table.
  2. Use an AWS Glue crawler to parse the data files and register tables in the AWS Glue Data Catalog.
  3. Create an external schema in Amazon Redshift to point to the AWS Glue database containing these tables.
  4. In Amazon Redshift, create one view per source table to fetch the latest version of the record for each primary key (customer_id) value.
  5. Create the dim_customer table in Amazon Redshift, which contains attributes from all relevant source tables.
  6. Create a view in Amazon Redshift joining the source table views from Step 4 to project the attributes modeled in the dimension table.
  7. Populate the initial data from the view created in Step 6 into the dim_customer table, generating customer_sk.
  8. Land the incremental data files for each source table in their respective Amazon S3 location.
  9. In Amazon Redshift, create a temporary table to accommodate the change-only records.
  10. Join the view from Step 6 and dim_customer and identify change records comparing the combined hash value of attributes. Populate the change records into the temporary table with an I, U, or D indicator.
  11. Update rec_exp_dt in dim_customer for all U and D records from the temporary table.
  12. Insert records into dim_customer, querying all I and U records from the temporary table.

Prerequisites

Before you get started, make sure you meet the following prerequisites:

Land data from source tables

Create separate subfolders for each source table in an S3 bucket and place the initial data files within the respective subfolder. In the following image, the initial data files for customer_master and customer_address are made available within two different subfolders. To try out the solution, you can use customer_master_with_ts.csv and customer_address_with_ts.csv as initial data files.

It’s important to include an audit timestamp (row_audit_ts) column that indicates when each record was inserted or last updated. As part of incremental data loading, rows with the same primary key value (customer_id) can arrive more than once. The row_audit_ts column helps identify the latest version of such records for a given customer_id to be used for further processing.

Register source tables in the AWS Glue Data Catalog

We use an AWS Glue crawler to infer metadata from delimited data files like the CSV files used in this post. For instructions on getting started with an AWS Glue crawler, refer to Tutorial: Adding an AWS Glue crawler.

Create an AWS Glue crawler and point it to the Amazon S3 location that contains the source table subfolders, within which the associated data files are placed. When you’re creating the AWS Glue crawler, create a new database named rs-dimension-blog. The following screenshots show the AWS Glue crawler configuration chosen for our data files.

Note that for the Set output and scheduling section, the advanced options are left unchanged.

Running this crawler should create the following tables within the rs-dimension-blog database:

  • customer_address
  • customer_master

Create schemas in Amazon Redshift

First, create an AWS Identity and Access Management (IAM) role named rs-dim-blog-spectrum-role. For instructions, refer to Create an IAM role for Amazon Redshift.

The IAM role has Amazon Redshift as the trusted entity, and the permissions policy includes AmazonS3ReadOnlyAccess and AWSGlueConsoleFullAccess, because we’re using the AWS Glue Data Catalog. Then associate the IAM role with the Amazon Redshift cluster or endpoint.

Instead, you can also set the IAM role as the default for your Amazon Redshift cluster or endpoint. If you do so, in the following create external schema command, pass the iam_role parameter as iam_role default.

Now, open Amazon Redshift Query Editor V2 and create an external schema passing the newly created IAM role and specifying the database as rs-dimension-blog. The database name rs-dimension-blog is the one created in the Data Catalog as part of configuring the crawler in the preceding section. See the following code:

create external schema spectrum_dim_blog 
from data catalog 
database 'rs-dimension-blog' 
iam_role 'arn:aws:iam::<accountid>:role/rs-dim-blog-spectrum-role';

Check if the tables registered in the Data Catalog in the preceding section are visible from within Amazon Redshift:

select * 
from spectrum_dim_blog.customer_master 
limit 10;

select * 
from spectrum_dim_blog.customer_address 
limit 10;

Each of these queries will return 10 rows from the respective Data Catalog tables.

Create another schema in Amazon Redshift to host the table, dim_customer:

create schema rs_dim_blog;

Create views to fetch the latest records from each source table

Create a view for the customer_master table, naming it vw_cust_mstr_latest:

create view rs_dim_blog.vw_cust_mstr_latest as with rows_numbered as (
  select 
    customer_id, 
    first_name, 
    last_name, 
    employer_name, 
    row_audit_ts, 
    row_number() over(
      partition by customer_id 
      order by 
        row_audit_ts desc
    ) as rnum 
  from 
    spectrum_dim_blog.customer_master
) 
select 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  row_audit_ts, 
  rnum 
from 
  rows_numbered 
where 
  rnum = 1 with no schema binding;

The preceding query uses row_number, which is a window function provided by Amazon Redshift. Using window functions enables you to create analytic business queries more efficiently. Window functions operate on a partition of a result set, and return a value for every row in that window. The row_number window function determines the ordinal number of the current row within a group of rows, counting from 1, based on the ORDER BY expression in the OVER clause. By including the PARTITION BY clause as customer_id, groups are created for each value of customer_id and ordinal numbers are reset for each group.

Create a view for the customer_address table, naming it vw_cust_addr_latest:

create view rs_dim_blog.vw_cust_addr_latest as with rows_numbered as (
  select 
    customer_id, 
    email_id, 
    city, 
    country, 
    row_audit_ts, 
    row_number() over(
      partition by customer_id 
      order by 
        row_audit_ts desc
    ) as rnum 
  from 
    spectrum_dim_blog.customer_address
) 
select 
  customer_id, 
  email_id, 
  city, 
  country, 
  row_audit_ts, 
  rnum 
from 
  rows_numbered 
where 
  rnum = 1 with no schema binding;

Both view definitions use the row_number window function of Amazon Redshift, ordering the records by descending order of the row_audit_ts column (the audit timestamp column). The condition rnum=1 fetches the latest record for each customer_id value.

Create the dim_customer table in Amazon Redshift

Create dim_customer as an internal table in Amazon Redshift within the rs_dim_blog schema. The dimension table includes the column customer_sk, that acts as the surrogate key column and enables us to capture a time-sensitive version of each customer record. The validity period for each record is defined by the columns rec_eff_dt and rec_exp_dt, representing record effective date and record expiry date, respectively. See the following code:

create table rs_dim_blog.dim_customer (
  customer_sk bigint, 
  customer_id bigint, 
  first_name varchar(100), 
  last_name varchar(100), 
  employer_name varchar(100), 
  email_id varchar(100), 
  city varchar(100), 
  country varchar(100), 
  rec_eff_dt date, 
  rec_exp_dt date
) diststyle auto;

Create a view to consolidate the latest version of source records

Create the view vw_dim_customer_src, which consolidates the latest records from both source tables using left outer join, keeping them ready to be populated into the Amazon Redshift dimension table. This view fetches data from the latest views defined in the section “Create views to fetch the latest records from each source table”:

create view rs_dim_blog.vw_dim_customer_src as 
select 
  m.customer_id, 
  m.first_name, 
  m.last_name, 
  m.employer_name, 
  a.email_id, 
  a.city, 
  a.country 
from 
  rs_dim_blog.vw_cust_mstr_latest as m 
  left join rs_dim_blog.vw_cust_addr_latest as a on m.customer_id = a.customer_id 
order by 
  m.customer_id with no schema binding;

At this point, this view fetches the initial data for loading into the dim_customer table that we are about to create. In your use-case, use a similar approach to create and join the required source table views to populate your target dimension table.

Populate initial data into dim_customer

Populate the initial data into the dim_customer table by querying the view vw_dim_customer_src. Because this is the initial data load, running row numbers generated by the row_number window function will suffice to populate a unique value in the customer_sk column starting from 1:

insert into rs_dim_blog.dim_customer 
select 
  row_number() over() as customer_sk, 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  cast('2022-07-01' as date) rec_eff_dt, 
  cast('9999-12-31' as date) rec_exp_dt 
from 
  rs_dim_blog.vw_dim_customer_src;

In this query, we have specified ’2022-07-01’ as the value in rec_eff_dt for all initial data records. For your use-case, you can modify this date value as appropriate to your situation.

The preceding steps complete the initial data loading into the dim_customer table. In the next steps, we proceed with populating incremental data.

Land ongoing change data files in Amazon S3

After the initial load, the source systems provide data files on an ongoing basis, either containing only new and change records or a full extract containing all records for a particular table.

You can use the sample files customer_master_with_ts_incr.csv and customer_address_with_ts_incr.csv, which contain changed as well as new records. These incremental files need to be placed in the same location in Amazon S3 where the initial data files were placed. Please see section “Land data from source tables”. This will result in the corresponding Redshift Spectrum tables automatically reading the additional rows.

If you used the sample file for customer_master, after adding the incremental files, the following query shows the initial as well as incremental records:

select 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  row_audit_ts 
from 
  spectrum_dim_blog.customer_master 
order by 
  customer_id;

In case of full extracts, we can identify deletes occurring in the source system tables by comparing the previous and current versions and looking for missing records. In case of change-only extracts where the rec_source_status column is present, its value will help us identify deleted records. In either case, land the ongoing change data files in the respective Amazon S3 locations.

For this example, we have uploaded the incremental data for the customer_master and customer_address source tables with a few customer_id records receiving updates and a few new records being added.

Create a temporary table to capture change records

Create the temporary table temp_dim_customer to store all changes that need to be applied to the target dim_customer table:

create temp table temp_dim_customer (
  customer_sk bigint, 
  customer_id bigint, 
  first_name varchar(100), 
  last_name varchar(100), 
  employer_name varchar(100), 
  email_id varchar(100), 
  city varchar(100), 
  country varchar(100), 
  rec_eff_dt date, 
  rec_exp_dt date, 
  iud_operation character(1)
);

Populate the temporary table with new and changed records

This is a multi-step process that can be combined into a single complex SQL. Complete the following steps:

  1. Fetch the latest version of all customer attributes by querying the view vw_dim_customer_src:
select 
  customer_id, 
  sha2(
    coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
  ) as hash_value, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  current_date rec_eff_dt, 
  cast('9999-12-31' as date) rec_exp_dt 
from 
  rs_dim_blog.vw_dim_customer_src;

Amazon Redshift offers hashing functions such as sha2, which converts a variable length string input into a fixed length character output. The output string is a text representation of the hexadecimal value of the checksum with the specified number of bits. In this case, we pass a concatenated set of customer attributes whose change we want to track, specifying the number of bits as 512. We’ll use the output of the hash function to determine if any of the attributes have undergone a change. This dataset will be called newver (new version).

Because we landed the ongoing change data in the same location as the initial data files, the records retrieved from the preceding query (in newver) include all records, even the unchanged ones. But because of the definition of the view vw_dim_customer_src, we get only one record per customerid, which is its latest version based on row_audit_ts.

  1. In a similar manner, retrieve the latest version of all customer records from dim_customer, which are identified by rec_exp_dt=‘9999-12-31’. While doing so, also retrieve the sha2 value of all customer attributes available in dim_customer:
select 
  customer_id, 
  sha2(
    coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
  ) as hash_value, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country 
from 
  rs_dim_blog.dim_customer 
where 
  rec_exp_dt = '9999-12-31';

This dataset will be called oldver (old or existing version).

  1. Identify the current maximum surrogate key value from the dim_customer table:
select 
  max(customer_sk) as maxval 
from 
  rs_dim_blog.dim_customer;

This value (maxval) will be added to the row_number before being used as the customer_sk value for the change records that need to be inserted.

  1. Perform a full outer join of the old version of records (oldver) and the new version (newver) of records on the customer_id column. Then compare the old and new hash values generated by the sha2 function to determine if the change record is an insert, update, or delete:
case when oldver.customer_id is null then 'I'
when newver.customer_id is null then 'D'
when oldver.hash_value != newver.hash_value then 'U'
else 'N' end as iud_op

We tag the records as follows:

  • If the customer_id is non-existent in the oldver dataset (oldver.customer_id is null), it’s tagged as an insert (‘I').
  • Otherwise, if the customer_id is non-existent in the newver dataset (newver.customer_id is null), it’s tagged as a delete (‘D').
  • Otherwise, if the old hash_value and new hash_value are different, these records represent an update (‘U').
  • Otherwise, it indicates that the record has not undergone any change and therefore can be ignored or marked as not-to-be-processed (‘N').

Make sure to modify the preceding logic if the source extract contains rec_source_status to identify deleted records.

Although sha2 output maps a possibly infinite set of input strings to a finite set of output strings, the chances of collision of hash values for the original row values and changed row values are very unlikely. Instead of individually comparing each column value before and after, we compare the hash values generated by sha2 to conclude if there has been a change in any of the attributes of the customer record. For your use-case, we recommend you choose a hash function that works for your data conditions after adequate testing. Instead, you can compare individual column values if none of the hash functions satisfactorily meet your expectations.

  1. Combining the outputs from the preceding steps, let’s create the INSERT statement that captures only change records to populate the temporary table:
insert into temp_dim_customer (
  customer_sk, customer_id, first_name, 
  last_name, employer_name, email_id, 
  city, country, rec_eff_dt, rec_exp_dt, 
  iud_operation
) with newver as (
  select 
    customer_id, 
    sha2(
      coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
    ) as hash_value, 
    first_name, 
    last_name, 
    employer_name, 
    email_id, 
    city, 
    country, 
    current_date rec_eff_dt, 
    cast('9999-12-31' as date) rec_exp_dt 
  from 
    rs_dim_blog.vw_dim_customer_src
), 
oldver as (
  select 
    customer_id, 
    sha2(
      coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
    ) as hash_value, 
    first_name, 
    last_name, 
    employer_name, 
    email_id, 
    city, 
    country 
  from 
    rs_dim_blog.dim_customer 
  where 
    rec_exp_dt = '9999-12-31'
), 
maxsk as (
  select 
    max(customer_sk) as maxval 
  from 
    rs_dim_blog.dim_customer
), 
allrecs as (
  select 
    coalesce(oldver.customer_id, newver.customer_id) as customer_id, 
    case when oldver.customer_id is null then 'I' when newver.customer_id is null then 'D' when oldver.hash_value != newver.hash_value then 'U' else 'N' end as iud_op, 
    newver.first_name, 
    newver.last_name, 
    newver.employer_name, 
    newver.email_id, 
    newver.city, 
    newver.country, 
    newver.rec_eff_dt, 
    newver.rec_exp_dt 
  from 
    oldver full 
    outer join newver on oldver.customer_id = newver.customer_id
) 
select 
  (maxval + (row_number() over())) as customer_sk, 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  rec_eff_dt, 
  rec_exp_dt, 
  iud_op 
from 
  allrecs, 
  maxsk 
where 
  iud_op != 'N';

Expire updated customer records

With the temp_dim_customer table now containing only the change records (either ‘I’, ‘U’, or ‘D’), the same can be applied on the target dim_customer table.

Let’s first fetch all records with values ‘U’ or ‘D’ in the iud_op column. These are records that have either been deleted or updated in the source system. Because dim_customer is a slowly changing dimension, it needs to reflect the validity period of each customer record. In this case, we expire the presently active recorts that have been updated or deleted. We expire these records as of yesterday (by setting rec_exp_dt=current_date-1) matching on the customer_id column:

update 
  rs_dim_blog.dim_customer 
set 
  rec_exp_dt = current_date - 1 
where 
  customer_id in (
    select 
      customer_id 
    from 
      temp_dim_customer as t 
    where 
      iud_operation in ('U', 'D')
  ) 
  and rec_exp_dt = '9999-12-31';

Insert new and changed records

As the last step, we need to insert the newer version of updated records along with all first-time inserts. These are indicated by ‘U’ and ‘I’, respectively, in the iud_op column in the temp_dim_customer table:

insert into rs_dim_blog.dim_customer (
  customer_sk, customer_id, first_name, 
  last_name, employer_name, email_id, 
  city, country, rec_eff_dt, rec_exp_dt
) 
select 
  customer_sk, 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  rec_eff_dt, 
  rec_exp_dt 
from 
  temp_dim_customer 
where 
  iud_operation in ('I', 'U');

Depending on the SQL client setting, you might want to run a commit transaction; command to verify that the preceding changes are persisted successfully in Amazon Redshift.

Check the final output

You can run the following query and see that the dim_customer table now contains both the initial data records plus the incremental data records, capturing multiple versions for those customer_id values that got changed as part of incremental data loading. The output also indicates that each record has been populated with appropriate values in rec_eff_dt and rec_exp_dt corresponding to the record validity period.

select 
  * 
from 
  rs_dim_blog.dim_customer 
order by 
  customer_id, 
  customer_sk;

For the sample data files provided in this article, the preceding query returns the following records. If you’re using the sample data files provided in this post, note that the values in customer_sk may not match with what is shown in the following table.

In this post, we only show the important SQL statements; the complete SQL code is available in load_scd2_sample_dim_customer.sql.

Clean up

If you no longer need the resources you created, you can delete them to prevent incurring additional charges.

Conclusion

In this post, you learned how to simplify data loading into Type-2 SCD tables in Amazon Redshift, covering both initial data loading and incremental data loading. The approach deals with multiple source tables populating a target dimension table, capturing the latest version of source records as of each run.

Refer to Amazon Redshift data loading best practices for further materials and additional best practices, and see Updating and inserting new data for instructions to implement updates and inserts.


About the Author

Vaidy Kalpathy is a Senior Data Lab Solution Architect at AWS, where he helps customers modernize their data platform and defines end to end data strategy including data ingestion, transformation, security, visualization. He is passionate about working backwards from business use cases, creating scalable and custom fit architectures to help customers innovate using data analytics services on AWS.

Build an end-to-end change data capture with Amazon MSK Connect and AWS Glue Schema Registry

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/

The value of data is time sensitive. Real-time processing makes data-driven decisions accurate and actionable in seconds or minutes instead of hours or days. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations. The following are a few benefits of CDC:

  • It eliminates the need for bulk load updating and inconvenient batch windows by enabling incremental loading or real-time streaming of data changes into your target repository.
  • It ensures that data in multiple systems stays in sync. This is especially important if you’re making time-sensitive decisions in a high-velocity data environment.

Kafka Connect is an open-source component of Apache Kafka that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect provides a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by Kafka connectors (source) that produce data to Kafka. Connectors (sink) that consume data from Kafka receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide capabilities like maintaining a database table schema in a data catalog.

The post demonstrates how to build an end-to-end CDC using Amazon MSK Connect, an AWS managed service to deploy and run Kafka Connect applications and AWS Glue Schema Registry, which allows you to centrally discover, control, and evolve data stream schemas.

Solution overview

On the producer side, for this example we choose a MySQL-compatible Amazon Aurora database as the data source, and we have a Debezium MySQL connector to perform CDC. The Debezium connector continuously monitors the databases and pushes row-level changes to a Kafka topic. The connector fetches the schema from the database to serialize the records into a binary form. If the schema doesn’t already exist in the registry, the schema will be registered. If the schema exists but the serializer is using a new version, the schema registry checks the compatibility mode of the schema before updating the schema. In this solution, we use backward compatibility mode. The schema registry returns an error if a new version of the schema is not backward compatible, and we can configure Kafka Connect to send incompatible messages to the dead-letter queue.

On the consumer side, we use an Amazon Simple Storage Service (Amazon S3) sink connector to deserialize the record and store changes to Amazon S3. We build and deploy the Debezium connector and the Amazon S3 sink using MSK Connect.

Example schema

For this post, we use the following schema as the first version of the table:

{ 
    “Database Name”: “sampledatabase”, 
    “Table Name”: “movies”, 
    “Fields”: [
         { 
            “name”: “movie_id”, 
            “type”: “INTEGER” 
         },
         { 
            “name”: “title”, 
            “type”: “STRING” 
         },
         { 
            “name”: “release_year”,
            “type”: “INTEGER” 
         }
     ] 
}

Prerequisites

Before configuring the MSK producer and consumer connectors, we need to first set up a data source, MSK cluster, and new schema registry. We provide an AWS CloudFormation template to generate the supporting resources needed for the solution:

  • A MySQL-compatible Aurora database as the data source. To perform CDC, we turn on binary logging in the DB cluster parameter group.
  • An MSK cluster. To simplify the network connection, we use the same VPC for the Aurora database and the MSK cluster.
  • Two schema registries to handle schemas for message key and message value.
  • One S3 bucket as the data sink.
  • MSK Connect plugins and worker configuration needed for this demo.
  • One Amazon Elastic Compute Cloud (Amazon EC2) instance to run database commands.

To set up resources in your AWS account, complete the following steps in an AWS Region that supports Amazon MSK, MSK Connect, and the AWS Glue Schema Registry:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter suitable name.
  4. For Database Password, enter the password you want for the database user.
  5. Keep other values as default.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Custom plugin for the source and destination connector

A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK will install the plugin on the workers of the MSK Connect cluster where the connector is running. As part of this demo, for the source connector we use open-source Debezium MySQL connector JARs, and for the destination connector we use the Confluent community licensed Amazon S3 sink connector JARs. Both the plugins are also added with libraries for Avro Serializers and Deserializers of the AWS Glue Schema Registry. These custom plugins are already created as part of the CloudFormation template deployed in the previous step.

Use the AWS Glue Schema Registry with the Debezium connector on MSK Connect as the MSK producer

We first deploy the source connector using the Debezium MySQL plugin to stream data from an Amazon Aurora MySQL-Compatible Edition database to Amazon MSK. Complete the following steps:

  1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
  2. Choose Create connector.
  3. Choose Use existing custom plugin and then pick the custom plugin with name starting msk-blog-debezium-source-plugin.
  4. Choose Next.
  5. Enter a suitable name like debezium-mysql-connector and an optional description.
  6. For Apache Kafka cluster, choose MSK cluster and choose the cluster created by the CloudFormation template.
  7. In Connector configuration, delete the default values and use the following configuration key-value pairs and with the appropriate values:
    • name – The name used for the connector.
    • database.hostsname – The CloudFormation output for Database Endpoint.
    • database.user and database.password – The parameters passed in the CloudFormation template.
    • database.history.kafka.bootstrap.servers – The CloudFormation output for Kafka Bootstrap.
    • key.converter.region and value.converter.region – Your Region.
name=<Connector-name>
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=<DBHOST>
database.port=3306
database.user=<DBUSER>
database.password=<DBPASSWORD>
database.server.id=42
database.server.name=db1
table.whitelist=sampledatabase.movies
database.history.kafka.bootstrap.servers=<MSK-BOOTSTRAP>
database.history.kafka.topic=dbhistory.demo1
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=<REGION>
value.converter.region=<REGION>
key.converter.registry.name=msk-connect-blog-keys
value.converter.registry.name=msk-connect-blog-values
key.converter.compatibility=FORWARD
value.converter.compatibility=FORWARD
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,source.ts_ms
tasks.max=1

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector
  • tasks.max is the maximum number of tasks that should be created for this connector

Some settings (database.*, transforms.*) are specific to the Debezium MySQL connector. Refer to Debezium MySQL Source Connector Configuration Properties for more information.

Some settings (key.converter.* and value.converter.*) are specific to the Schema Registry. We use the AWSKafkaAvroConverter from the AWS Glue Schema Registry Library as the format converter. To configure AWSKafkaAvroConverter, we use the value of the string constant properties in the AWSSchemaRegistryConstants class:

  • key.converter and value.converter control the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. We use AWSKafkaAvroConverter for Avro format.
  • key.converter.registry.name and value.converter.registry.name define which schema registry to use.
  • key.converter.compatibility and value.converter.compatibility define the compatibility model.

Refer to Using Kafka Connect with AWS Glue Schema Registry for more information.

  1. Next, we configure Connector capacity. We can choose Provisioned and leave other properties as default
  2. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  3. For Access permissions, use the AWS Identity and Access Management (IAM) role generated by the CloudFormation template MSKConnectRole.
  4. Choose Next.
  5. For Security, choose the defaults.
  6. Choose Next.
  7. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  8. Choose Next.
  9. Review the settings and choose Create connector.

After a few minutes, the connector changes to running status.

Use the AWS Glue Schema Registry with the Confluent S3 sink connector running on MSK Connect as the MSK consumer

We deploy the sink connector using the Confluent S3 sink plugin to stream data from Amazon MSK to Amazon S3. Complete the following steps:

    1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
    2. Choose Create connector.
    3. Choose Use existing custom plugin and choose the custom plugin with name starting msk-blog-S3sink-plugin.
    4. Choose Next.
    5. Enter a suitable name like s3-sink-connector and an optional description.
    6. For Apache Kafka cluster, choose MSK cluster and select the cluster created by the CloudFormation template.
    7. In Connector configuration, delete the default values provided and use the following configuration key-value pairs with appropriate values:
        • name – The same name used for the connector.
        • s3.bucket.name – The CloudFormation output for Bucket Name.
        • s3.region, key.converter.region, and value.converter.region – Your Region.
name=<CONNERCOR-NAME>
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.bucket.name=<BUCKET-NAME>
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
s3.region=<REGION>
storage.class=io.confluent.connect.s3.storage.S3Storage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=10
tasks.max=1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.region=<REGION>
value.converter.region=<REGION>
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
key.converter.compatibility=NONE
store.kafka.keys=false
schema.compatibility=NONE
topics=db1.sampledatabase.movies
value.converter.registry.name=msk-connect-blog-values
key.converter.registry.name=msk-connect-blog-keys
store.kafka.headers=false
  1. Next, we configure Connector capacity. We can choose Provisioned and leave other properties as default
  2. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  3. For Access permissions, use the IAM role generated by the CloudFormation template MSKConnectRole.
  4. Choose Next.
  5. For Security, choose the defaults.
  6. Choose Next.
  7. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template msk-connector-logs.
  8. Choose Next.
  9. Review the settings and choose Create connector.

After a few minutes, the connector is running.

Test the end-to-end CDC log stream

Now that both the Debezium and S3 sink connectors are up and running, complete the following steps to test the end-to-end CDC:

  1. On the Amazon EC2 console, navigate to the Security groups page.
  2. Select the security group ClientInstanceSecurityGroup and choose Edit inbound rules.
  3. Add an inbound rule allowing SSH connection from your local network.
  4. On the Instances page, select the instance ClientInstance and choose Connect.
  5. On the EC2 Instance Connect tab, choose Connect.
  6. Ensure your current working directory is /home/ec2-user and it has the files create_table.sql, alter_table.sql , initial_insert.sql, and insert_data_with_new_column.sql.
  7. Create a table in your MySQL database by running the following command (provide the database host name from the CloudFormation template outputs):
mysql -h <DATABASE-HOST> -u master -p < create_table.sql
  1. When prompted for a password, enter the password from the CloudFormation template parameters.
  2. Insert some sample data into the table with the following command:
mysql -h <DATABASE-HOST> -u master -p < initial_insert.sql
  1. When prompted for a password, enter the password from the CloudFormation template parameters.
  2. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  3. Navigate to db1.sampledatabase.movies version 1 to check the new schema created for the movies table:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}

A separate S3 folder is created for each partition of the Kafka topic, and data for the topic is written in that folder.

  1. On the Amazon S3 console, check for data written in Parquet format in the folder for your Kafka topic.

Schema evolution

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for the downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly. Compatibility modes allow you to control how schemas can or can’t evolve over time. These modes form the contract between applications producing and consuming data. For detailed information about different compatibility modes available in the AWS Glue Schema Registry, refer to AWS Glue Schema Registry. In our example, we use backward combability to ensure consumers can read both the current and previous schema versions. Complete the following steps:

  1. Add a new column to the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < alter_table.sql
  1. Insert new data into the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < insert_data_with_new_column.sql
  1. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  2. Navigate to the schema db1.sampledatabase.movies version 2 to check the new version of the schema created for the movies table movies including the country column that you added:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "COUNTRY",
      "type": "string"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}
  1. On the Amazon S3 console, check for data written in Parquet format in the folder for the Kafka topic.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post:

  1. On the Amazon S3 console, navigate to the S3 bucket created by the CloudFormation template.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. On the AWS CloudFormation console, delete the stack you created.
  5. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

This post demonstrated how to use Amazon MSK, MSK Connect, and the AWS Glue Schema Registry to build a CDC log stream and evolve schemas for data streams as business needs change. You can apply this architecture pattern to other data sources with different Kafka connecters. For more information, refer to the MSK Connect examples.


About the Author

Kalyan Janaki is Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Use Apache Iceberg in a data lake to support incremental data processing

Post Syndicated from Flora Wu original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/

Apache Iceberg is an open table format for very large analytic datasets, which captures metadata information on the state of datasets as they evolve and change over time. It adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback.

Apache Iceberg integration is supported by AWS analytics services including Amazon EMR, Amazon Athena, and AWS Glue. Amazon EMR can provision clusters with Spark, Hive, Trino, and Flink that can run Iceberg. Starting with Amazon EMR version 6.5.0, you can use Iceberg with your EMR cluster without requiring a bootstrap action. In early 2022, AWS announced general availability of Athena ACID transactions, powered by Apache Iceberg. The recently released Athena query engine version 3 provides better integration with the Iceberg table format. AWS Glue 3.0 and later supports the Apache Iceberg framework for data lakes.

In this post, we discuss what customers want in modern data lakes and how Apache Iceberg helps address customer needs. Then we walk through a solution to build a high-performance and evolving Iceberg data lake on Amazon Simple Storage Service (Amazon S3) and process incremental data by running insert, update, and delete SQL statements. Finally, we show you how to performance tune the process to improve read and write performance.

How Apache Iceberg addresses what customers want in modern data lakes

More and more customers are building data lakes, with structured and unstructured data, to support many users, applications, and analytics tools. There is an increased need for data lakes to support database like features such as ACID transactions, record-level updates and deletes, time travel, and rollback. Apache Iceberg is designed to support these features on cost-effective petabyte-scale data lakes on Amazon S3.

Apache Iceberg addresses customer needs by capturing rich metadata information about the dataset at the time the individual data files are created. There are three layers in the architecture of an Iceberg table: the Iceberg catalog, the metadata layer, and the data layer, as depicted in the following figure (source).

The Iceberg catalog stores the metadata pointer to the current table metadata file. When a select query is reading an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location of the current metadata file. Whenever there is an update to the Iceberg table, a new snapshot of the table is created, and the metadata pointer points to the current table metadata file.

The following is an example Iceberg catalog with AWS Glue implementation. You can see the database name, the location (S3 path) of the Iceberg table, and the metadata location.

The metadata layer has three types of files: the metadata file, manifest list, and manifest file in a hierarchy. At the top of the hierarchy is the metadata file, which stores information about the table’s schema, partition information, and snapshots. The snapshot points to the manifest list. The manifest list has the information about each manifest file that makes up the snapshot, such as location of the manifest file, the partitions it belongs to, and the lower and upper bounds for partition columns for the data files it tracks. The manifest file tracks data files as well as additional details about each file, such as the file format. All three files work in a hierarchy to track the snapshots, schema, partitioning, properties, and data files in an Iceberg table.

The data layer has the individual data files of the Iceberg table. Iceberg supports a wide range of file formats including Parquet, ORC, and Avro. Because the Iceberg table tracks the individual data files instead of only pointing to the partition location with data files, it isolates the writing operations from reading operations. You can write the data files at any time, but only commit the change explicitly, which creates a new version of the snapshot and metadata files.

Solution overview

In this post, we walk you through a solution to build a high-performing Apache Iceberg data lake on Amazon S3; process incremental data with insert, update, and delete SQL statements; and tune the Iceberg table to improve read and write performance. The following diagram illustrates the solution architecture.

To demonstrate this solution, we use the Amazon Customer Reviews dataset in an S3 bucket (s3://amazon-reviews-pds/parquet/). In real use case, it would be raw data stored in your S3 bucket. We can check the data size with the following code in the AWS Command Line Interface (AWS CLI):

//Run this AWS CLI command to check the data size
aws s3 ls --summarize --human-readable --recursive s3://amazon-reviews-pds/parquet

The total object count is 430, and total size is 47.4 GiB.

To set up and test this solution, we complete the following high-level steps:

  1. Set up an S3 bucket in the curated zone to store converted data in Iceberg table format.
  2. Launch an EMR cluster with appropriate configurations for Apache Iceberg.
  3. Create a notebook in EMR Studio.
  4. Configure the Spark session for Apache Iceberg.
  5. Convert data to Iceberg table format and move data to the curated zone.
  6. Run insert, update, and delete queries in Athena to process incremental data.
  7. Carry out performance tuning.

Prerequisites

To follow along with this walkthrough, you must have an AWS account with an AWS Identity and Access Management (IAM) role that has sufficient access to provision the required resources.

Set up the S3 bucket for Iceberg data in the curated zone in your data lake

Choose the Region in which you want to create the S3 bucket and provide a unique name:

s3://iceberg-curated-blog-data

Launch an EMR cluster to run Iceberg jobs using Spark

You can create an EMR cluster from the AWS Management Console, Amazon EMR CLI, or AWS Cloud Development Kit (AWS CDK). For this post, we walk you through how to create an EMR cluster from the console.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose the latest Amazon EMR release. As of January 2023, the latest release is 6.9.0. Iceberg requires release 6.5.0 and above.
  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":true}}].
  6. Leave other settings at their default and choose Next.
  7. For Hardware, use the default setting.
  8. Choose Next.
  9. For Cluster name, enter a name. We use iceberg-blog-cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. Choose Create cluster.

Create a notebook in EMR Studio

We now walk you through how to create a notebook in EMR Studio from the console.

  1. On the IAM console, create an EMR Studio service role.
  2. On the Amazon EMR console, choose EMR Studio.
  3. Choose Get started.

The Get started page appears in a new tab.

  1. Choose Create Studio in the new tab.
  2. Enter a name. We use iceberg-studio.
  3. Choose the same VPC and subnet as those for the EMR cluster, and the default security group.
  4. Choose AWS Identity and Access Management (IAM) for authentication, and choose the EMR Studio service role you just created.
  5. Choose an S3 path for Workspaces backup.
  6. Choose Create Studio.
  7. After the Studio is created, choose the Studio access URL.
  8. On the EMR Studio dashboard, choose Create workspace.
  9. Enter a name for your Workspace. We use iceberg-workspace.
  10. Expand Advanced configuration and choose Attach Workspace to an EMR cluster.
  11. Choose the EMR cluster you created earlier.
  12. Choose Create Workspace.
  13. Choose the Workspace name to open a new tab.

In the navigation pane, there is a notebook that has the same name as the Workspace. In our case, it is iceberg-workspace.

  1. Open the notebook.
  2. When prompted to choose a kernel, choose Spark.

Configure a Spark session for Apache Iceberg

Use the following code, providing your own S3 bucket name:

%%configure -f
{
"conf": {
"spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.sql.catalog.demo.warehouse": "s3://iceberg-curated-blog-data",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.demo.io-impl":"org.apache.iceberg.aws.s3.S3FileIO"
}
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin.
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information.
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path defined by this property: s3://iceberg-curated-blog-data.
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step).
  • spark.sql.catalog.demo.io-impl – Iceberg allows users to write data to Amazon S3 through S3FileIO. The AWS Glue Data Catalog by default uses this FileIO, and other catalogs can load this FileIO using the io-impl catalog property.

Convert data to Iceberg table format

You can use either Spark on Amazon EMR or Athena to load the Iceberg table. In the EMR Studio Workspace notebook Spark session, run the following commands to load the data:

// create a database in AWS Glue named reviews if not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews - this load all the parquet files
val reviews_all_location = "s3://amazon-reviews-pds/parquet/"
val reviews_all = spark.read.parquet(reviews_all_location)

// write reviews data to an Iceberg v2 table
reviews_all.writeTo("demo.reviews.all_reviews").tableProperty("format-version", "2").createOrReplace()

After you run the code, you should find two prefixes created in your data warehouse S3 path (s3://iceberg-curated-blog-data/reviews.db/all_reviews): data and metadata.

Process incremental data using insert, update, and delete SQL statements in Athena

Athena is a serverless query engine that you can use to perform read, write, update, and optimization tasks against Iceberg tables. To demonstrate how the Apache Iceberg data lake format supports incremental data ingestion, we run insert, update, and delete SQL statements on the data lake.

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure the query result location to be the S3 bucket you created earlier. You should be able to see that the table reviews.all_reviews is available for querying. Run the following query to verify that you have loaded the Iceberg table successfully:

select * from reviews.all_reviews limit 5;

Process incremental data by running insert, update, and delete SQL statements:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

Performance tuning

In this section, we walk through different ways to improve Apache Iceberg read and write performance.

Configure Apache Iceberg table properties

Apache Iceberg is a table format, and it supports table properties to configure table behavior such as read, write, and catalog. You can improve the read and write performance on Iceberg tables by adjusting the table properties.

For example, if you notice that you write too many small files for an Iceberg table, you can config the write file size to write fewer but bigger size files, to help improve query performance.

Property Default Description
write.target-file-size-bytes 536870912 (512 MB) Controls the size of files generated to target about this many bytes

Use the following code to alter the table format:

//Example code to alter table format in EMR Studio Workspace notebook
spark.sql("ALTER TABLE demo.reviews.all_reviews 
SET TBLPROPERTIES ('write_target_data_file_size_bytes'='536870912')")

Partitioning and sorting

To make a query run fast, the less data read the better. Iceberg takes advantage of the rich metadata it captures at write time and facilitates techniques such as scan planning, partitioning, pruning, and column-level stats such as min/max values to skip data files that don’t have match records. We walk you through how query scan planning and partitioning work in Iceberg and how we use them to improve query performance.

Query scan planning

For a given query, the first step in a query engine is scan planning, which is the process to find the files in a table needed for a query. Planning in an Iceberg table is very efficient, because Iceberg’s rich metadata can be used to prune metadata files that aren’t needed, in addition to filtering data files that don’t contain matching data. In our tests, we observed Athena scanned 50% or less data for a given query on an Iceberg table compared to original data before conversion to Iceberg format.

There are two types of filtering:

  • Metadata filtering – Iceberg uses two levels of metadata to track the files in a snapshot: the manifest list and manifest files. It first uses the manifest list, which acts as an index of the manifest files. During planning, Iceberg filters manifests using the partition value range in the manifest list without reading all the manifest files. Then it uses selected manifest files to get data files.
  • Data filtering – After selecting the list of manifest files, Iceberg uses the partition data and column-level stats for each data file stored in manifest files to filter data files. During planning, query predicates are converted to predicates on the partition data and applied first to filter data files. Then, the column stats like column-level value counts, null counts, lower bounds, and upper bounds are used to filter out data files that can’t match the query predicate. By using upper and lower bounds to filter data files at planning time, Iceberg greatly improves query performance.

Partitioning and sorting

Partitioning is a way to group records with the same key column values together in writing. The benefit of partitioning is faster queries that access only part of the data, as explained earlier in query scan planning: data filtering. Iceberg makes partitioning simple by supporting hidden partitioning, in the way that Iceberg produces partition values by taking a column value and optionally transforming it.

In our use case, we first run the following query on the Iceberg table not partitioned. Then we partition the Iceberg table by the category of the reviews, which will be used in the query WHERE condition to filter out records. With partitioning, the query could scan much less data. See the following code:

//Example code in EMR Studio Workspace notebook to create an Iceberg table all_reviews_partitioned partitioned by product_category
reviews_all.writeTo("demo.reviews.all_reviews_partitioned").tableProperty("format-version", "2").partitionedBy($"product_category").createOrReplace()

Run the following select statement on the non-partitioned all_reviews table vs. the partitioned table to see the performance difference:

//Run this query on all_reviews table and the partitioned table for performance testing
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

//Run the same select query on partitioned dataset
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews_partitioned where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table shows the performance improvement of data partitioning, with about 50% performance improvement and 70% less data scanned.

Dataset Name Non-Partitioned Dataset Partitioned Dataset
Runtime (seconds) 8.20 4.25
Data Scanned (MB) 131.55 33.79

Note that the runtime is the average runtime with multiple runs in our test.

We saw good performance improvement after partitioning. However, this can be further improved by using column-level stats from Iceberg manifest files. In order to use the column-level stats effectively, you want to further sort your records based on the query patterns. Sorting the whole dataset using the columns that are often used in queries will reorder the data in such a way that each data file ends up with a unique range of values for the specific columns. If these columns are used in the query condition, it allows query engines to further skip data files, thereby enabling even faster queries.

Copy-on-write vs. read-on-merge

When implementing update and delete on Iceberg tables in the data lake, there are two approaches defined by the Iceberg table properties:

  • Copy-on-write – With this approach, when there are changes to the Iceberg table, either updates or deletes, the data files associated with the impacted records will be duplicated and updated. The records will be either updated or deleted from the duplicated data files. A new snapshot of the Iceberg table will be created and pointing to the newer version of data files. This makes the overall writes slower. There might be situations that concurrent writes are needed with conflicts so retry has to happen, which increases the write time even more. On the other hand, when reading the data, there is no extra process needed. The query will retrieve data from the latest version of data files.
  • Merge-on-read – With this approach, when there are updates or deletes on the Iceberg table, the existing data files will not be rewritten; instead new delete files will be created to track the changes. For deletes, a new delete file will be created with the deleted records. When reading the Iceberg table, the delete file will be applied to the retrieved data to filter out the delete records. For updates, a new delete file will be created to mark the updated records as deleted. Then a new file will be created for those records but with updated values. When reading the Iceberg table, both the delete and new files will be applied to the retrieved data to reflect the latest changes and produce the correct results. So, for any subsequent queries, an extra step to merge the data files with the delete and new files will happen, which will usually increase the query time. On the other hand, the writes might be faster because there is no need to rewrite the existing data files.

To test the impact of the two approaches, you can run the following code to set the Iceberg table properties:

//Run code to alter Iceberg table property to set copy-on-write and merge-on-read in EMR Studio Workspace notebook
spark.sql(“ALTER TABLE demo.reviews.all_reviews 
SET TBLPROPERTIES (‘write.delete.mode’=’copy-on-write’,’write.update.mode’=’copy-on-write’)”)

Run the update, delete, and select SQL statements in Athena to show the runtime difference for copy-on-write vs. merge-on-read:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = ‘Watches’ and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = ‘Watches’ and star_rating=1

//Example select statement
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = ‘Watches’ and review_date between date(‘2005-01-01’) and date(‘2005-03-31’)

The following table summarizes the query runtimes.

Query Copy-on-Write Merge-on-Read
UPDATE DELETE SELECT UPDATE DELETE SELECT
Runtime (seconds) 66.251 116.174 97.75 10.788 54.941 113.44
Data scanned (MB) 494.06 3.07 137.16 494.06 3.07 137.16

Note that the runtime is the average runtime with multiple runs in our test.

As our test results show, there are always trade-offs in the two approaches. Which approach to use depends on your use cases. In summary, the considerations come down to latency on the read vs. write. You can reference the following table and make the right choice.

. Copy-on-Write Merge-on-Read
Pros Faster reads Faster writes
Cons Expensive writes Higher latency on reads
When to use Good for frequent reads, infrequent updates and deletes or large batch updates Good for tables with frequent updates and deletes

Data compaction

If your data file size is small, you might end up with thousands or millions of files in an Iceberg table. This dramatically increases the I/O operation and slows down the queries. Furthermore, Iceberg tracks each data file in a dataset. More data files lead to more metadata. This in turn increases the overhead and I/O operation on reading metadata files. In order to improve the query performance, it’s recommended to compact small data files to larger data files.

When updating and deleting records in Iceberg table, if the read-on-merge approach is used, you might end up with many small deletes or new data files. Running compaction will combine all these files and create a newer version of the data file. This eliminates the need to reconcile them during reads. It’s recommended to have regular compaction jobs to impact reads as little as possible while still maintaining faster write speed.

Run the following data compaction command, then run the select query from Athena:

//Data compaction 
optimize reviews.all_reviews REWRITE DATA USING BIN_PACK

//Run this query before and after data compaction
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table compares the runtime before vs. after data compaction. You can see about 40% performance improvement.

Query Before Data Compaction After Data Compaction
Runtime (seconds) 97.75 32.676 seconds
Data scanned (MB) 137.16 M 189.19 M

Note that the select queries ran on the all_reviews table after update and delete operations, before and after data compaction. The runtime is the average runtime with multiple runs in our test.

Clean up

After you follow the solution walkthrough to perform the use cases, complete the following steps to clean up your resources and avoid further costs:

  1. Drop the AWS Glue tables and database from Athena or run the following code in your notebook:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.all_reviews") 
spark.sql("DROP TABLE demo.reviews.all_reviews_partitioned") 

// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  1. On the EMR Studio console, choose Workspaces in the navigation pane.
  2. Select the Workspace you created and choose Delete.
  3. On the EMR console, navigate to the Studios page.
  4. Select the Studio you created and choose Delete.
  5. On the EMR console, choose Clusters in the navigation pane.
  6. Select the cluster and choose Terminate.
  7. Delete the S3 bucket and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. Then we walked you though a solution to process incremental data in a data lake using Apache Iceberg. Finally, we had a deep dive into performance tuning to improve read and write performance for our use cases.

We hope this post provides some useful information for you to decide whether you want to adopt Apache Iceberg in your data lake solution.


About the Authors

Flora Wu is a Sr. Resident Architect at AWS Data Lab. She helps enterprise customers create data analytics strategies and build solutions to accelerate their businesses outcomes. In her spare time, she enjoys playing tennis, dancing salsa, and traveling.

Daniel Li is a Sr. Solutions Architect at Amazon Web Services. He focuses on helping customers develop, adopt, and implement cloud services and strategy. When not working, he likes spending time outdoors with his family.

Build a semantic search engine for tabular columns with Transformers and Amazon OpenSearch Service

Post Syndicated from Kachi Odoemene original https://aws.amazon.com/blogs/big-data/build-a-semantic-search-engine-for-tabular-columns-with-transformers-and-amazon-opensearch-service/

Finding similar columns in a data lake has important applications in data cleaning and annotation, schema matching, data discovery, and analytics across multiple data sources. The inability to accurately find and analyze data from disparate sources represents a potential efficiency killer for everyone from data scientists, medical researchers, academics, to financial and government analysts.

Conventional solutions involve lexical keyword search or regular expression matching, which are susceptible to data quality issues such as absent column names or different column naming conventions across diverse datasets (for example, zip_code, zcode, postalcode).

In this post, we demonstrate a solution for searching for similar columns based on column name, column content, or both. The solution uses approximate nearest neighbors algorithms available in Amazon OpenSearch Service to search for semantically similar columns. To facilitate the search, we create features representations (embeddings) for individual columns in the data lake using pre-trained Transformer models from the sentence-transformers library in Amazon SageMaker. Finally, to interact with and visualize results from our solution, we build an interactive Streamlit web application running on AWS Fargate.

We include a code tutorial for you to deploy the resources to run the solution on sample data or your own data.

Solution overview

The following architecture diagram illustrates the two-stage workflow for finding semantically similar columns. The first stage runs an AWS Step Functions workflow that creates embeddings from tabular columns and builds the OpenSearch Service search index. The second stage, or the online inference stage, runs a Streamlit application through Fargate. The web application collects input search queries and retrieves from the OpenSearch Service index the approximate k-most-similar columns to the query.

Solution architecture

Figure 1. Solution architecture

The automated workflow proceeds in the following steps:

  1. The user uploads tabular datasets into an Amazon Simple Storage Service (Amazon S3) bucket, which invokes an AWS Lambda function that initiates the Step Functions workflow.
  2. The workflow begins with an AWS Glue job that converts the CSV files into Apache Parquet data format.
  3. A SageMaker Processing job creates embeddings for each column using pre-trained models or custom column embedding models. The SageMaker Processing job saves the column embeddings for each table in Amazon S3.
  4. A Lambda function creates the OpenSearch Service domain and cluster to index the column embeddings produced in the previous step.
  5. Finally, an interactive Streamlit web application is deployed with Fargate. The web application provides an interface for the user to input queries to search the OpenSearch Service domain for similar columns.

You can download the code tutorial from GitHub to try this solution on sample data or your own data. Instructions on the how to deploy the required resources for this tutorial are available on Github.

Prerequistes

To implement this solution, you need the following:

  • An AWS account.
  • Basic familiarity with AWS services such as the AWS Cloud Development Kit (AWS CDK), Lambda, OpenSearch Service, and SageMaker Processing.
  • A tabular dataset to create the search index. You can bring your own tabular data or download the sample datasets on GitHub.

Build a search index

The first stage builds the column search engine index. The following figure illustrates the Step Functions workflow that runs this stage.

Step functions workflow

Figure 2 – Step functions workflow – multiple embedding models

Datasets

In this post, we build a search index to include over 400 columns from over 25 tabular datasets. The datasets originate from the following public sources:

For the the full list of the tables included in the index, see the code tutorial on GitHub.

You can bring your own tabular dataset to augment the sample data or build your own search index. We include two Lambda functions that initiate the Step Functions workflow to build the search index for individual CSV files or a batch of CSV files, respectively.

Transform CSV to Parquet

Raw CSV files are converted to Parquet data format with AWS Glue. Parquet is a column-oriented format file format preferred in big data analytics that provides efficient compression and encoding. In our experiments, the Parquet data format offered significant reduction in storage size compared to raw CSV files. We also used Parquet as a common data format to convert other data formats (for example JSON and NDJSON) because it supports advanced nested data structures.

Create tabular column embeddings

To extract embeddings for individual table columns in the sample tabular datasets in this post, we use the following pre-trained models from the sentence-transformers library. For additional models, see Pretrained Models.

Model name Dimension Size (MB)
all-MiniLM-L6-v2 384 80
all-distilroberta-v1 768 290
average_word_embeddings_glove.6B.300d 300 420

The SageMaker Processing job runs create_embeddings.py(code) for a single model. For extracting embeddings from multiple models, the workflow runs parallel SageMaker Processing jobs as shown in the Step Functions workflow. We use the model to create two sets of embeddings:

  • column_name_embeddings – Embeddings of column names (headers)
  • column_content_embeddings – Average embedding of all the rows in the column

For more information about the column embedding process, see the code tutorial on GitHub.

An alternative to the SageMaker Processing step is to create a SageMaker batch transform to get column embeddings on large datasets. This would require deploying the model to a SageMaker endpoint. For more information, see Use Batch Transform.

Index embeddings with OpenSearch Service

In the final step of this stage, a Lambda function adds the column embeddings to a OpenSearch Service approximate k-Nearest-Neighbor (kNN) search index. Each model is assigned its own search index. For more information about the approximate kNN search index parameters, see k-NN.

Online inference and semantic search with a web app

The second stage of the workflow runs a Streamlit web application where you can provide inputs and search for semantically similar columns indexed in OpenSearch Service. The application layer uses an Application Load Balancer, Fargate, and Lambda. The application infrastructure is automatically deployed as part of the solution.

The application allows you to provide an input and search for semantically similar column names, column content, or both. Additionally, you can select the embedding model and number of nearest neighbors to return from the search. The application receives inputs, embeds the input with the specified model, and uses kNN search in OpenSearch Service to search indexed column embeddings and find the most similar columns to the given input. The search results displayed include the table names, column names, and similarity scores for the columns identified, as well as the locations of the data in Amazon S3 for further exploration.

The following figure shows an example of the web application. In this example, we searched for columns in our data lake that have similar Column Names (payload type) to district (payload). The application used all-MiniLM-L6-v2 as the embedding model and returned 10 (k) nearest neighbors from our OpenSearch Service index.

The application returned transit_district, city, borough, and location as the four most similar columns based on the data indexed in OpenSearch Service. This example demonstrates the ability of the search approach to identify semantically similar columns across datasets.

Web application user interface

Figure 3: Web application user interface

Clean up

To delete the resources created by the AWS CDK in this tutorial, run the following command:

cdk destroy --all

Conclusion

In this post, we presented an end-to-end workflow for building a semantic search engine for tabular columns.

Get started today on your own data with our code tutorial available on GitHub. If you’d like help accelerating your use of ML in your products and processes, please contact the Amazon Machine Learning Solutions Lab.


About the Authors

Kachi Odoemene is an Applied Scientist at AWS AI. He builds AI/ML solutions to solve business problems for AWS customers.

Taylor McNally is a Deep Learning Architect at Amazon Machine Learning Solutions Lab. He helps customers from various industries build solutions leveraging AI/ML on AWS. He enjoys a good cup of coffee, the outdoors, and time with his family and energetic dog.

Austin Welch is a Data Scientist in the Amazon ML Solutions Lab. He develops custom deep learning models to help AWS public sector customers accelerate their AI and cloud adoption. In his spare time, he enjoys reading, traveling, and jiu-jitsu.

AWS Week in Review – February 27, 2023

Post Syndicated from Antje Barth original https://aws.amazon.com/blogs/aws/aws-week-in-review-february-27-2023/

A couple days ago, I had the honor of doing a live stream on generative AI, discussing recent innovations and concepts behind the current generation of large language and vision models and how we got there. In today’s roundup of news and announcements, I will share some additional information—including an expanded partnership to make generative AI more accessible, a blog post about diffusion models, and our weekly Twitch show on Generative AI. Let’s dive right into it!

Last Week’s Launches
Here are some launches that got my attention during the previous week:

Integrated Private Wireless on AWS – The Integrated Private Wireless on AWS program is designed to provide enterprises with managed and validated private wireless offerings from leading communications service providers (CSPs). The offerings integrate CSPs’ private 5G and 4G LTE wireless networks with AWS services across AWS Regions, AWS Local Zones, AWS Outposts, and AWS Snow Family. For more details, read this Industries Blog post and check out this eBook. And, if you’re attending the Mobile World Congress Barcelona this week, stop by the AWS booth at the Upper Walkway, South Entrance, at the Fira Barcelona Gran Via, to learn more.

AWS Glue Crawlers – Now integrate with Lake Formation. AWS Glue Crawlers are used to discover datasets, extract schema information, and populate the AWS Glue Data Catalog. With this Glue Crawler and Lake Formation integration, you can configure a crawler to use Lake Formation permissions to access an S3 data store or a Data Catalog table with an underlying S3 location within the same AWS account or another AWS account. You can configure an existing Data Catalog table as a crawler’s target if the crawler and the Data Catalog table reside in the same account. To learn more, check out this Big Data Blog post.

AWS Glue Crawlers now support integration with AWS Lake Formation

Amazon SageMaker Model Monitor – You can now launch and configure Amazon SageMaker Model Monitor from the SageMaker Model Dashboard using a code-free point-and-click setup experience. SageMaker Model Dashboard gives you unified monitoring across all your models by providing insights into deviations from expected behavior, automated alerts, and troubleshooting to improve model performance. Model Monitor can detect drift in data quality, model quality, bias, and feature attribution and alert you to take remedial actions when such changes occur.

Amazon EKS – Now supports Kubernetes version 1.25. Kubernetes 1.25 introduced several new features and bug fixes, and you can now use Amazon EKS and Amazon EKS Distro to run Kubernetes version 1.25. You can create new 1.25 clusters or upgrade your existing clusters to 1.25 using the Amazon EKS console, the eksctl command line interface, or through an infrastructure-as-code tool. To learn more about this release named “Combiner,” check out this Containers Blog post.

Amazon Detective – New self-paced workshop available. You can now learn to use Amazon Detective with a new self-paced workshop in AWS Workshop Studio. AWS Workshop Studio is a collection of self-paced tutorials designed to teach practical skills and techniques to solve business problems. The Amazon Detective workshop is designed to teach you how to use the primary features of Detective through a series of interactive modules that cover topics such as security alert triage, security incident investigation, and threat hunting. Get started with the Amazon Detective Workshop.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Here are some additional news items and blog posts that you may find interesting:

🤗❤☁ AWS and Hugging Face collaborate to make generative AI more accessible and cost-efficient – This previous week, we announced an expanded collaboration between AWS and Hugging Face to accelerate the training, fine-tuning, and deployment of large language and vision models used to create generative AI applications. Generative AI applications can perform a variety of tasks, including text summarization, answering questions, code generation, image creation, and writing essays and articles. For more details, read this Machine Learning Blog post.

If you are interested in generative AI, I also recommend reading this blog post on how to Fine-tune text-to-image Stable Diffusion models with Amazon SageMaker JumpStart. Stable Diffusion is a deep learning model that allows you to generate realistic, high-quality images and stunning art in just a few seconds. This blog post discusses how to make design choices, including dataset quality, size of training dataset, choice of hyperparameter values, and applicability to multiple datasets.

AWS open-source news and updates – My colleague Ricardo writes this weekly open-source newsletter in which he highlights new open-source projects, tools, and demos from the AWS Community. Read edition #146 here.

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

Build On AWS - Generative AI#BuildOn Generative AI – Join our weekly live Build On Generative AI Twitch show. Every Monday morning, 9:00 US PT, my colleagues Emily and Darko take a look at aspects of generative AI. They host developers, scientists, startup founders, and AI leaders and discuss how to build generative AI applications on AWS.

In today’s episode, my colleague Chris walked us through an end-to-end ML pipeline from data ingestion to fine-tuning and deployment of generative AI models. You can watch the video here.

AWS Pi Day 2023 SmallAWS Pi Day – Join me on March 14 for the third annual AWS Pi Day live, virtual event hosted on the AWS On Air channel on Twitch as we celebrate the 17th birthday of Amazon S3 and the cloud.

We will discuss the latest innovations across AWS Data services, from storage to analytics and AI/ML. If you are curious about how AI can transform your business, register here and join my session.

AWS Innovate Data and AI/ML edition – AWS Innovate is a free online event to learn the latest from AWS experts and get step-by-step guidance on using AI/ML to drive fast, efficient, and measurable results. Register now for EMEA (March 9) and the Americas (March 14).

You can browse all upcoming AWS-led in-person, virtual events and developer focused events such as Community Days.

That’s all for this week. Check back next Monday for another Week in Review!

— Antje

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Build a real-time GDPR-aligned Apache Iceberg data lake

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-real-time-gdpr-aligned-apache-iceberg-data-lake/

Data lakes are a popular choice for today’s organizations to store their data around their business activities. As a best practice of a data lake design, data should be immutable once stored. But regulations such as the General Data Protection Regulation (GDPR) have created obligations for data operators who must be able to erase or update personal data from their data lake when requested.

A data lake built on AWS uses Amazon Simple Storage Service (Amazon S3) as its primary storage environment. When a customer asks to erase or update private data, the data lake operator needs to find the required objects in Amazon S3 that contain the required data and take steps to erase or update that data. This activity can be a complex process for the following reasons:

  • Data lakes may contain many S3 objects (each may contain multiple rows), and often it’s difficult to find the object containing the exact data that needs to be erased or personally identifiable information (PII) to be updated as per the request
  • By nature, S3 objects are immutable and therefore applying direct row-based transactions like DELETE or UPDATE isn’t possible

To handle these situations, a transactional feature on S3 objects is required, and frameworks such as Apache Hudi or Apache Iceberg provide you the transactional feature for upserts in Amazon S3.

AWS contributed the Apache Iceberg integration with the AWS Glue Data Catalog, which enables you to use open-source data computation engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support of Iceberg, enabling transaction queries on S3 objects.

In this post, we show you how to stream real-time data to an Iceberg table in Amazon S3 using AWS Glue streaming and perform transactions using Amazon Athena for deletes and updates. We use a serverless mechanism for this implementation, which requires minimum operational overhead to manage and fine-tune various configuration parameters, and enables you to extend your use case to ACID operations beyond the GDPR.

Solution overview

We used the Amazon Kinesis Data Generator (KDG) to produce synthetic streaming data in Amazon Kinesis Data Streams and then processed the streaming input data using AWS Glue streaming to store the data in Amazon S3 in Iceberg table format. As part of the customer’s request, we ran delete and update statements using Athena with Iceberg support.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  1. Streaming data is generated in JSON format using the KDG template and inserted into Kinesis Data Streams.
  2. An AWS Glue streaming job is connected to Kinesis Data Streams to process the data using the Iceberg connector.
  3. The streaming job output is stored in Amazon S3 in Iceberg table format.
  4. Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in Iceberg format.
  5. Athena interacts with the Data Catalog tables in Iceberg format for transactional queries required for GDPR.

The codebase required for this post is available in the GitHub repository.

Prerequisites

Before starting the implementation, make sure the following prerequisites are met:

Deploy resources using AWS CloudFormation

Complete the following steps to deploy your solution resources:

  1. After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack:
  2. For Stack name, enter a name.
  3. For Username, enter the user name for the KDG.
  4. For Password, enter the password for the KDG (this must be at least six alphanumeric characters, and contain at least one number).
  5. For IAMGlueStreamingJobRoleName, enter a name for the IAM role used for the AWS Glue streaming job.
  6. Choose Next and create your stack.

This CloudFormation template configures the following resources in your account:

  • An S3 bucket named streamingicebergdemo-XX (note that the XX part is a random unique number to make the S3 bucket name unique)
  • An IAM policy and role
  • The KDG URL used for creating synthetic data
  1. After you complete the setup, go to the Outputs tab of the CloudFormation stack to get the S3 bucket name, AWS Glue job execution role (as per your input), and KDG URL.
  2. Before proceeding with the demo, create a folder named custdata under the created S3 bucket.

Create a Kinesis data stream

We use Kinesis Data Streams to create a serverless streaming data service that is built to handle millions of events with low latency. The following steps guide you on how to create the data stream in the us-east-1 Region:

  1. Log in to the AWS Management Console.
  2. Navigate to Kinesis console (make sure the Region is us-east-1).
  3. Select Kinesis Data Streams and choose Create data stream.
  4. For Data stream name, enter demo-data-stream.
  5. For this post, we select On-demand as the Kinesis data stream capacity mode.

On-demand mode works to eliminate the need for provisioning and managing the capacity for streaming data. However, you can implement this solution with Kinesis Data Streams in provisioned mode as well.

  1. Choose Create data stream.
  2. Wait for successful creation of demo-data-stream and for it to be in Active status.

Set up the Kinesis Data Generator

To create a sample streaming dataset, we use the KDG URL generated on the CloudFormation stack Outputs tab and log in with the credentials used in the parameters for the CloudFormation template. For this post, we use the following template to generate sample data in the demo-data-stream Kinesis data stream.

  1. Log in to the KDG URL with the user name and password you supplied during stack creation.
  2. Change the Region to us-east-1.
  3. Select the Kinesis data stream demo-data-stream.
  4. For Records per second, choose Constant and enter 100 (it can be another number, depending on the rate of record creation).
  5. On the Template 1 tab, enter the KDG data generation template:
{
"year": "{{random.number({"min":2000,"max":2022})}}",
"month": "{{random.number({"min":1,"max":12})}}",
"day": "{{random.number({"min":1,"max":30})}}",
"hour": "{{random.number({"min":0,"max":24})}}",
"minute": "{{random.number({"min":0,"max":60})}}",
"customerid": {{random.number({"min":5023,"max":59874})}},
"firstname" : "{{name.firstName}}",
"lastname" : "{{name.lastName}}",
"dateofbirth" : "{{date.past(70)}}",
"city" : "{{address.city}}",
"buildingnumber" : {{random.number({"min":63,"max":947})}},
"streetaddress" : "{{address.streetAddress}}",
"state" : "{{address.state}}",
"zipcode" : "{{address.zipCode}}",
"country" : "{{address.country}}",
"countrycode" : "{{address.countryCode}}",
"phonenumber" : "{{phone.phoneNumber}}",
"productname" : "{{commerce.productName}}",
"transactionamount": {{random.number(
{
"min":10,
"max":150
}
)}}
}
  1. Choose Test template to test the sample records.
  2. When the testing is correct, choose Send data.

This will start sending 100 records per second in the Kinesis data stream. (To stop sending data, choose Stop Sending Data to Kinesis.)

Integrate Iceberg with AWS Glue

To add the Apache Iceberg Connector for AWS Glue, complete the following steps. The connector is free to use and supports AWS Glue 1.0, 2.0, and 3.0.

  1. On the AWS Glue console, choose AWS Glue Studio in the navigation pane.
  2. In the navigation pane, navigate to AWS Marketplace.
  3. Search for and choose Apache Iceberg Connector for AWS Glue.
  4. Choose Accept Terms and Continue to Subscribe.
  5. Choose Continue to Configuration.
  6. For Fulfillment option, choose your AWS Glue version.
  7. For Software version, choose the latest software version.
  8. Choose Continue to Launch.
  9. Under Usage Instructions, choose the link to activate the connector.
  10. Enter a name for the connection, then choose Create connection and activate the connector.
  11. Verify the new connector on the AWS Glue Studio Connectors.

Create the AWS Glue Data Catalog database

The AWS Glue Data Catalog contains references to data that is used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The AWS Glue Data Catalog is an index to the location and schema of your data. You use the information in the Data Catalog to create and monitor your ETL jobs.

For this post, we create a Data Catalog database named icebergdemodb containing the metadata information of a table named customer, which will be queried through Athena.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. For Database name, enter icebergdemodb.

This creates an AWS Glue database for metadata storage.

Create a Data Catalog table in Iceberg format

In this step, we create a Data Catalog table in Iceberg table format.

  1. On the Athena console, create an Athena workgroup named demoworkgroup for SQL queries.
  2. Choose Athena engine version 3 for Query engine version.

For more information about Athena versions, refer to Changing Athena engine versions.

  1. Enter the S3 bucket location for Query result configuration under Additional configurations.
  2. Open the Athena query editor and choose demoworkgroup.
  3. Choose the database icebergdemodb.
  4. Enter and run the following DDL to create a table pointing to the Data Catalog database icerbergdemodb. Note that the TBLPROPERTIES section mentions ICEBERG as the table type and LOCATION points to the S3 folder (custdata) URI created in earlier steps. This DDL command is available on the GitHub repo.
CREATE TABLE icebergdemodb.customer(
year string,
month string,
day string,
hour string,
minute string,
customerid string,
firstname string,
lastname string,
dateofbirth string,
city string,
buildingnumber string,
streetaddress string,
state string,
zipcode string,
country string,
countrycode string,
phonenumber string,
productname string,
transactionamount int)
LOCATION '<S3 Location URI>'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912',
'optimize_rewrite_delete_file_threshold'='10'
);

After you run the command successfully, you can see the table customer in the Data Catalog.

Create an AWS Glue streaming job

In this section, we create the AWS Glue streaming job, which fetches the record from the Kinesis data stream using the Spark script editor.

  1. On the AWS Glue console, choose Jobs (new) in the navigation pane.
  2. For Create job¸ select Spark script editor.
  3. For Options¸ select Create a new script with boilerplate code.
  4. Choose Create.
  5. Enter the code available in the GitHub repo in the editor.

The sample code keeps appending data in the target location by fetching records from the Kinesis data stream.

  1. Choose the Job details tab in the query editor.
  2. For Name, enter Demo_Job.
  3. For IAM role¸ choose demojobrole.
  4. For Type, choose Spark Streaming.
  5. For Glue Version, choose Glue 3.0.
  6. For Language, choose Python 3.
  7. For Worker type, choose G 0.25X.
  8. Select Automatically scale the number of workers.
  9. For Maximum number of workers, enter 5.
  10. Under Advanced properties, select Use Glue Data Catalog as the Hive metastore.
  11. For Connections, choose the connector you created.
  12. For Job parameters, enter the following key pairs (provide your S3 bucket and account ID):
Key Value
--iceberg_job_catalog_warehouse s3://streamingicebergdemo-XX/custdata/
--output_path s3://streamingicebergdemo-XX
--kinesis_arn arn:aws:kinesis:us-east-1:<AWS Account ID>:stream/demo-data-stream
--user-jars-first True

  1. Choose Run to start the AWS Glue streaming job.
  2. To monitor the job, choose Monitoring in the navigation pane.
  3. Select Demo_Job and choose View run details to check the job run details and Amazon CloudWatch logs.

Run GDPR use cases on Athena

In this section, we demonstrate a few use cases that are relevant to GDPR alignment with the user data that’s stored in Iceberg format in the Amazon S3-based data lake as implemented in the previous steps. For this, let’s consider that the following requests are being initiated in the workflow to comply with the regulations:

  • Delete the records for the input customerid (for example, 59289)
  • Update phonenumber for the customerid (for example, 51842)

The IDs used in this example are samples only because they were created through the KDG template used earlier, which creates sample data. You can search for IDs in your implementation by querying through the Athena query editor. The steps remain the same.

Delete data by customer ID

Complete the following steps to fulfill the first use case:

  1. On the Athena console, and make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query using a customer ID and choose Run:
SELECT count(*)
FROM icebergdemodb.customer
WHERE customerid = '59289';

This query gives the count of records for the input customerid before delete.

  1. Enter the following query with the same customer ID and choose Run:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '59289') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN DELETE;

This query deletes the data for the input customerid as per the workflow generated.

  1. Test if there is data with the customer ID using a count query.

The count should be 0.

Update data by customer ID

Complete the following steps to test the second use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query with a customer ID and choose Run.
SELECT customerid, phonenumber
FROM icebergdemodb.customer
WHERE customerid = '51936';

This query gives the value for phonenumber before update.

  1. Run the following query to update the required columns:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '51936') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN UPDATE SET phonenumber = '000';

This query updates the data to a dummy value.

  1. Run the SELECT query to check the update.

You can see the data is updated correctly.

Vacuum table

A good practice is to run the VACUUM command periodically on the table because operations like INSERT, UPDATE, DELETE, and MERGE will take place on the Iceberg table. See the following code:

VACUUM icebergdemodb.customer;

Considerations

The following are a few considerations to keep in mind for this implementation:

Clean up

Complete the following steps to clean up the resources you created for this post:

    1. Delete the custdata folder in the S3 bucket.
    2. Delete the CloudFormation stack.
    3. Delete the Kinesis data stream.
    4. Delete the S3 bucket storing the data.
    5. Delete the AWS Glue job and Iceberg connector.
    6. Delete the AWS Glue Data Catalog database and table.
    7. Delete the Athena workgroup.
    8. Delete the IAM roles and policies.

Conclusion

This post explained how you can use the Iceberg table format on Athena to implement GDPR use cases like data deletion and data upserts as required, when streaming data is being generated and ingested through AWS Glue streaming jobs in Amazon S3.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the data operations that Iceberg supports. Refer to the Apache Iceberg documentation for details on various operations.


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Rajdip Chaudhuri is Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer.