Tag Archives: Amazon Athena

Amazon S3 Storage Lens adds performance metrics, support for billions of prefixes, and export to S3 Tables

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/amazon-s3-storage-lens-adds-performance-metrics-support-for-billions-of-prefixes-and-export-to-s3-tables/

Today, we’re announcing three new capabilities for Amazon S3 Storage Lens that give you deeper insights into your storage performance and usage patterns. With the addition of performance metrics, support for analyzing billions of prefixes, and direct export to Amazon S3 Tables, you have the tools you need to optimize application performance, reduce costs, and make data-driven decisions about your Amazon S3 storage strategy.

New performance metric categories
S3 Storage Lens now includes eight new performance metric categories that help identify and resolve performance constraints across your organization. These are available at organization, account, bucket, and prefix levels. For example, the service helps you identify small objects in a bucket or prefix that can slow down application performance. This can be mitigated by batching small objects or using the Amazon S3 Express One Zone storage class for higher performance small object workloads.

To access the new performance metrics, you need to enable performance metrics in the S3 Storage Lens advanced tier when creating a new Storage Lens dashboard or editing an existing configuration.

| Metric category | Details | Use case | Mitigation |
| --- | --- | --- | --- |
| Read request size | Distribution of read request sizes (GET) by day | Identify datasets with small read request patterns that slow down performance | Small request: Batch small objects or use Amazon S3 Express One Zone for high-performance small object workloads |
| Write request size | Distribution of write request sizes (PUT, POST, COPY, and UploadPart) by day | Identify datasets with small write request patterns that slow down performance | Large request: Parallelize requests, use MPU or use AWS CRT |
| Storage size | Distribution of object sizes | Identify datasets with small objects that slow down performance | Small object sizes: Consider bundling small objects |
| Concurrent PUT 503 errors | Number of 503s due to concurrent PUT operations on the same object | Identify prefixes with concurrent PUT throttling that slows down performance | For single writer, modify retry behavior or use Amazon S3 Express One Zone. For multiple writers, use consensus mechanism or use Amazon S3 Express One Zone |
| Cross-Region data transfer | Bytes transferred and requests sent across Region, in Region | Identify potential performance and cost degradation due to cross-Region data access | Co-locate compute with data in the same AWS Region |
| Unique objects accessed | Number or percentage of unique objects accessed per day | Identify datasets where a small subset of objects is frequently accessed. These can be moved to a higher performance storage tier for better performance | Consider moving active data to Amazon S3 Express One Zone or other caching solutions |
| FirstByteLatency (existing Amazon CloudWatch metric) | Daily average of first byte latency metric | The daily average per-request time from the complete request being received to when the response starts to be returned | |
| TotalRequestLatency (existing Amazon CloudWatch metric) | Daily average of Total Request Latency | The daily average elapsed per request time from the first byte received to the last byte sent | |

How it works
On the Amazon S3 console, I choose Create Storage Lens dashboard to create a new dashboard. You can also edit an existing dashboard configuration. I then configure general settings such as the Dashboard name, Status, and optional Tags. Then, I choose Next.


Next, I define the scope of the dashboard by selecting Include all Regions and Include all buckets and specifying the Regions and buckets to be included.


I opt in to the Advanced tier in the Storage Lens dashboard configuration, select Performance metrics, then choose Next.


Next, I select Prefix aggregation as an additional metrics aggregation, then leave the rest of the information as default before I choose Next.


I select the Default metrics report, then General purpose bucket as the bucket type, and then select the Amazon S3 bucket in my AWS account as the Destination bucket. I leave the rest of the information as default, then select Next.


I review all the information before I choose Submit to finalize the process.


After it’s enabled, I’ll receive daily performance metrics directly in the Storage Lens console dashboard. You can also choose to export the report in CSV or Parquet format to any bucket in your account or publish it to Amazon CloudWatch. The performance metrics are aggregated and published daily and are available at multiple levels: organization, account, bucket, and prefix. On the dashboard, I choose % concurrent PUT 503 error for the Metric, Last 30 days for the Date range, and 10 for the Top N buckets.


The Concurrent PUT 503 error count metric tracks the number of 503 errors generated by simultaneous PUT operations to the same object. Throttling errors can degrade application performance. For a single writer, modify retry behavior or use a higher performance storage tier such as Amazon S3 Express One Zone to mitigate concurrent PUT 503 errors. For a multiple-writer scenario, use a consensus mechanism to avoid concurrent PUT 503 errors or use a higher performance storage tier such as Amazon S3 Express One Zone.
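
As one illustration of "modify retry behavior" for a single writer, the following is a minimal sketch of exponential backoff with full jitter. It is not taken from the S3 documentation: SlowDownError, backoff_delays, and put_with_retry are hypothetical names, and the put callable stands in for whatever code issues the PUT request and raises on a 503 response.

```python
import random
import time


class SlowDownError(Exception):
    """Hypothetical stand-in for an HTTP 503 response to a PUT request."""


def backoff_delays(max_attempts, base=0.1, cap=5.0, rng=random.random):
    """Yield one full-jitter delay (in seconds) for each allowed retry."""
    for attempt in range(max_attempts - 1):
        yield rng() * min(cap, base * (2 ** attempt))


def put_with_retry(put, max_attempts=5, sleep=time.sleep):
    """Call put() until it succeeds or max_attempts calls have failed."""
    delays = backoff_delays(max_attempts)
    while True:
        try:
            return put()
        except SlowDownError:
            delay = next(delays, None)
            if delay is None:
                raise  # out of retries: surface the error to the caller
            sleep(delay)
```

Randomizing each delay spreads retries out over time, so a writer that was throttled once is less likely to collide with its own in-flight request on the next attempt.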

Complete analytics for all prefixes in your S3 buckets
S3 Storage Lens now supports analytics for all prefixes in your S3 buckets through a new Expanded prefixes metrics report. This capability removes previous limitations that restricted analysis to prefixes meeting a 1% size threshold and a maximum depth of 10 levels. You can now track up to billions of prefixes per bucket for analysis at the most granular prefix level, regardless of size or depth.

The Expanded prefixes metrics report includes all existing S3 Storage Lens metric categories: storage usage, activity metrics (requests and bytes transferred), data protection metrics, and detailed status code metrics.

How to get started
I follow the same steps outlined in the How it works section to create or update the Storage Lens dashboard. In Step 4 on the console, where you select export options, you can select the new Expanded prefixes metrics report. Thereafter, I can export the expanded prefixes metrics report in CSV or Parquet format to any general purpose bucket in my account for efficient querying of my Storage Lens data.


Good to know
This enhancement addresses scenarios where organizations need granular visibility across their entire prefix structure. For example, you can identify prefixes with incomplete multipart uploads to reduce costs, track compliance across your entire prefix structure for encryption and replication requirements, and detect performance issues at the most granular level.

Export S3 Storage Lens metrics to S3 Tables
S3 Storage Lens metrics can now be automatically exported to S3 Tables, a fully managed feature on AWS with built-in Apache Iceberg support. This integration provides daily automatic delivery of metrics to AWS managed S3 Tables for immediate querying without requiring additional processing infrastructure.

How to get started
I start by following the process outlined in Step 5 on the console, where I choose the export destination. This time, I choose Expanded prefixes metrics report. In addition to General purpose bucket, I choose Table bucket.

The new Storage Lens metrics are exported to new tables in an AWS managed bucket aws-s3.


I select the expanded_prefixes_activity_metrics table to view API usage metrics for expanded prefix reports.


I can preview the table on the Amazon S3 console or use Amazon Athena to query the table.


Good to know
S3 Tables integration with S3 Storage Lens simplifies metric analysis using familiar SQL tools and AWS analytics services such as Amazon Athena, Amazon QuickSight, Amazon EMR, and Amazon Redshift, without requiring a data pipeline. The metrics are automatically organized for optimal querying, with custom retention and encryption options to suit your needs.

This integration enables cross-account and cross-Region analysis, custom dashboard creation, and data correlation with other AWS services. For example, you can combine Storage Lens metrics with S3 Metadata to analyze prefix-level activity patterns and identify objects in prefixes with cold data that are eligible for transition to lower-cost storage tiers.

For your agentic AI workflows, you can use natural language to query S3 Storage Lens metrics in S3 Tables with the S3 Tables MCP Server. Agents can ask questions such as ‘which buckets grew the most last month?’ or ‘show me storage costs by storage class’ and get instant insights from your observability data.

Now available
All three enhancements are available in all AWS Regions where S3 Storage Lens is currently offered (except the China Regions and AWS GovCloud (US)).

These features are included in the Amazon S3 Storage Lens Advanced tier at no additional charge beyond standard advanced tier pricing. For the S3 Tables export, you pay only for S3 Tables storage, maintenance, and queries. There is no additional charge for the export functionality itself.

To learn more about Amazon S3 Storage Lens performance metrics, support for billions of prefixes, and export to S3 Tables, refer to the Amazon S3 user guide. For pricing details, visit the Amazon S3 pricing page.

Veliswa Boya.

Amazon FSx for NetApp ONTAP now integrates with Amazon S3 for seamless data access

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/amazon-fsx-for-netapp-ontap-now-integrates-with-amazon-s3-for-seamless-data-access/

Today, we’re announcing the ability to access your data in Amazon FSx for NetApp ONTAP file systems using Amazon Simple Storage Service (Amazon S3). With this capability, you can use your enterprise file data to augment generative AI applications with Amazon Bedrock Knowledge Bases for Retrieval Augmented Generation (RAG), train machine learning (ML) models with Amazon SageMaker, generate insights with Amazon S3 integrated third-party services, use comprehensive research capabilities in AI-powered business intelligence (BI) tools such as Amazon Quick Suite, and run analyses using Amazon S3 based cloud-native applications, all while your file data continues to reside in your FSx for NetApp ONTAP file system.

Amazon FSx for NetApp ONTAP is the first fully AWS managed NetApp ONTAP file system in the cloud. You can use it to migrate on-premises applications that rely on NetApp ONTAP or other network-attached storage (NAS) appliances to AWS without having to change how you manage your data. FSx for NetApp ONTAP provides the popular capabilities, high performance, and data management APIs of ONTAP file systems with the added benefits of the AWS Cloud, such as simplified management, on-demand scaling, and seamless integration with other AWS services.

Over the years, AWS has developed a broad range of industry-leading AI, ML, and analytics services and applications that work with data in Amazon S3 that organizations use to innovate faster, discover new insights, and make even better data-driven decisions. However, some organizations want to use these services with their enterprise file data stored in NetApp ONTAP or other NAS appliances.

How to get started
You can create and attach an S3 Access Point to your FSx for ONTAP file system using the Amazon FSx console, the AWS Command Line Interface (AWS CLI), or the AWS SDK.

I have an existing FSx for ONTAP file system, demo-create-s3access, which I created by following the steps in Creating file systems in the FSx for ONTAP documentation. Using the Amazon FSx console, I now choose the file system ID fs-0c45b011a7f071d70 to access the full details of the file system.

I’ll attach the access point to the volume of the file system. I choose the volume vol1 and then select Create S3 Access Point from the Actions dropdown menu.


I enter details such as the access point name, the type of file system user identity, and the network configuration, then choose Create S3 Access Point to finalize the process.


After it’s created, the access point my-s3-accesspoint is ready to allow access to the file data stored in my file system demo-create-s3access from Amazon S3. S3 Access Points are S3 endpoints that can be attached to Amazon FSx volumes and used to perform Amazon S3 object operations.


I can now bring proprietary data stored in the file system demo-create-s3access to Amazon S3 for use in applications that work with Amazon S3 while my file data continues to reside in the FSx for NetApp ONTAP file system using the access point my-s3-accesspoint (this data remains accessible through the file protocols).

For the walkthrough in this post, I’ll integrate with Quick Suite.

Integrating decades of enterprise file data with the latest AI powered BI tools on AWS
In the Quick Suite Console, in the left navigation pane, I choose Connections, then select Integrations. Before you begin, make sure that you have the correct permissions to the Amazon S3 AWS resource. You can control the AWS resources that Quick Suite can access by following the Amazon Quick Suite user guide.


After I’ve selected the Amazon S3 integration, I enter my Amazon S3 Access Point alias as the S3 bucket URL, leave the rest of the information as default, then choose Create and continue.


I finalize the process by providing the Name and Description of the knowledge base, then choose Create.


After the knowledge base has been created, it’s automatically synchronized and is then available for interaction.


I want to learn more about the AWS European Sovereign Cloud, so I’ve updated the file system (accessed through the S3 Access Point my-s3-accesspoin-iyytkgz83djdjj7abn3u711supfgkuse1b-ext-s3alias) with the AWS whitepaper on this topic. In the chat in Amazon Quick Suite, I ask my first question: “do we have any documentation on the europe sovereignty cloud?” To answer my question, the chat agent accesses and analyzes various types of data sources I have permission to use, including uploaded files in my current conversation, spaces I have access to, knowledge bases from my integrations, and more.

When I verify the source, I see that the document I uploaded to my file system is listed as one of the sources.

Other use cases of Amazon S3 Access Points for Amazon FSx for NetApp ONTAP
Earlier, we looked at use cases such as connecting an organization’s proprietary file data to Amazon Quick Suite for advanced business intelligence. Additionally, Amazon S3 Access Points for Amazon FSx for NetApp ONTAP can be used to seamlessly integrate enterprise file data with comprehensive analytics services, such as Amazon Athena for serverless SQL queries or AWS Glue for ETL processing, to name a few.

Amazon S3 Access Points for Amazon FSx for NetApp ONTAP are also suitable for data access from serverless compute workloads that are cloud-native with containerized microservices that require flexible access to shared enterprise datasets, such as configuration files, reference data, content libraries, model artifacts, and application assets.

Now available
You can get started today using the Amazon FSx console, AWS CLI, or AWS SDK to attach Amazon S3 Access Points to your Amazon FSx for NetApp ONTAP file systems. The feature is available in the following AWS Regions: Africa (Cape Town), Asia Pacific (Hong Kong, Hyderabad, Jakarta, Melbourne, Mumbai, Osaka, Seoul, Singapore, Sydney, Tokyo), Canada (Central, Calgary), Europe (Frankfurt, Ireland, London, Milan, Paris, Spain, Stockholm, Zurich), Israel (Tel Aviv), Middle East (Bahrain, UAE), South America (Sao Paulo), US East (N. Virginia, Ohio), and US West (N. California, Oregon). You’re billed by Amazon S3 for the requests and data transfer costs through your S3 Access Point, in addition to your standard Amazon FSx charges. Learn more on the Amazon FSx for NetApp ONTAP pricing page.

PS: Writing a blog post at AWS is always a team effort, even when you see only one name under the post title. In this case, I want to thank Luke Miller for his expertise and generous help with technical guidance, which made this overview possible and comprehensive.

Veliswa Boya.

Orchestrating big data processing with AWS Step Functions Distributed Map

Post Syndicated from Biswanath Mukherjee original https://aws.amazon.com/blogs/compute/orchestrating-big-data-processing-with-aws-step-functions-distributed-map/

Developers seek to process and enrich semi-structured big data with durably orchestrated network-based workflows. For example, during quarterly earnings season, finance organizations run thousands of market simulations simultaneously to provide timely insights for scenario planning or risk management. These workloads require coordination between raw datasets and on-premises servers to provide the latest market information.

AWS Step Functions is a visual workflow service capable of orchestrating over 14,000 API actions from over 220 AWS services to build distributed applications. Now, Step Functions Distributed Map streamlines large dataset transformation by processing Amazon Athena data manifest and Parquet files directly. Using the Distributed Map feature, you can process large-scale datasets by running iterations across data entries in parallel. In Distributed mode, the Map state processes the items in the dataset in iterations called child workflow executions. You can specify the number of child workflow executions that can run in parallel. Each child workflow execution has its own execution history, separate from that of the parent workflow. By default, Step Functions runs up to 10,000 child workflow executions in parallel.

Distributed Map can process Amazon Athena data manifest and Parquet files directly, eliminating the need for custom pre-processing. You also now have visibility into your Distributed Map usage with new Amazon CloudWatch metrics: Approximate Open Map Runs Count, Open Map Run Limit, and Approximate Map Runs Backlog Size.

In this post, you’ll learn how to use AWS Step Functions Distributed Map to process Athena data manifest and Parquet files through a step-by-step demonstration.

This post is part of a series of posts about AWS Step Functions Distributed Map.

Use case: IoT sensor data processing

You’ll build a sample application that demonstrates processing IoT sensor data in Parquet format using Step Functions Distributed Map. These Parquet data files and a manifest file containing the list of the data files are exported from Athena. The data contains temperature, humidity, and battery level readings from different devices. The following table shows a sample of the sensor data:

Example IoT sensor data

Your objective is to use the Athena data manifest file to get the list of Parquet files, iterate over the data in the files to detect anomalies, and stream the processed data through Amazon Kinesis Data Firehose to an Amazon S3 bucket for further analytics using Athena queries. The following criteria are used to detect anomalies:

  • Low battery conditions: less than 20%
  • Humidity anomalies: more than 95% or less than 5%
  • Temperature spikes: more than 35°C or less than -10°C
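
The criteria above can be sketched as a small Python function. This is an illustrative sketch rather than the Lambda code from the sample repository: the function name and the anomaly labels are hypothetical, and the field names temperature, humidity, and batterylevel are assumed to match the column names of the sensor data.

```python
def detect_anomalies(reading):
    """Return the anomaly labels that apply to one sensor reading (a dict)."""
    anomalies = []
    if reading["batterylevel"] < 20:
        anomalies.append("low_battery")        # hypothetical label
    if reading["humidity"] > 95 or reading["humidity"] < 5:
        anomalies.append("humidity_anomaly")   # hypothetical label
    if reading["temperature"] > 35 or reading["temperature"] < -10:
        anomalies.append("temperature_spike")  # hypothetical label
    return anomalies
```

A reading can trigger more than one rule, so the function returns a list instead of a single flag.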

The following diagram represents the AWS Step Functions state machine:

Parquet files processing workflow

  1. The workflow runs an Athena query that generates Parquet data files and an Athena manifest file (CSV). The manifest file contains the list of Parquet data files.
  2. Distributed Map processes these Parquet data files in parallel using child workflow executions. You can control the number of child workflow executions that can run in parallel using the MaxConcurrency parameter. See Step Functions service quotas to learn more about concurrency limits.
  3. Each child workflow execution invokes an AWS Lambda function to process the respective Parquet file. The Lambda function processes individual sensor readings, detects anomalies according to the preceding logic, and returns a processed sensor data summary response.
  4. The child workflow sends the summary response record to an Amazon Kinesis Data Firehose stream, which stores the results in a specified Amazon S3 results bucket.
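
As a local analogy for steps 1 and 2, the following sketch reads a manifest and fans the listed files out to a worker function with a concurrency cap. It only mimics the shape of the workflow: Step Functions performs the real fan-out, read_manifest and process_files are hypothetical helpers, and the manifest is assumed to list one S3 URI per line in its first column.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor


def read_manifest(manifest_text):
    """Return the non-empty first-column values of a manifest CSV."""
    rows = csv.reader(io.StringIO(manifest_text))
    return [row[0].strip() for row in rows if row and row[0].strip()]


def process_files(file_uris, process_one, max_concurrency=4):
    """Run process_one on every URI with at most max_concurrency in flight."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # pool.map returns results in the same order as file_uris
        return list(pool.map(process_one, file_uris))
```

Here max_concurrency plays the role that MaxConcurrency plays in the Map state: it bounds how many files are processed at the same time without changing the results.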

The following Athena StartQueryExecution state runs an UNLOAD query to generate data files in Parquet format and a manifest file in CSV format. The data files are stored in the S3 location specified in the UNLOAD query, and the manifest file is stored in the S3 bucket configured for the Athena workgroup.

{
  "QueryLanguage": "JSONata",
  "States": {
    "Athena StartQueryExecution": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Arguments": {
        "QueryString": "UNLOAD (WRITE_YOUR_SELECT_QUERY_HERE) TO 'S3_URI_FOR_STORING_DATA_OBJECT' WITH (format = 'PARQUET')",
        "WorkGroup": "primary"
      },
      "Output": {
        "ManifestObjectKey": "{% $join([$states.result.QueryExecution.ResultConfiguration.OutputLocation, '-manifest.csv']) %}"
      },
      "Next": "Next State"
    },
    ...
  }
}

The following ItemReader is configured to use a manifest type of “ATHENA_DATA” with “PARQUET” data input.

{
  "QueryLanguage": "JSONata",
  "States": {
    ...
    "Map": {
      ...
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "ManifestType": "ATHENA_DATA",
          "InputType": "PARQUET"
        },
        "Arguments": {
          "Bucket": "{% $split($substringAfter($states.input.ManifestObjectKey, 's3://'), '/')[0] %}",
          "Key": "{% $substringAfter($substringAfter($states.input.ManifestObjectKey, 's3://'), '/') %}"
        }
      },
      ...
    }
  }
}

Additional supported InputType options are CSV and JSONL. All objects referenced in a single manifest file must have the same InputType format. You specify the Amazon S3 bucket location of the Athena manifest CSV file under Arguments.
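
To make the JSONata expressions easier to follow, here is a rough Python equivalent of what they compute: the manifest URI is the query output location with "-manifest.csv" appended, and the bucket and key are the two halves of that URI after the s3:// scheme. The function names and the example output location are hypothetical, and unlike the JSONata version this sketch rejects input that does not start with s3://.

```python
def manifest_uri(output_location):
    """Append the manifest suffix, like the $join expression in Output."""
    return output_location + "-manifest.csv"


def split_s3_uri(uri):
    """Split an s3:// URI into (bucket, key), like the Bucket and Key expressions."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri!r}")
    without_scheme = uri[len("s3://"):]
    bucket, _, key = without_scheme.partition("/")
    return bucket, key


# Hypothetical query output location, used only for illustration
location = "s3://amzn-s3-demo-bucket/results/1234"
bucket, key = split_s3_uri(manifest_uri(location))
print(bucket)  # amzn-s3-demo-bucket
print(key)     # results/1234-manifest.csv
```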

The context object contains information in a JSON structure about your state machine and execution. Your workflows can reference the context object in a JSONata expression with $states.context.

Within a Map state, the Context object includes the following data:

"Map": {
   "Item": {
      "Index" : Number,
      "Key"   : "String", // Only valid for JSON objects
      "Value" : "String",
      "Source": "String"
   }
}

For each Map state iteration, Index contains the index number for the array item that is being currently processed, Key is available only when iterating over JSON objects, Value contains the array item being processed, and Source contains one of the following:

  • For state input, the value will be: STATE_DATA
  • For Amazon S3 LIST_OBJECTS_V2 with Transformation=NONE, the value will show the S3 URI for the bucket. For example: S3://amzn-s3-demo-bucket.
  • For all the other input types, the value will be the Amazon S3 URI. For example: S3://amzn-s3-demo-bucket/object-key.

Using this newly introduced Source field in the context object, you can connect the child executions with the source object.
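
As a sketch of how a consumer might interpret the three kinds of Source value listed above, the following helper classifies a value and extracts the bucket and key when present. The function name and the returned labels are hypothetical, and how the Source value reaches your code (for example, through the child workflow input) is an assumption left to your state machine definition.

```python
def describe_source(source):
    """Classify a Map item Source value as state input, bucket, or object."""
    if source == "STATE_DATA":
        return ("state_input", None, None)
    scheme, separator, rest = source.partition("://")
    if not separator or scheme.lower() != "s3":
        raise ValueError(f"unrecognized Source value: {source!r}")
    bucket, _, key = rest.partition("/")
    kind = "s3_object" if key else "s3_bucket"
    return (kind, bucket, key or None)
```

The scheme check is case-insensitive because the examples above write the scheme as S3://.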

Prerequisites

Set up the state machine and sample data

Run the following steps to deploy the Step Functions state machine.

  1. Clone the GitHub repository in a new folder and navigate to the project root folder.
    git clone https://github.com/aws-samples/sample-stepfunctions-athena-manifest-parquet-file-processor.git
    cd sample-stepfunctions-athena-manifest-parquet-file-processor

  2. Run the following command to install required Python dependencies for the Lambda function.
    python3 -m venv .venv
    source .venv/bin/activate
    python3 -m pip install -r requirements.txt

  3. Build the application.
    sam build

  4. Deploy the application
    sam deploy --guided

  5. Enter the following details:
    • Stack name: The CloudFormation stack name (for example, sfn-parquet-file-processor)
    • AWS Region: A supported AWS Region (for example, us-east-1)
    • Keep the rest of the settings at their default values.

    Note the outputs from the AWS SAM deploy. You will use them in the subsequent steps.

  6. Run the following command to generate sample data in CSV format and upload it to an S3 bucket. Replace <IoTDataBucketName> with the value from the sam deploy output.
    python3 scripts/generate_sample_data.py <IoTDataBucketName>

Create the Athena database and tables

Before you can run queries, you must set up an Athena database and table for your data.

  1. From the Amazon Athena console, navigate to Workgroups and select the workgroup named “primary”. Select Edit from Actions. In the query result configuration section, select the options as follows:
    1. Management of query results – select customer managed
    2. Location of query results – enter s3://<IoTDataBucketName>. Replace <IoTDataBucketName> with the value from sam deploy output.
    3. Choose Save to save the changes to the workgroup
  2. Select the Query editor tab and run the following command to create the database.
    CREATE DATABASE `iotsensordata`;

  3. Create an Athena table in database iotsensordata that references the S3 bucket containing the raw sensor data. In this case it will be <IoTDataBucketName>. Replace <IoTDataBucketName> with the value from sam deploy output.
    CREATE EXTERNAL TABLE IF NOT EXISTS `iotsensordata`.`iotsensordata` 
    (`deviceid` string, 
    `timestamp` string,
    `temperature` double,
    `humidity` double,
    `batterylevel` double,
    `latitude` double,
    `longitude` double
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES ('field.delim' = ',')
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<IoTDataBucketName>/daily-data/'
    TBLPROPERTIES (
     'classification' = 'csv',
     'skip.header.line.count' = '1'
    );

  4. Create an Athena table in database iotsensordata that references the S3 bucket containing the analytics results streamed from Kinesis Data Firehose. Replace <IoTAnalyticsResultsBucket> with the value from the sam deploy output, and replace <year> with the current year (for example, 2025).
    CREATE EXTERNAL TABLE IF NOT EXISTS iotsensordata.iotsensordataanalytics (deviceid string, analysisDate string, readingTimestamp string, readingsCount int, metrics struct<temperature: double, humidity: double, batterylevel: double, latitude: double, longitude: double>, anomalies array<string>, anomalyCount int, healthStatus string, `timestamp` string)
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES ( 'ignore.malformed.json' = 'FALSE', 'dots.in.keys' = 'FALSE', 'case.insensitive' = 'TRUE'
    )
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<IoTAnalyticsResultsBucket>/<year>/'
    TBLPROPERTIES ('classification' = 'json', 'typeOfData'='file');

Start your state machine

Now that you have data ready and Athena set up for queries, start your state machine to retrieve and process the data.

  1. Run the following command to start an execution of the Step Functions state machine. Replace <StateMachineArn> and <IoTDataBucketName> with the values from the sam deploy output.
    aws stepfunctions start-execution \
      --state-machine-arn <StateMachineArn> \
      --input '{ "IoTDataBucketName": "<IoTDataBucketName>"}'

    The Step Functions state machine has the Athena StartQueryExecution state, which runs an UNLOAD query that generates the sensor data files in Parquet format and a manifest file in CSV format. The manifest will have 5 rows referencing the 5 Parquet files. The state machine will process these 5 Parquet files in one Map Run.

  2. Run the following command to get the details of the execution. Replace <executionArn> with the executionArn value returned by the previous command.
    aws stepfunctions describe-execution --execution-arn <executionArn>

  3. After you see the status SUCCEEDED, run the following query from the Athena query editor to check the processed output that Kinesis Data Firehose streamed to the S3 bucket referenced by the Athena table created in step 4 of the preceding section.
    SELECT * FROM iotsensordata.iotsensordataanalytics WHERE anomalycount = 1;

If any of the sensor data exceeds the thresholds, the healthstatus attribute will be set to “anomalies_detected”. The workflow produced a summary table of metadata which you can now query for reporting.

Output from Athena Query Editor
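
The describe-execution command in step 2 reports the status once. If you prefer to wait for completion from a script, a polling loop along the following lines is one option. This is a sketch under assumptions: wait_for_execution is a hypothetical helper, and describe is expected to behave like the describe_execution method of a boto3 Step Functions client, that is, to accept an executionArn keyword argument and return a dictionary with a status field.

```python
import time

# Statuses after which the execution will not make further progress
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(describe, execution_arn, poll_seconds=5.0,
                       max_polls=120, sleep=time.sleep):
    """Poll describe(executionArn=...) until a terminal status is reported."""
    for _ in range(max_polls):
        status = describe(executionArn=execution_arn)["status"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)
    raise TimeoutError(f"{execution_arn} not finished after {max_polls} polls")
```

With boto3 installed and credentials configured, a call might look like wait_for_execution(boto3.client("stepfunctions").describe_execution, "<executionArn>"). Passing the describe function in as a parameter keeps the loop easy to exercise without AWS access.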

Review workflow performance

Using the following observability metrics, you can review key performance behavior of your data processing workflow.
The AWS/States namespace includes the following new metrics for all Step Functions Map Runs.

  • OpenMapRunLimit: This is the maximum number of open Map Runs allowed in the AWS account. The default value is 1,000 runs and is a hard limit. For more information, see Quotas related to accounts.
  • ApproximateOpenMapRunCount: This metric tracks the approximate number of Map Runs currently in progress within an account. Configuring an alarm on this metric using the Maximum statistic with a threshold of 900 or higher can help you take proactive action before reaching the OpenMapRunLimit of 1,000. This metric enables operational teams to implement preventive measures, such as staggering new executions or optimizing workflow concurrency, to maintain system stability and prevent backlog accumulation.
  • ApproximateMapRunBacklogSize: This metric shows up when the ApproximateOpenMapRunCount has reached 1,000 and there are backlogged Map Runs waiting to be executed. Backlogged Map Runs wait at the MapRunStarted event until the total number of open Map Runs is less than the quota.
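
The alarm suggested for ApproximateOpenMapRunCount could be expressed as the following set of arguments for the CloudWatch PutMetricAlarm API. Treat this as a sketch: the helper name, the alarm name, the one-minute period, and the single evaluation period are illustrative assumptions, and the sketch assumes the metric is published without dimensions, which you should confirm in the CloudWatch console before relying on it.

```python
def open_map_run_alarm(threshold=900, period_seconds=60):
    """Build keyword arguments for a CloudWatch put_metric_alarm call."""
    return {
        "AlarmName": "sfn-open-map-runs-near-limit",  # hypothetical name
        "Namespace": "AWS/States",
        "MetricName": "ApproximateOpenMapRunCount",
        "Statistic": "Maximum",
        "Period": period_seconds,       # assumed evaluation window
        "EvaluationPeriods": 1,         # assumed: alarm on the first breach
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
    }
```

With boto3 installed, the dictionary can be passed as boto3.client("cloudwatch").put_metric_alarm(**open_map_run_alarm()). Adding an alarm action, such as an SNS topic, is left out because it depends on your account setup.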

The following graph shows an example of these new metrics. Use the Maximum statistic to visualize these metrics. The ApproximateMapRunBacklogSize metric appears after an account starts getting throttled on the OpenMapRunLimit. The OpenMapRunLimit (orange line) is the account hard limit of 1,000, shown as a static line. The ApproximateOpenMapRunCount (violet line) is the current number of open Map Runs. The ApproximateMapRunBacklogSize (green line) indicates the Map Runs waiting in the backlog to be processed. When the ApproximateOpenMapRunCount is lower than 1,000 (the OpenMapRunLimit), there are no Map Runs in the backlog. However, when the count reaches the OpenMapRunLimit, the backlog of Map Runs starts to build up. After the active runs complete, the backlog starts to drain and new runs begin execution.

Graphed metrics from Amazon CloudWatch

Clean up

To avoid costs, remove all resources created for this post once you’re done. From the Athena query editor, run the following commands:

DROP TABLE `iotsensordata`.`iotsensordata`;
DROP TABLE `iotsensordata`.`iotsensordataanalytics`;
DROP DATABASE `iotsensordata`;

Run the following commands from the AWS CLI after replacing the <placeholder> variable to delete the resources you deployed for this post’s solution:

aws s3 rm s3://<IoTDataBucketName> --recursive
aws s3 rm s3://<IoTAnalyticsResultsBucketName> --recursive
sam delete

Conclusion

With this update, Distributed Map now supports additional data inputs, so you can orchestrate large-scale analytics and ETL workflows. You can now process Amazon Athena data manifest and Parquet files directly, eliminating the need for custom pre-processing. You also now have visibility into your Distributed Map usage with the following metrics: Approximate Open Map Runs Count, Open Map Run Limit, and Approximate Map Runs Backlog Size.

New input sources for Distributed Map are available in all commercial AWS Regions where AWS Step Functions is available. For a complete list of AWS Regions where Step Functions is available, see the AWS Region Table. The improved observability of your Distributed Map usage with new metrics is available in all AWS Regions. To get started, you can use the Distributed Map mode today in the AWS Step Functions console. To learn more, visit the Step Functions developer guide.

For more serverless learning resources, visit Serverless Land.

How Twilio built a multi-engine query platform using Amazon Athena and open-source Presto

Post Syndicated from Amber Runnels original https://aws.amazon.com/blogs/big-data/how-twilio-built-a-multi-engine-query-platform-using-amazon-athena-and-open-source-presto/

Twilio is a customer engagement platform that powers real-time, personalized customer experiences for leading brands through APIs that democratize communications channels like voice, text, chat, and video.

At Twilio, we manage a 20 petabyte-scale Amazon Simple Storage Service (Amazon S3) data lake that serves the analytics needs of over 1,500 users, processing 2.5 million queries monthly, and scanning an average of 85 PB of data. To meet our growing demands for scalability, emerging technology support, and data mesh architecture adoption, we built Odin, a multi-engine query platform that provides an abstraction layer built on top of Presto Gateway.

In this post, we discuss how we designed and built Odin, combining Amazon Athena with open-source Presto to create a flexible, scalable data querying solution.

A growing need for a multi-engine platform

Our data platform has been built on Presto since its inception, but over the years as we expanded to support multiple business lines and diverse use cases, we began to encounter challenges related to scalability, operational overhead, and cost management. Maintaining the platform through frequent version upgrades also became difficult. These upgrades required significant time to evaluate backwards compatibility, integrate with our existing data ecosystem, and determine optimal configurations across releases.

The administrative burden of upgrades and our commitment to minimizing user disruption caused our Presto version to fall behind. This prevented us from accessing the latest features and optimizations available in later releases. The adoption of Apache Hudi for our transaction-dependent critical workloads created a new requirement which our existing Presto deployment version couldn’t support. We needed an up-to-date Presto or Trino compatible service to accommodate these use cases while still reducing the operational overhead of maintaining our own query infrastructure.

Building a comprehensive data platform required us to balance multiple competing requirements and business constraints. We needed a solution that could support diverse workload types, from interactive analytics to ETL batch processing, while providing the flexibility to optimize compute resources based on specific use cases. We also wanted to improve upon cost management and attribution in our shared multi-tenanted query platform. Additionally, we needed to ensure that adopting any new technology did not cause any disruption to our users and maintained backward compatibility with existing systems during the transition period.

Selecting Amazon Athena as our modern analytics engine

Our users relied on SQL for interactive analysis, and we wanted to preserve this experience and make use of our existing jobs and application code. This meant we needed a Presto-compatible analytics service to modernize our data platform.

Amazon Athena is a serverless interactive query service built on Presto and Trino that allows you to run queries using a familiar ANSI SQL interface. Athena appealed to us due to its compatibility with open-source Trino and its seamless upgrade experience. Athena helps to ease the burden of managing a large-scale query infrastructure, and with provisioned capacity, offers predictable and scalable pricing for our largest query workloads. Athena’s workgroups provided the query and cost management capabilities we needed to efficiently support diverse teams and workload patterns with minimal overhead.

The ability to blend on-demand and dedicated serverless capacity models allows us to optimize workload distribution for our requirements, achieving the flexibility and scalability needed in a managed query environment. To address latency-sensitive and predictive query workloads, we adopted provisioned capacity for its serverless capacity guarantee and workload concurrency control features. For ad hoc queries that are more flexible in scheduling, we opted to use the cost-efficient multi-tenant on-demand model, which optimizes resource utilization through shared infrastructure. In parallel to migrating workloads to Athena, we also needed a way to support legacy workloads that use custom implementations of Presto features. This requirement drove us to abstract the underlying implementation, allowing us to present users with a unified interface. This abstraction gives us the flexibility to future-proof our infrastructure and use the most appropriate compute for each workload and use case.

The birth of Odin

The following diagram shows Twilio’s multi-engine query platform that incorporates both Amazon Athena and open-source Presto.

Comprehensive AWS analytics architecture featuring multiple BI tools, query engines, caching layer, and query history tracking

High Level Architecture of Odin’s Query Engines

Odin is a Presto-based gateway built on Zuul, an open-source L7 application gateway developed by Netflix. Zuul had already demonstrated its scalability at Twilio, having been successfully adopted by other internal teams. Since end users primarily connect to the platform via a JDBC connector using the Presto Driver (which operates through HTTP calls), Zuul’s specialization in HTTP call management made it an ideal technical choice for our needs.

Odin functions as a central hub for query processing, employing a pluggable design that accommodates various query frameworks for maximum extensibility and flexibility. To interact with the Odin platform, users are initially directed to an Application Load Balancer that sits in front of the Odin instances running on Amazon EC2. The Odin instances handle the authentication, routing, and entire query workflow throughout the query’s lifetime. Amazon ElastiCache for Redis handles the query tracking for Athena, and Amazon DynamoDB is responsible for maintaining the query history. Both query engines, Amazon Athena and the Presto clusters running on Amazon EC2, are supported by the AWS Glue Data Catalog as the metastore repository and query data from our Amazon S3-based data lake.

Routing queries to multiple engines

We had a variety of use cases that were being served by this query platform and therefore we opted to use Amazon Athena as our primary query engine while continuing to route certain legacy workloads to our Presto clusters. Prior to our architectural redesign, we encountered operational challenges due to our end users being tightly bound to specific Presto clusters which led to inevitable disruptions during maintenance windows. Additionally, users frequently overloaded individual clusters with diverse workloads ranging from lightweight ad-hoc analytics to complex data warehousing queries and resource-intensive ETL processes. This prompted us to implement a more sophisticated routing solution, one that was use case focused and not tightly bound to the specific underlying compute.

To enable routing across multiple query engines within the same platform, we developed a query hint mechanism that allows users to specify their intended use case. Users append this hint to the JDBC string via the X-Presto-Extra-Credential header, which Odin’s logical routing layer then evaluates alongside multiple factors including user identity, query origin, and fallback planning. The system also assesses whether the target resource has sufficient capacity; if not, it reroutes the query to an alternative resource with available capacity. While users provide initial context through their hints, Odin makes the final routing decisions on the server side. This approach balances user input with centralized orchestration, ensuring consistent performance and resource availability.

For example, a user might specify the following connection string when connecting to the Odin platform from a Tableau client:

jdbc:presto://odin.twilio.com:443/hive?SSL=true&extraCredentials=routing:athena

The connection string uses the extraCredentials header to signal execution on Athena, where Odin validates query submission details, including the submitting user and tool, before determining the appropriate Athena workgroup for initial routing. Since this Tableau data source and user qualify as “critical queries,” the system routes them to a workgroup backed by capacity reservations. However, if that workgroup has too many pending queries in the execution queue, Odin’s routing logic automatically redirects to alternative workgroups with greater available resources. When necessary, queries may ultimately route to workgroups running on on-demand capacity. Through this fallback logic, Odin provides built-in load balancing at the routing layer, ensuring optimal utilization across the underlying compute infrastructure.
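
The hint extraction and fallback order described above can be sketched as follows. This is a simplified illustration under stated assumptions, not Odin’s actual routing code: the `Workgroup` class, the `max_queued` threshold, the workgroup names, and both function names are hypothetical, and the parser assumes the `key:value` pairs in `extraCredentials` are separated by semicolons.

```python
from dataclasses import dataclass
from typing import List, Optional
from urllib.parse import parse_qs, urlsplit


def parse_routing_hint(jdbc_url: str) -> Optional[str]:
    """Return the value of the 'routing' key in extraCredentials, if present."""
    # Drop the leading "jdbc:" so urlsplit sees a regular URL
    url = jdbc_url[len("jdbc:"):] if jdbc_url.startswith("jdbc:") else jdbc_url
    query = urlsplit(url).query
    for raw in parse_qs(query).get("extraCredentials", []):
        for pair in raw.split(";"):
            key, _, value = pair.partition(":")
            if key == "routing" and value:
                return value
    return None


@dataclass
class Workgroup:
    name: str
    queued: int      # queries currently waiting in this workgroup
    max_queued: int  # hypothetical threshold before falling back


def choose_workgroup(candidates: List[Workgroup]) -> str:
    """Pick the first candidate with queue headroom.

    Candidates are ordered by preference; the last entry is the catch-all
    (for example, an on-demand workgroup) and is used when all others are full.
    """
    for workgroup in candidates[:-1]:
        if workgroup.queued < workgroup.max_queued:
            return workgroup.name
    return candidates[-1].name


if __name__ == "__main__":
    url = "jdbc:presto://odin.twilio.com:443/hive?SSL=true&extraCredentials=routing:athena"
    print(parse_routing_hint(url))
    print(choose_workgroup([
        Workgroup("reserved-critical", queued=12, max_queued=10),
        Workgroup("reserved-shared", queued=3, max_queued=10),
        Workgroup("on-demand", queued=0, max_queued=0),
    ]))
```

Keeping the preference order in a plain list makes the fallback chain easy to reason about: the first workgroup with headroom wins, and the on-demand entry always accepts the query.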

Here is an example workflow of how our queries are routed to Athena workgroups:

Architecture diagram showing how queries from Looker and Tableau are evaluated and routed through Amazon Athena workgroups with fallback mechanisms

Once a query has been submitted to a workgroup for execution, Odin will also log the routing decision in our tracking system based on Amazon ElastiCache for Redis so that Odin’s routing logic can maintain real-time awareness of queue depths across all Athena workgroups. Additionally, Odin uses Amazon EventBridge to integrate with Amazon Athena to keep track of a query state change and create event-based workflows. Our Redis-based query tracking system effectively handles edge cases, such as when a JDBC client terminates mid-query. Even during such unexpected interruptions, the platform consistently maintains and updates the accurate state of the query.
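
A minimal sketch of this tracking idea follows, using an in-memory dictionary as a stand-in for Redis. The `QueueTracker` class is hypothetical, and the event fields (`queryExecutionId`, `currentState`) are assumed to follow the shape of Athena query state change events delivered through EventBridge; verify them against the events in your own account.

```python
from collections import defaultdict

ACTIVE_STATES = {"QUEUED", "RUNNING"}
FINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


class QueueTracker:
    """Tracks in-flight queries per workgroup (a dict stands in for Redis)."""

    def __init__(self):
        self.state = {}                # query id -> (workgroup, last known state)
        self.depth = defaultdict(int)  # workgroup -> number of in-flight queries

    def record_submission(self, query_id, workgroup):
        # Called when the router hands a query to a workgroup
        self.state[query_id] = (workgroup, "QUEUED")
        self.depth[workgroup] += 1

    def on_state_change(self, event):
        # Called for each state change event, whether or not the client is still connected
        detail = event["detail"]
        query_id = detail["queryExecutionId"]
        new_state = detail["currentState"]
        if query_id not in self.state:
            return  # not a query this tracker submitted
        workgroup, old_state = self.state[query_id]
        # Decrement only on the first transition into a final state
        if old_state in ACTIVE_STATES and new_state in FINAL_STATES:
            self.depth[workgroup] -= 1
        self.state[query_id] = (workgroup, new_state)
```

Because the depth is updated from service events rather than from the client connection, a JDBC client that disconnects mid-query does not leave a stale entry behind in this model.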

Query history

Following successful query routing to either an Athena workgroup or one of our open-source Presto clusters, Odin persists the query identifier and destination endpoint in a query history table in DynamoDB. This design utilizes a RESTful architecture where initial query submissions operate as POST requests, while subsequent status checks function as GET requests that utilize DynamoDB as the authoritative lookup mechanism to locate and poll the appropriate execution engine. By centralizing query execution records in DynamoDB rather than maintaining state on individual servers, we’ve created a truly stateless system where incoming requests can be handled by any Amazon EC2 instance hosting our Odin web service.
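
The stateless lookup pattern can be illustrated with a short sketch. Here a dictionary stands in for the DynamoDB table, and `QueryHistory`, `handle_post`, `handle_get`, and the `pollers` mapping are hypothetical names introduced only for this example.

```python
import uuid


class QueryHistory:
    """Stand-in for the DynamoDB query history table (a dict in this sketch)."""

    def __init__(self):
        self._items = {}

    def put(self, query_id, endpoint):
        self._items[query_id] = endpoint

    def get(self, query_id):
        return self._items[query_id]


def handle_post(history, endpoint):
    """Initial submission: record which engine endpoint received the query."""
    query_id = str(uuid.uuid4())
    history.put(query_id, endpoint)
    return query_id


def handle_get(history, query_id, pollers):
    """Status check: look up the endpoint, then poll the matching engine."""
    endpoint = history.get(query_id)
    return pollers[endpoint](query_id)


if __name__ == "__main__":
    shared_history = QueryHistory()
    pollers = {"athena:wg-critical": lambda query_id: "RUNNING"}
    # The POST and the GET could be served by different instances,
    # because neither handler keeps state of its own.
    query_id = handle_post(shared_history, "athena:wg-critical")
    print(handle_get(shared_history, query_id, pollers))
```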

Lessons learned

The transition from open-source Presto to Athena required some adaptation time, due to subtle differences in how these query engines operate. Since our Odin framework was built on the Presto driver, we needed to modify our processing approach to ensure compatibility between both systems.

As we began to adopt Athena for more use cases, we noticed a difference in the record counts between Athena and the original Presto queries. We discovered this was because open-source Presto returns a header row on every page of results, whereas Athena includes the header row only on the first page, with subsequent pages containing records only. This difference meant that for a 60-page result set, Athena would return 59 fewer rows than open-source Presto. Once we identified this pagination behavior, we updated Odin’s result handling logic to properly interpret and process Athena’s format, so that queries would return accurate results.
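
The effect of this difference can be reproduced with a small sketch. The `rows_from_pages` function and its `header_on_every_page` flag are hypothetical and only model the behavior described above; they are not part of either engine’s client library.

```python
def rows_from_pages(pages, header_on_every_page):
    """Collect data rows from paged results, skipping header rows.

    pages is a list of pages, and each page is a list of rows.
    header_on_every_page=True models the behavior described for open-source
    Presto; False models Athena, where only the first page carries a header.
    """
    records = []
    for index, page in enumerate(pages):
        has_header = header_on_every_page or index == 0
        records.extend(page[1:] if has_header else page)
    return records


if __name__ == "__main__":
    header = ["id"]
    # 60 Athena-style pages with one record each; only the first has a header
    athena_pages = [[header, [0]]] + [[[i]] for i in range(1, 60)]
    correct = rows_from_pages(athena_pages, header_on_every_page=False)
    wrong = rows_from_pages(athena_pages, header_on_every_page=True)
    print(len(correct) - len(wrong))
```

Applying the Presto assumption to Athena pages drops the first record of every page after the first, which is how a 60-page result set ends up 59 records short.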

Because most of our interactions with the Athena service are API driven, we use the ResultSet object returned by the GetQueryResults API to retrieve query execution data. With this mechanism, the API returns every value as the VARCHAR data type, even for complex types such as row, map, or array. This created a challenge because Odin uses the Presto driver for query parsing, resulting in a type mismatch between the expected formats and the data actually returned. To address this, we implemented a translation layer within the Odin framework that converts all data types to VARCHAR and handles any downstream implications of this conversion internally.
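
A rough sketch of such a translation step is shown below. It assumes the response shape of GetQueryResults, where each cell is a dictionary with an optional `VarCharValue` key and column metadata sits under `ResultSetMetadata`; the `flatten_result_set` function and the output format are hypothetical and do not represent Odin’s implementation.

```python
def flatten_result_set(result_set):
    """Convert a GetQueryResults-style ResultSet into (columns, rows).

    Every column is reported as varchar, because the cell values arrive as
    strings regardless of the declared column type. Cells without a
    VarCharValue key are treated as NULL and returned as None.
    """
    columns = [
        {"name": column["Name"], "type": "varchar"}
        for column in result_set["ResultSetMetadata"]["ColumnInfo"]
    ]
    rows = [
        [cell.get("VarCharValue") for cell in row["Data"]]
        for row in result_set["Rows"]
    ]
    return columns, rows


if __name__ == "__main__":
    sample = {
        "ResultSetMetadata": {
            "ColumnInfo": [
                {"Name": "employeeid", "Type": "varchar"},
                {"Name": "tags", "Type": "array"},
            ]
        },
        "Rows": [
            {"Data": [{"VarCharValue": "E1000"}, {"VarCharValue": "[a, b]"}]},
            {"Data": [{"VarCharValue": "E1001"}, {}]},
        ],
    }
    print(flatten_result_set(sample))
```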

These technical adjustments, while initially challenging, highlighted the importance of carefully managing the subtle differences between different query execution engines when building a unified data platform.

Scale of Odin and looking ahead

The Odin platform serves over 1,500 users who execute approximately 80,000 queries daily, totaling 2.5 million queries per month. Odin also powers more than 5,000 Business Intelligence (BI) reports and dashboards for Tableau and Looker. The queries are executed across our multi-engine landscape of more than 30 Athena workgroups, which use both provisioned and on-demand capacity, and 4 Presto clusters running on EC2 instances with Auto Scaling enabled, averaging 180 instances each. As Twilio continues to experience rapid growth, our Odin platform has enabled us to mature our technology stacks by both upgrading existing compute resources and integrating new technologies. We can do all this without disrupting the experience for our end users. While Odin serves as our foundation, we’re excited to continue to expand this pluggable infrastructure. Our roadmap includes migrating our self-managed open-source Presto implementation to EMR Trino, introducing Apache Spark as a compute engine via Amazon EMR Serverless or AWS Glue jobs, and integrating generative AI capabilities to intelligently route queries across Odin’s various compute options.

Conclusion

In this post, we’ve shared how we built Odin, our unified multi-engine query platform. By combining AWS services like Amazon Athena, Amazon ElastiCache for Redis, and Amazon DynamoDB with our open-source technology stack, we created a transparent abstraction layer for users. This integration has resulted in a highly available and resilient platform environment that serves our query processing needs.

By embracing this multi-engine approach, not only did we solve our query infrastructure challenges but we also established a flexible foundation that will continue to evolve with our data needs, ensuring we can deliver powerful insights at scale regardless of how technology trends shift in the future.

To learn more and get started using Amazon Athena, please see the Athena User Guide.


About the authors

Aakash Pradeep

Aakash is a Senior Software Engineer at Adobe with over 15 years of experience across ingestion, compute, storage, and query platforms. Previously, at Twilio, he worked extensively on developing the Odin platform to serve his customers’ query infrastructure needs. Aakash is a PrestoCon speaker, holds multiple patents in real-time analytics, and is passionate about building high-performance distributed systems.

Venkatram Bondugula

Venkatram is a seasoned backend engineer with over a decade of experience specializing in the design and development of scalable data platforms for big data and distributed systems. With a strong background in backend architecture and data engineering, he has built and optimized high-performance systems that power data-driven decision-making at scale.

Amber Runnels

Amber is a Senior Analytics Specialist Solutions Architect at AWS specializing in big data and distributed systems. She helps customers optimize workloads in the AWS data ecosystem to achieve a scalable, performant, and cost-effective architecture. Aside from technology, she is passionate about exploring the many places and cultures this world has to offer, reading novels, and building terrariums.

Visualize data lineage using Amazon SageMaker Catalog for Amazon EMR, AWS Glue, and Amazon Redshift

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/visualize-data-lineage-using-amazon-sagemaker-catalog-for-amazon-emr-aws-glue-and-amazon-redshift/

Amazon SageMaker offers a comprehensive hub that integrates data, analytics, and AI capabilities, providing a unified experience for users to access and work with their data. Through Amazon SageMaker Unified Studio, a single and unified environment, you can use a wide range of tools and features to support your data and AI development needs, including data processing, SQL analytics, model development, training, inference, and generative AI development. This offering is further enhanced by the integration of Amazon Q and Amazon SageMaker Catalog, which provide an embedded generative AI and governance experience, helping users work efficiently and effectively across the entire data and AI lifecycle, from data preparation to model deployment and monitoring.

With the SageMaker Catalog data lineage feature, you can visually track and understand the flow of your data across different systems and teams, gaining a complete picture of your data assets and how they’re connected. As an OpenLineage-compatible feature, it helps you trace data origins, track transformations, and view cross-organizational data consumption, giving you insights into cataloged assets, subscribers, and external activities. By capturing lineage events from OpenLineage-enabled systems or through APIs, you can gain a deeper understanding of your data’s journey, including activities within SageMaker Catalog and beyond, ultimately driving better data governance, quality, and collaboration across your organization.

Additionally, the SageMaker Catalog data lineage feature versions each event, so you can track changes, visualize historical lineage, and compare transformations over time. This provides valuable insights into data evolution, facilitating troubleshooting, auditing, and data integrity by showing exactly how data assets have evolved, and generates trust in data.

In this post, we discuss the visualization of data lineage in SageMaker Catalog, how to automatically capture lineage from AWS analytics services such as AWS Glue, Amazon Redshift, and Amazon EMR Serverless, and how to visualize it with SageMaker Unified Studio.

Solution overview

The generation of data lineage in SageMaker Catalog operates through an automated system that captures metadata and relationships between different data artifacts for AWS Glue, Amazon EMR, and Amazon Redshift. When data moves through various AWS services, SageMaker automatically tracks these movements, transformations, and dependencies, creating a detailed map of the data’s journey. This tracking includes information about data sources, transformations, processing steps, and final outputs, providing a complete audit trail of data movement and transformation.
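
To give a sense of what a captured lineage record can look like, the following sketch builds a minimal OpenLineage-style run event as a Python dictionary. The top-level field names follow the OpenLineage run event model as we understand it; the `build_run_event` function, the namespace, the producer URL, and the schema version in `schemaURL` are illustrative assumptions, and the events that the AWS services actually emit carry additional facets.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(job_name, inputs, outputs, namespace="example-namespace"):
    """Build a minimal OpenLineage-style run event (illustrative values only)."""
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        # Datasets read and written by the job; names are placeholders
        "inputs": [{"namespace": namespace, "name": name} for name in inputs],
        "outputs": [{"namespace": namespace, "name": name} for name in outputs],
        "producer": "https://example.com/lineage-sketch",
        # Assumed spec version; check the version your tooling expects
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
    }


if __name__ == "__main__":
    event = build_run_event(
        "lineageglue",
        inputs=["employee", "attendance.csv"],
        outputs=["attendance_with_emp1"],
    )
    print(json.dumps(event, indent=2))
```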

The implementation of data lineage in SageMaker Catalog offers several key benefits:

  • Compliance and audit support – Organizations can demonstrate compliance with regulatory requirements by showing complete data provenance and transformation history
  • Impact analysis – Teams can assess the potential impact of changes to data sources or transformations by understanding dependencies and relationships in the data pipeline
  • Troubleshooting and debugging – When issues arise, the lineage system helps identify the root cause by showing the complete path of data transformation and processing
  • Data quality management – By tracking transformations and dependencies, organizations can better maintain data quality and understand how data quality issues might propagate through their systems
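
Impact analysis, for example, amounts to walking the lineage graph downstream from a changed asset. The following is a toy sketch of that idea, not the SageMaker Catalog API: the `LineageGraph` class is hypothetical, and the node names are borrowed from the AWS Glue example later in this post.

```python
from collections import defaultdict, deque


class LineageGraph:
    """Directed graph of lineage edges pointing from source to target."""

    def __init__(self):
        self.downstream = defaultdict(set)

    def add_edge(self, source, target):
        self.downstream[source].add(target)

    def impacted_by(self, node):
        """Return every node reachable downstream of the given node."""
        seen = set()
        queue = deque([node])
        while queue:
            current = queue.popleft()
            # .get avoids creating empty entries for leaf nodes
            for successor in self.downstream.get(current, ()):
                if successor not in seen:
                    seen.add(successor)
                    queue.append(successor)
        return seen


if __name__ == "__main__":
    graph = LineageGraph()
    graph.add_edge("rds.employee", "glue-job")
    graph.add_edge("s3.attendance.csv", "glue-job")
    graph.add_edge("glue-job", "attendance_with_emp1")
    print(sorted(graph.impacted_by("rds.employee")))
```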

Lineage capture is automated using several tools in SageMaker Unified Studio. To learn more, refer to Data lineage support matrix.

In the following sections, we show you how to configure your resources and implement the solution. For this post, we create the solution resources in the us-west-2 AWS Region using an AWS CloudFormation template.

Prerequisites

Before getting started, make sure you have the following:

Configure SageMaker Unified Studio with AWS CloudFormation

The vpc-analytics-lineage-sus.yaml stack creates a VPC, subnet, security group, IAM roles, NAT gateway, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, S3 buckets, SageMaker Unified Studio domain, and SageMaker Unified Studio project. To create the solution resources, complete the following steps:

  1. Launch the stack vpc-analytics-lineage-sus using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.

    Parameters Sample value
    DatazoneS3Bucket s3://datazone-{account_id}/
    DomainName dz-studio
    EnvironmentName sm-unifiedstudio
    PrivateSubnet1CIDR 10.192.20.0/24
    PrivateSubnet2CIDR 10.192.21.0/24
    PrivateSubnet3CIDR 10.192.22.0/24
    ProjectName sidproject
    PublicSubnet1CIDR 10.192.10.0/24
    PublicSubnet2CIDR 10.192.11.0/24
    PublicSubnet3CIDR 10.192.12.0/24
    UsersList analyst
    VpcCIDR 10.192.0.0/16

The stack creation process can take approximately 20 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

Next, we prepare the source data and set up the AWS Glue ETL job, the Amazon EMR Serverless Spark job, and the Amazon Redshift job to generate lineage, which we then capture from Amazon SageMaker Unified Studio.

Prepare data

The following is example data from our CSV files:

attendance.csv

EmployeeID,Date,ShiftStart,ShiftEnd,Absent,OvertimeHours
E1000,2024-01-01,2024-01-01 08:00:00,2024-01-01 16:22:00,False,3
E1001,2024-01-08,2024-01-08 08:00:00,2024-01-08 16:38:00,False,2
E1002,2024-01-23,2024-01-23 08:00:00,2024-01-23 16:24:00,False,3
E1003,2024-01-09,2024-01-09 10:00:00,2024-01-09 18:31:00,False,0
E1004,2024-01-15,2024-01-15 09:00:00,2024-01-15 17:48:00,False,1

employees.csv

EmployeeID,Name,Department,Role,HireDate,Salary,PerformanceRating,Shift,Location
E1000,Employee_0,Quality Control,Operator,2021-08-08,33002.0,1,Night,Plant C
E1001,Employee_1,Maintenance,Supervisor,2015-12-31,69813.76,5,Evening,Plant B
E1002,Employee_2,Production,Technician,2015-06-18,46753.32,1,Evening,Plant A
E1003,Employee_3,Admin,Supervisor,2020-10-13,52853.4,5,Night,Plant A
E1004,Employee_4,Quality Control,Manager,2023-09-21,55645.27,5,Evening,Plant A

Upload the sample data from attendance.csv and employees.csv to the S3 bucket specified in the previous CloudFormation stack (s3://datazone-{account_id}/csv/).

Ingest employee data into an Amazon Relational Database Service (Amazon RDS) for MySQL table

On the CloudFormation console, open the stack vpc-analytics-lineage-sus and collect the Amazon RDS for MySQL database endpoint to use in the following commands to create a default employeedb database.

  1. Connect to the Amazon EC2 instance that has the mysql client package installed.
  2. Run the following command to connect to the database:
    mysql -u admin -h database-1.cuqd06l5efvw.us-west-2.rds.amazonaws.com -p

  3. Run the following command to create an employee table:
    USE employeedb;
    
    CREATE TABLE employee (
      EmployeeID longtext,
      Name longtext,
      Department longtext,
      Role longtext,
      HireDate longtext,
      Salary longtext,
      PerformanceRating longtext,
      Shift longtext,
      Location longtext
    );

  4. Run the following command to insert rows:
    INSERT INTO employee (EmployeeID, Name, Department, Role, HireDate, Salary, PerformanceRating, Shift, Location) VALUES
      ('E1000', 'Employee_0', 'Quality Control', 'Operator', '2021-08-08', 33002.00, 1, 'Night', 'Plant C'),
      ('E1001', 'Employee_1', 'Maintenance', 'Supervisor', '2015-12-31', 69813.76, 5, 'Evening', 'Plant B'),
      ('E1002', 'Employee_2', 'Production', 'Technician', '2015-06-18', 46753.32, 1, 'Evening', 'Plant A'),
      ('E1003', 'Employee_3', 'Admin', 'Supervisor', '2020-10-13', 52853.40, 5, 'Night', 'Plant A'),
      ('E1004', 'Employee_4', 'Quality Control', 'Manager', '2023-09-21', 55645.27, 5, 'Evening', 'Plant A');

Capture lineage from AWS Glue ETL job and notebook

To demonstrate the lineage, we set up an AWS Glue extract, transform, and load (ETL) job to read the employee data from an Amazon RDS for MySQL table and the employee attendance data from Amazon S3, and join both datasets. Finally, we write the data to Amazon S3 and create the attendance_with_emp1 table in the AWS Glue Data Catalog.
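
Before running the job, it can help to preview what the inner join on EmployeeID produces. The following standalone sketch uses only the Python standard library and the first two rows of each sample file shown earlier; the `inner_join` helper is a hypothetical illustration and is not part of the AWS Glue job.

```python
import csv
import io

EMPLOYEES_CSV = """EmployeeID,Name,Department,Role,HireDate,Salary,PerformanceRating,Shift,Location
E1000,Employee_0,Quality Control,Operator,2021-08-08,33002.0,1,Night,Plant C
E1001,Employee_1,Maintenance,Supervisor,2015-12-31,69813.76,5,Evening,Plant B
"""

ATTENDANCE_CSV = """EmployeeID,Date,ShiftStart,ShiftEnd,Absent,OvertimeHours
E1000,2024-01-01,2024-01-01 08:00:00,2024-01-01 16:22:00,False,3
E1001,2024-01-08,2024-01-08 08:00:00,2024-01-08 16:38:00,False,2
"""


def read_rows(text):
    """Parse CSV text into a list of dictionaries keyed by column name."""
    return list(csv.DictReader(io.StringIO(text)))


def inner_join(left_rows, right_rows, key):
    """Join two lists of dictionaries on a shared key column."""
    index = {}
    for row in right_rows:
        index.setdefault(row[key], []).append(row)
    joined = []
    for left in left_rows:
        for right in index.get(left[key], []):
            merged = dict(left)
            merged.update(right)
            joined.append(merged)
    return joined


if __name__ == "__main__":
    for row in inner_join(read_rows(EMPLOYEES_CSV), read_rows(ATTENDANCE_CSV), "EmployeeID"):
        print(row["EmployeeID"], row["Name"], row["Date"], row["OvertimeHours"])
```

All values stay as strings here because the csv module does not infer types, unlike the inferSchema option used in the Spark job.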

Create and configure AWS Glue job for lineage generation

Complete the following steps to create your AWS Glue ETL job:

  1. On the AWS Glue console, create a new ETL job with AWS Glue version 5.0.
  2. Enable Generate lineage events and provide the domain ID (retrieve from the CloudFormation template output for DataZoneDomainid; it will have the format dzd_xxxxxxxx)
  3. Use the following code snippet in the AWS Glue ETL job script. Provide the S3 bucket (bucketname-{account_id}) used in the preceding CloudFormation stack.
    from awsglue.context import GlueContext
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("lineageglue").enableHiveSupport().getOrCreate()

    # GlueContext is needed to read credentials from the AWS Glue connection
    glueContext = GlueContext(spark.sparkContext)
    connection_details = glueContext.extract_jdbc_conf(connection_name="connectionname")

    # Read the employee table from Amazon RDS for MySQL over JDBC
    employee_df = (
        spark.read.format("jdbc")
        .option("url", "jdbc:mysql://dbhost:3306/database_name")
        .option("dbtable", "employee")
        .option("user", connection_details["user"])
        .option("password", connection_details["password"])
        .load()
    )

    # Read the attendance CSV from Amazon S3 (replace {account_id} with your account ID)
    s3_paths = {
        "absent_data": "s3://bucketname-{account_id}/csv/attendance.csv"
    }
    absent_df = spark.read.csv(s3_paths["absent_data"], header=True, inferSchema=True)

    # Join both datasets on the shared EmployeeID column
    joined_df = employee_df.join(absent_df, on="EmployeeID", how="inner")

    # Write Parquet to Amazon S3 and register the table in the Data Catalog
    joined_df.write.mode("overwrite").format("parquet").option("path", "s3://datazone-{account_id}/attendanceparquet/").saveAsTable("gluedbname.tablename")

  4. Choose Run to start the job.
  5. On the Runs tab, confirm the job ran without failure.
  6. After the job has executed successfully, navigate to the SageMaker Unified Studio domain.
  7. Choose Project and under Overview, choose Data Sources.
  8. Select the Data Catalog source (accountid-AwsDataCatalog-glue_db_suffix-default-datasource).
  9. On the Actions dropdown menu, choose Edit.
  10. Under Connection, enable Import data lineage.
  11. In the Data Selection section, under Table Selection Criteria, provide a table name or use * to generate lineage.
  12. Update the data source and choose Run to create an asset called attendance_with_emp1 in SageMaker Catalog.
  13. Navigate to Assets, choose the attendance_with_emp1 asset, and navigate to the LINEAGE section.

The following lineage diagram shows an AWS Glue job that integrates data from two sources: employee information stored in Amazon RDS for MySQL and employee absence records stored in Amazon S3. The AWS Glue job combines these datasets through a join operation, then creates a table in the Data Catalog and registers it as an asset in SageMaker Catalog, making the unified data available for further analysis or machine learning purposes.

Create and configure AWS Glue notebook for lineage generation

Complete the following steps to create the AWS Glue notebook:

  1. On the AWS Glue console, choose Author using an interactive code notebook.
  2. Under Options, choose Start fresh and choose Create notebook.
  3. In the notebook, use the following code to generate lineage.

    In the following code, we add the required Spark configuration to generate lineage and then read CSV data from Amazon S3 and write in Parquet format to the Data Catalog table. The Spark configuration includes the following parameters:

    • spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener – Registers the OpenLineage listener to capture Spark job execution events and metadata for lineage tracking
    • spark.openlineage.transport.type=amazon_datazone_api – Specifies Amazon DataZone as the destination service where the lineage data will be sent and stored
    • spark.openlineage.transport.domainId=dzd_xxxxxxx – Defines the unique identifier of your Amazon DataZone domain where the lineage data will be associated
    • spark.glue.accountId={account_id} – Specifies the AWS account ID where the AWS Glue job is running for proper resource identification and access
    • spark.openlineage.facets.custom_environment_variables – Lists the specific environment variables to capture in the lineage data for context about the AWS and AWS Glue environment
    • spark.glue.JOB_NAME=lineagenotebook – Sets a unique identifier name for the AWS Glue job that will appear in lineage tracking and logs

    See the following code:

    %%configure --name project.spark -f
    {
    "--conf":"spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
    --conf spark.openlineage.transport.type=amazon_datazone_api \
    --conf spark.openlineage.transport.domainId=dzd_xxxxxxxx \
    --conf spark.glue.accountId={account_id} \
    --conf spark.openlineage.facets.custom_environment_variables=[AWS_DEFAULT_REGION;GLUE_VERSION;GLUE_COMMAND_CRITERIA;GLUE_PYTHON_VERSION;] \
    --conf spark.glue.JOB_NAME=lineagenotebook"
    }
    
    from pyspark.sql import SparkSession
    
    
    spark = SparkSession.builder.appName("lineagegluenotebook").enableHiveSupport().getOrCreate()
    
    s3_paths = {
    'absent_data': 's3://datazone-{account_id}/csv/attendance.csv'
    }
    absent_df = spark.read.csv(s3_paths['absent_data'], header=True, inferSchema=True)
    
    absent_df.write.mode("overwrite").format("parquet").option("path", "s3://datazone-{account_id}/attendanceparquet2/").saveAsTable("gluedbname.tablename")

  4. After the notebook has executed successfully, navigate to the SageMaker Unified Studio domain.
  5. Choose Project and under Overview, choose Data Sources.
  6. Choose the Data Catalog source ({account_id}-AwsDataCatalog-glue_db_suffix-default-datasource).
  7. Choose Run to create the asset attendance_with_empnote in SageMaker Catalog.
  8. Navigate to Assets, choose the attendance_with_empnote asset, and navigate to the LINEAGE section.

The following lineage diagram shows an AWS Glue job that reads data from the employee absence records stored in Amazon S3. The AWS Glue job transforms the CSV data into Parquet format, then creates a table in the Data Catalog and registers it as an asset in SageMaker Catalog.
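
The single chained --conf value used in the notebook configuration is easy to mistype by hand. As a convenience, it could be assembled from a dictionary, as in the following sketch. The `build_conf_value` helper is hypothetical; the property names and values are the ones listed for the notebook above, with the domain ID left as a placeholder.

```python
import json


def build_conf_value(settings):
    """Join Spark properties into the single string expected under "--conf".

    The first property is written as key=value, and each following property
    is prefixed with "--conf", matching the chained form used in the notebook.
    """
    items = list(settings.items())
    if not items:
        raise ValueError("at least one Spark property is required")
    first_key, first_value = items[0]
    parts = [f"{first_key}={first_value}"]
    parts += [f"--conf {key}={value}" for key, value in items[1:]]
    return " ".join(parts)


if __name__ == "__main__":
    lineage_settings = {
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type": "amazon_datazone_api",
        "spark.openlineage.transport.domainId": "dzd_xxxxxxxx",  # placeholder
        "spark.glue.JOB_NAME": "lineagenotebook",
    }
    # Prints the JSON body that would follow the %%configure magic
    print(json.dumps({"--conf": build_conf_value(lineage_settings)}, indent=2))
```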

Capture lineage from Amazon Redshift

To demonstrate the lineage, we create an employee table and an attendance table, join both datasets, and write the result to a new table called employeewithabsent in Amazon Redshift. Complete the following steps to create and configure lineage for Amazon Redshift tables:

  1. In SageMaker Unified Studio, open your domain.
  2. Under Compute, choose Data warehouse.
  3. Open project.redshift and copy the endpoint name (redshift-serverless-workgroup-xxxxxxx).
  4. On the Amazon Redshift console, open the Query Editor v2, and connect to the Redshift Serverless workgroup with a secret. Use the AWS Secrets Manager option and choose the secret redshift-serverless-namespace-xxxxxxxx.
  5. Use the following code to create tables in Amazon Redshift and load data from Amazon S3 using the COPY command. Make sure the IAM role has GetObject permission on the S3 files attendance.csv and employees.csv.

    Create Redshift table absent

    CREATE TABLE public.absent (
        employeeid character varying(65535),
        date date,
        shiftstart timestamp without time zone ,
        shiftend timestamp without time zone,
        absent boolean,
        overtimehours integer
    );

    Load data into absent table.

    COPY absent
    FROM 's3://datazone-{account_id}/csv/attendance.csv' 
    IAM_ROLE 'arn:aws:iam::{account_id}:role/RedshiftAdmin'
    csv
    IGNOREHEADER 1;

    Create Redshift table employee

    CREATE TABLE public.employee (
        employeeid character varying(65535),
        name character varying(65535),
        department character varying(65535),
        role character varying(65535),
        hiredate date,
        salary double precision,
        performancerating integer,
        shift character varying(65535),
        location character varying(65535)
    );

    Load data into employee table.

    COPY employee
    FROM 's3://datazone-{account_id}/csv/employees.csv' 
    IAM_ROLE 'arn:aws:iam::{account_id}:role/RedshiftAdmin'
    csv
    IGNOREHEADER 1;

  6. After the tables are created and the data is loaded, perform the join between the tables and create a new table with a CTAS query:
    CREATE TABLE public.employeewithabsent AS
    SELECT 
      e.*,
      a.absent,
      a.overtimehours
    FROM public.employee e
    INNER JOIN public.absent a
    ON e.EmployeeID = a.EmployeeID;

  7. Navigate to the SageMaker Unified Studio domain.
  8. Choose Project and under Overview, choose Data Sources.
  9. Select the Amazon Redshift source (RedshiftServerless-default-redshift-datasource).
  10. On the Actions dropdown menu, choose Edit.
  11. Under Connection, enable Import data lineage.
  12. In the Data Selection section, under Table Selection Criteria, provide a table name or use * to generate lineage.
  13. Update the data source and choose Run to create an asset called employeewithabsent in SageMaker Catalog.
  14. Navigate to Assets, choose the employeewithabsent asset, and navigate to the LINEAGE section.

The following lineage diagram shows two Amazon Redshift tables being joined to create a new Redshift table, which is registered as an asset in SageMaker Catalog.

Capture lineage from EMR Serverless job

To demonstrate the lineage, we read employee data from an RDS for MySQL table and an attendance dataset from Amazon Redshift, and join both datasets. Finally, we write the data to Amazon S3 and create the attendance_with_employee table in the Data Catalog. Complete the following steps:

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. To create or manage EMR Serverless applications, you need the EMR Studio UI.
    1. If you already have an EMR Studio in the Region where you want to create an application, choose Manage applications to navigate to your EMR Studio, or select the EMR Studio that you want to use.
    2. If you don’t have an EMR Studio in the Region where you want to create an application, choose Get started and then choose Create and launch Studio. EMR Serverless creates an EMR Studio for you so you can create and manage applications.
  3. In the Create studio UI that opens in a new tab, enter the name, type, and release version for your application.
  4. Choose Create application.
  5. Create an EMR Serverless Spark application with the following configuration:
    1. For Type, choose Spark.
    2. For Release version, choose emr-7.8.0.
    3. For Architecture, choose x86_64.
    4. For Application setup options, select Use custom settings.
    5. For Interactive endpoint, enable the endpoint for EMR Studio.
    6. For Application configuration, use the following configuration:
      [{
          "Classification": "iceberg-defaults",
          "Properties": {
              "iceberg.enabled": "true"
          }
      }]

  6. Choose Create and Start application.
  7. After the application has started, submit the Spark application to generate lineage events. Copy the following script and upload it to the S3 bucket (s3://datazone-{account_id}/script/). To read the data from MySQL, also upload the mysql-connector-java JAR file to the S3 bucket (s3://datazone-{account_id}/jars/).
    from pyspark.sql import SparkSession
    
    
    spark = SparkSession.builder.appName("lineageglue").enableHiveSupport().getOrCreate()
    
    employee_df = spark.read.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://dbhostname:3306/databasename").option("dbtable", "employee").option("user", "admin").option("password", "xxxxxxx").load()
    
    absent_df = spark.read.format("jdbc").option("url", "jdbc:redshift://redshiftserverlessendpoint:5439/dev").option("dbtable", "public.absent").option("user", "admin").option("password", "xxxxxxxxxx").load()
    
    joined_df = employee_df.join(absent_df, on="EmployeeID", how="inner")
    
    joined_df.write.mode("overwrite").format("parquet").option("path", "s3://datazone-{account_id}/emrparquetnew/").saveAsTable("gluedbname.tablename")

  8. After you upload the script, use the following command to submit the Spark application. Change the following parameters according to your environment details:
    1. application-id: Provide the Spark application ID you generated.
    2. execution-role-arn: Provide the EMR execution role.
    3. entryPoint: Provide the Spark script S3 path.
    4. domainID: Provide the domain ID (from the CloudFormation template output for DataZoneDomainid: dzd_xxxxxxxx).
    5. accountID: Provide your AWS account ID.
      aws emr-serverless start-job-run --application-id 00frv81tsqe0ok0l --execution-role-arn arn:aws:iam::{account_id}:role/service-role/AmazonEMR-ExecutionRole-1717662744320 --name "Spark-Lineage" --job-driver '{
              "sparkSubmit": {
                  "entryPoint": "s3://datazone-{account_id}/script/emrspark2.py",
                  "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=2 --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=/usr/share/aws/datazone-openlineage-spark/lib/DataZoneOpenLineageSpark-1.0.jar,s3://datazone-{account_id}/jars/mysql-connector-java-8.0.20.jar --conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener --conf spark.openlineage.transport.type=amazon_datazone_api --conf spark.openlineage.transport.domainId=dzd_xxxxxxxx --conf spark.glue.accountId={account_id}"
              }
          }'

  9. After the job has executed successfully, navigate to the SageMaker Unified Studio domain.
  10. Choose Project and under Overview, choose Data Sources.
  11. Select the Data Catalog source ({account_id}-AwsDataCatalog-glue_db_xxxxxxxxxx-default-datasource).
  12. On the Actions dropdown menu, choose Edit.
  13. Under Connection, enable Import data lineage.
  14. In the Data Selection section, under Table Selection Criteria, provide a table name or use * to generate lineage.
  15. Update the data source and choose Run to create an asset called attendancewithempnew in SageMaker Catalog.
  16. Navigate to Assets, choose the attendancewithempnew asset, and navigate to the LINEAGE section.

The following lineage diagram shows an EMR Serverless Spark job that integrates employee information stored in Amazon RDS for MySQL and employee absence records stored in Amazon Redshift. The job combines these datasets through a join operation, then creates a table in the Data Catalog and registers it as an asset in SageMaker Catalog.
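The start-job-run command from step 8 can also be issued from Python. The following is a minimal sketch, assuming the AWS SDK for Python (Boto3) is installed and credentials are configured. The function names build_job_run_request and submit_job_run are hypothetical helpers, and the account ID, role, application ID, and domain ID shown are placeholders rather than values from this walkthrough.

```python
import json


def build_job_run_request(application_id, execution_role_arn, account_id, domain_id):
    """Assemble the StartJobRun arguments from step 8 as a Python dict."""
    bucket = f"s3://datazone-{account_id}"
    jars = ",".join([
        "/usr/share/aws/datazone-openlineage-spark/lib/DataZoneOpenLineageSpark-1.0.jar",
        f"{bucket}/jars/mysql-connector-java-8.0.20.jar",
    ])
    # Same Spark settings as the CLI example, kept as a dict for readability.
    spark_conf = {
        "spark.executor.cores": "1",
        "spark.executor.memory": "4g",
        "spark.driver.cores": "1",
        "spark.driver.memory": "4g",
        "spark.executor.instances": "2",
        "spark.hadoop.hive.metastore.client.factory.class":
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "spark.jars": jars,
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type": "amazon_datazone_api",
        "spark.openlineage.transport.domainId": domain_id,
        "spark.glue.accountId": account_id,
    }
    params = " ".join(f"--conf {key}={value}" for key, value in spark_conf.items())
    return {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "name": "Spark-Lineage",
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": f"{bucket}/script/emrspark2.py",
                "sparkSubmitParameters": params,
            }
        },
    }


def submit_job_run(request):
    """Submit the job; imported lazily so the builder works without AWS access."""
    import boto3

    client = boto3.client("emr-serverless")
    return client.start_job_run(**request)["jobRunId"]


# Placeholder values (assumptions); replace with your own before submitting.
request = build_job_run_request(
    application_id="00exampleappid",
    execution_role_arn="arn:aws:iam::111122223333:role/ExampleEmrExecutionRole",
    account_id="111122223333",
    domain_id="dzd_example",
)
print(json.dumps(request, indent=2))
```

Building the request separately from submitting it lets you review the Spark parameters, in particular the OpenLineage listener and the domain ID, before calling submit_job_run(request).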

Clean up

To clean up your resources, complete the following steps:

  1. On the AWS Glue console, delete the AWS Glue job.
  2. On the Amazon EMR console, delete the EMR Serverless Spark application and EMR Studio.
  3. On the AWS CloudFormation console, delete the CloudFormation stack vpc-analytics-lineage-sus.

Conclusion

In this post, we showed how data lineage in SageMaker Catalog helps you track and understand the complete lifecycle of your data across various AWS analytics services. This comprehensive tracking system provides visibility into how data flows through different processing stages, transformations, and analytical workflows, making it an essential tool for data governance, compliance, and operational efficiency.

Try out these lineage visualization methods for your own use cases, and share your questions and feedback in the comments section.


About the Authors

Shubham Purwar

Shubham is an AWS Analytics Specialist Solution Architect. He helps organizations unlock the full potential of their data by designing and implementing scalable, secure, and high-performance analytics solutions on the AWS platform. With deep expertise in AWS analytics services, he collaborates with customers to uncover their distinct business requirements and create customized solutions that deliver actionable insights and drive business growth. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar

Nitin is a Cloud Engineer (ETL) at Amazon Web Services, specialized in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Prashanthi Chinthala

Prashanthi is a Cloud Engineer (DIST) at AWS. She helps customers overcome EMR challenges and develop scalable data processing and analytics pipelines on AWS.

Transform your data to Amazon S3 Tables with Amazon Athena

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/transform-your-data-to-amazon-s3-tables-with-amazon-athena/

Organizations today manage vast amounts of data, with much of it stored based on initial use cases and business needs. As requirements for this data evolve—whether for real-time reporting, advanced machine learning (ML), or cross-team data sharing—the original storage formats and structures often become a bottleneck. When this happens, data teams frequently find that datasets that worked well for their original purpose now require complex transformations; custom extract, transform, and load (ETL) pipelines; and extensive redesign to unblock new analytical workflows. This creates a significant barrier between valuable data and actionable insights.

Amazon Athena offers a solution through its serverless, SQL-based approach to data transformation. With the CREATE TABLE AS SELECT (CTAS) functionality in Athena, you can transform existing data and create new tables in the process, using standard SQL statements to help reduce the need for custom ETL pipeline development.

This CTAS experience now supports Amazon S3 Tables, which provide built-in optimization, Apache Iceberg support, automatic table maintenance, and ACID transaction capabilities. This combination can help organizations modernize their data infrastructure, achieve improved performance, and reduce operational overhead.

You can use this approach to transform data from commonly used tabular formats, including CSV, TSV, JSON, Avro, Parquet, and ORC. The resulting tables are immediately accessible for querying across Athena, Amazon Redshift, Amazon EMR, and supported third-party applications, including Apache Spark, Trino, DuckDB, and PyIceberg.

This post demonstrates how Athena CTAS simplifies the data transformation process through a practical example: migrating an existing Parquet dataset into S3 Tables.

Solution overview

Consider a global apparel ecommerce retailer processing thousands of daily customer reviews across marketplaces. Their dataset, currently stored in Parquet format in Amazon Simple Storage Service (Amazon S3), requires updates whenever customers modify ratings and review content. The business needs a solution that supports ACID transactions—the ability to atomically insert, update, and delete records while maintaining data consistency—because review data changes frequently as customers edit their feedback.

Additionally, the data team faces operational challenges: manual table maintenance tasks like compaction and metadata management, no built-in support for time travel queries to analyze historical changes, and the need for custom processes to handle concurrent data modifications safely.

These requirements point to a need for an analytics-friendly solution that can handle transactional workloads while providing automated table maintenance, reducing the operational overhead that currently burdens their analysts and engineers.

S3 Tables and Athena provide an ideal solution for these requirements. S3 Tables provide storage optimized for analytics workloads, offering Iceberg support with automatic table maintenance and continuous optimization. Athena is a serverless, interactive query service you can use to analyze data using standard SQL without managing infrastructure. When combined, S3 Tables handle the storage optimization and maintenance automatically, and Athena provides the SQL interface for data transformation and querying. This can help reduce the operational overhead of manual table maintenance while providing efficient data management and optimal performance across supported data processing and query engines.

In the following sections, we show how to use the CTAS functionality in Athena to transform the Parquet-formatted review data into S3 Tables with a single SQL statement. We then demonstrate how to manage dynamic data using INSERT, UPDATE, and DELETE operations, showcasing the ACID transaction capabilities and metadata query features in S3 Tables.

Prerequisites

In this walkthrough, we will be working with synthetic customer review data that we’ve made publicly available at s3://aws-bigdata-blog/generated_synthetic_reviews/data/. To follow along, you must have the following prerequisites:

You will create an S3 table bucket named athena-ctas-s3table-demo as part of this walkthrough. Make sure this name is available in your chosen AWS Region.

Set up a database and tables in Athena

Let’s start by creating a database and source table to hold our Parquet data. This table will serve as the data source for our CTAS operation.

Navigate to the Athena query editor to run the following queries:

CREATE DATABASE IF NOT EXISTS `awsdatacatalog`.`reviewsdb`

CREATE EXTERNAL TABLE IF NOT EXISTS `awsdatacatalog`.`reviewsdb`.`customer_reviews`(
  `marketplace` string, 
  `customer_id` string, 
  `review_id` string, 
  `product_id` string, 
  `product_title` string, 
  `star_rating` bigint, 
  `helpful_votes` bigint, 
  `total_votes` bigint, 
  `insight` string, 
  `review_headline` string, 
  `review_body` string, 
  `review_date` timestamp, 
  `review_year` bigint)
PARTITIONED BY ( 
  `product_category` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://aws-bigdata-blog/generated_synthetic_reviews/data/'

Because the data is partitioned by product category, you must add the partition information to the table metadata using MSCK REPAIR TABLE:

MSCK REPAIR TABLE `awsdatacatalog`.`reviewsdb`.`customer_reviews`

The preview query should return sample review data, confirming the table is ready for transformation:

SELECT * FROM "awsdatacatalog"."reviewsdb"."customer_reviews" limit 10

Create a table bucket

Table buckets are designed to store tabular data and metadata as objects for analytics workloads. Follow these steps to create a table bucket:

  1. Sign in to the console in your preferred Region and open the Amazon S3 console.
  2. In the navigation pane, choose Table buckets.
  3. Choose Create table bucket.
  4. For Table bucket name, enter athena-ctas-s3table-demo.
  5. Select Enable integration for Integration with AWS analytics services if not already enabled.
  6. Leave the encryption option at its default.
  7. Choose Create table bucket.

You can now see athena-ctas-s3table-demo listed under Table buckets.

Create a namespace

Namespaces provide logical organization for tables within your S3 table bucket, facilitating scalable table management. In this step, we create a reviews_namespace to organize our customer review tables. Follow these steps to create the table namespace:

  1. In the navigation pane under Table buckets, choose your newly created bucket athena-ctas-s3table-demo.
  2. On the bucket details page, choose Create table with Athena.
  3. Choose Create a namespace for Namespace configuration.
  4. Enter reviews_namespace for Namespace name.
  5. Choose Create namespace.
  6. Choose Create table with Athena to navigate to the Athena query editor.

You should now see your S3 Tables configuration automatically selected under Data, as shown in the following screenshot.

When you enable Integration with AWS analytics services while creating an S3 table bucket, AWS Glue creates a new catalog called s3tablescatalog in your account’s default Data Catalog specific to your Region. The integration maps the S3 table bucket resources in your account and Region in this catalog.

This configuration makes sure subsequent queries will target your S3 Tables namespace. You’re now ready to create tables using the CTAS functionality.

Create a new S3 table using the customer_reviews table

A table represents a structured dataset consisting of underlying table data and related metadata stored in the Iceberg table format. In the following steps, we transform the customer_reviews table that we created earlier on the Parquet dataset into an S3 table using the Athena CTAS statement. We partition by date using the day() partition transform from Iceberg.

Run the following CTAS query:

CREATE TABLE "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'day(review_date)' ]
) as
select *
from "awsdatacatalog"."reviewsdb"."customer_reviews"
where review_year >= 2016

This query creates an S3 table with the following optimizations:

  • Parquet format – Efficient columnar storage for analytics
  • Day-level partitioning – Uses Iceberg’s day() transform on review_date for fast queries when filtering on dates
  • Filtered data – Includes only reviews from 2016 onwards to demonstrate selective transformation

You have successfully transformed your Parquet dataset to S3 Tables using a single CTAS statement.
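To make the day-level partitioning concrete, the following sketch mimics what the day() transform computes: the number of whole days between 1970-01-01 and the timestamp, so that every review written on the same calendar day lands in the same partition. This is an illustration in plain Python, not Iceberg's or Athena's implementation; the function name day_transform and the sample timestamps are assumptions, and time zone handling is ignored.

```python
from collections import Counter
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Return the number of days since 1970-01-01 for a timestamp."""
    return (ts.date() - EPOCH).days


# Hypothetical review_date values: two on the same day, one just after midnight.
review_dates = [
    datetime(2017, 3, 14, 9, 30),
    datetime(2017, 3, 14, 22, 5),
    datetime(2017, 3, 15, 0, 1),
]

# Count how many rows fall into each day partition.
rows_per_partition = Counter(day_transform(ts) for ts in review_dates)
for partition_value, row_count in sorted(rows_per_partition.items()):
    print(partition_value, row_count)
```

A query that filters on a range of review_date values only needs to read the partitions whose day values fall in that range, which is why date filters benefit from this layout.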

After you create the table, customer_reviews_s3table will appear under Tables in the Athena console. You can also view the table on the Amazon S3 console by choosing the options menu (three vertical dots) next to the table name and choosing View in S3.

Run a preview query to confirm the data transformation:

SELECT * FROM "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table" limit 10;

Next, let’s analyze monthly review trends:

SELECT review_year,
    month(review_date) as review_month,
    COUNT(*) as review_count,
    ROUND(AVG(star_rating), 2) as avg_rating
FROM "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
WHERE review_date >= DATE('2017-01-01')
    and review_date < DATE('2018-01-01')
GROUP BY 1,2
ORDER BY 1,2

The following screenshot shows our output.

ACID operations on S3 Tables

Athena supports standard SQL DML operations (INSERT, UPDATE, DELETE and MERGE INTO) on S3 Tables with full ACID transaction guarantees. Let’s demonstrate these capabilities by adding historical data and performing data quality checks.

Insert more data into the table using INSERT

Use the following query to insert review data from 2014 and 2015 that wasn’t included in the initial CTAS operation:

INSERT INTO "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
select *
from "awsdatacatalog"."reviewsdb"."customer_reviews"
where review_year IN (2014, 2015)

Check which years are now present in the table:

SELECT distinct(review_year)
from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
ORDER BY 1

The following screenshot shows our output.

The results show that you have successfully added 2014 and 2015 data. However, you might also notice some invalid years like 2101 and 2202, which appear to be data quality issues in the source dataset.

Clean invalid data using DELETE

Remove the records with incorrect years using the S3 Tables DELETE capability:

DELETE from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
WHERE review_year IN (2101, 2202)

Confirm the invalid records have been removed.
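One way to confirm the deletion is to count the rows that still carry the invalid years and check that the result is zero. The following is a minimal sketch, assuming Boto3 is installed and that you have an S3 location for Athena query results. The helper names build_check_query and run_check and the output location are hypothetical; the table name is the one used in this walkthrough.

```python
import time

TABLE = (
    '"s3tablescatalog/athena-ctas-s3table-demo"'
    '."reviews_namespace"."customer_reviews_s3table"'
)


def build_check_query(invalid_years):
    """Build a COUNT query for rows whose review_year is in invalid_years."""
    years = ", ".join(str(int(year)) for year in invalid_years)
    return f"SELECT COUNT(*) AS remaining FROM {TABLE} WHERE review_year IN ({years})"


def run_check(query, output_location):
    """Run the query in Athena and return the count (needs AWS credentials)."""
    import boto3

    athena = boto3.client("athena")
    query_id = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until the query reaches a terminal state.
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Query ended in state {state}")

    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    # rows[0] is the header row; rows[1] holds the single COUNT value.
    return int(rows[1]["Data"][0]["VarCharValue"])


check_query = build_check_query([2101, 2202])
print(check_query)
# Example call with a placeholder results bucket (assumption):
# remaining = run_check(check_query, "s3://amzn-s3-demo-bucket/athena-results/")
```

You can also paste the printed statement directly into the Athena query editor; a result of 0 indicates that the invalid records are gone.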

Update product categories using UPDATE

Let’s demonstrate the UPDATE operation with a business scenario. Imagine the company decides to rebrand the Movies_TV product category to Entertainment_Media to better reflect customer preferences.

First, examine the current product categories and their record counts:

select product_category,
    count(*) review_count
from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
group by 1
order by 1

You should see a record with product_category as Movies_TV with approximately 5,690,101 reviews. Use the following query to update all Movies_TV records to the new category name:

UPDATE "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
SET product_category = 'Entertainment_Media'
WHERE product_category = 'Movies_TV'

Verify the category name change while confirming the record count remains the same:

select product_category,
    count(*) review_count
from "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."customer_reviews_s3table"
group by 1
order by 1

The results now show Entertainment_Media with the same record count (5,690,101), confirming that the UPDATE operation successfully modified the category name while preserving data integrity.

These examples demonstrate transactional support in S3 Tables through Athena. Combined with automated table maintenance, this helps you build scalable, transactional data lakes more efficiently with minimal operational overhead.

Additional transformation scenarios using CTAS

The Athena CTAS functionality supports multiple transformation paths to S3 Tables. The following scenarios demonstrate how organizations can use this capability for various data modernization needs:

  • Convert from various data formats – Athena can query data in a wide range of formats as well as federated data sources, and you can convert these queryable sources to an S3 table using CTAS. For example, to create an S3 table from a federated data source, use the following query:
CREATE TABLE "s3tablescatalog/athena-ctas-s3table-demo"."reviews_namespace"."<s3table-name>" WITH (
    format = 'parquet'
) AS
SELECT *
FROM <federated-data-source>.<database>.<table>
  • Transform between S3 tables for optimized analytics – Organizations often need to create derived tables from existing S3 tables optimized for specific query patterns. For example, consider a table containing detailed customer reviews that’s partitioned by product category. If your analytics team frequently queries by date ranges, you can use CTAS to create a new S3 table partitioned by date for significantly better performance on time-based queries. For example, the following query creates an S3 table partitioned by month:
CREATE TABLE "s3tablescatalog/destination-bucket"."namespace"."reviews_by_date" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'month(review_date)' ]
) AS
SELECT *
FROM "s3tablescatalog/source-bucket"."namespace"."reviews_by_category"
WHERE review_date >= DATE('2023-01-01')
  • Transform from self-managed open table formats – Organizations maintaining their own Iceberg tables can transform them into S3 tables to take advantage of automatic optimization and reduce operational overhead:
CREATE TABLE "s3tablescatalog/destination-bucket"."namespace"."managed_reviews" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'day(review_date)' ]
) AS
SELECT *
FROM "icebergdb"."self_managed_reviews_iceberg"
  • Combine multiple source tables – Organizations often need to consolidate data from multiple tables into a single table for simplified analytics. This approach can help reduce query complexity and improve performance by pre-joining related datasets. The following query joins multiple tables using CTAS to create an S3 table:
CREATE TABLE "s3tablescatalog/destination-bucket"."namespace"."enriched_reviews" WITH (
    format = 'parquet',
    partitioning = ARRAY [ 'day(review_date)' ]
) AS
SELECT 
    r.*,
    p.product_category,
    p.product_price,
    p.product_brand
FROM "catalog"."database"."reviews" r
JOIN "catalog"."database"."products" p
    ON r.product_id = p.product_id

These scenarios demonstrate the flexibility of Athena CTAS for various data modernization needs, from simple format conversions to complex data consolidation projects.

Clean up

To avoid ongoing charges, clean up the resources created during this walkthrough. Complete these steps in the specified order to facilitate proper resource deletion. You might need to add respective delete permissions for databases, table buckets, and tables if your IAM user or role doesn’t already have them.

  1. Delete the S3 table created through CTAS:
    DROP TABLE IF EXISTS `reviews_namespace`.`customer_reviews_s3table`

  2. Remove the namespace from the table bucket:
    DROP DATABASE `reviews_namespace`

  3. Delete the table bucket.
  4. Remove the database and table created for the synthetic dataset:
    DROP TABLE `reviewsdb`.`customer_reviews`

    DROP DATABASE `reviewsdb`

  5. Delete any created IAM roles or policies.
  6. Delete the Athena query result location in Amazon S3 if you stored results in an S3 location.

Conclusion

This post demonstrated how the CTAS functionality in Athena simplifies data transformation to S3 Tables using standard SQL statements. We covered the complete transformation process, including format conversions, ACID operations, and various data transformation scenarios. The solution delivers simplified data transformation through single SQL statements, automatic maintenance, and seamless integration of S3 Tables with AWS analytics services and third-party tools. Organizations can modernize their data infrastructure while achieving enterprise-grade performance.

To get started, begin by identifying datasets that could benefit from optimization or transformation, then refer to Working with Amazon S3 Tables and table buckets and Register S3 table bucket catalogs and query Tables from Athena to implement the transformation patterns demonstrated in this walkthrough. The combination of the serverless capabilities of Athena with the automatic optimizations in S3 Tables can provide a powerful foundation for modern data analytics.


About the authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing on the big data analytics space since then, helping customers build scalable and robust solutions using AWS Analytics services.

Aritra Gupta is a Senior Technical Product Manager on the Amazon S3 team at Amazon Web Services. He helps customers build and scale data lakes. Based in Seattle, he likes to play chess and badminton in his spare time.

Build an analytics pipeline that is resilient to Avro schema changes using Amazon Athena

Post Syndicated from Mohammad Sabeel original https://aws.amazon.com/blogs/big-data/build-an-analytics-pipeline-that-is-resilient-to-avro-schema-changes-using-amazon-athena/

As the Internet of Things (IoT) expands, organizations collect vast amounts of data from diverse sensor devices monitoring everything from industrial equipment to smart buildings. These sensor devices frequently undergo firmware updates, software modifications, or configuration changes that introduce new monitoring capabilities or retire obsolete metrics. Consequently, the data structure (schema) of the information transmitted by these devices evolves continuously.

Organizations commonly choose Apache Avro as their data serialization format for IoT data due to its compact binary format, built-in schema evolution support, and compatibility with big data processing frameworks. For example, when a sensor manufacturer releases a firmware update that adds new temperature precision metrics or deprecates legacy vibration measurements, Avro’s schema evolution capabilities allow these changes to be handled without breaking existing data processing pipelines.

However, managing schema evolution at scale presents significant challenges. For example, organizations need to store and process data from thousands of sensors that update their schemas independently, handle schema changes occurring as frequently as every hour due to rolling device updates, maintain historical data compatibility while accommodating new schema versions, query data across multiple time periods with different schemas for temporal analysis, and ensure minimal query failures due to schema mismatches.

To address these challenges, this post demonstrates how to build an analytics pipeline that combines Amazon Simple Storage Service (Amazon S3) for data storage, AWS Glue Data Catalog for schema management, and Amazon Athena for one-time querying. We focus specifically on handling Avro-formatted data in partitioned S3 buckets, where schemas can change frequently, while providing consistent query capabilities across all data regardless of schema version.

This solution is specifically designed for Hive-based tables, such as those in the AWS Glue Data Catalog, and is not applicable for Iceberg tables. By implementing this approach, organizations can build a highly adaptive and resilient analytics pipeline capable of handling extremely frequent Avro schema changes in partitioned S3 environments.

Solution overview

As an example, this post simulates a real-world IoT data pipeline with the following requirements:

  • IoT devices continuously upload sensor data in Avro format to an S3 bucket, simulating real-time IoT data ingestion
  • The schema changes frequently over time
  • Data will be partitioned hourly to reflect typical IoT data ingestion patterns
  • Data needs to be queryable using the most recent schema version through Amazon Athena

To achieve these requirements, we demonstrate the solution using automated schema detection. We use AWS Command Line Interface (AWS CLI) and AWS SDK for Python (Boto3) scripts to simulate an automated mechanism that continually monitors the S3 bucket for new data, detects schema changes in incoming Avro files, and triggers necessary updates to the AWS Glue Data Catalog.

For schema evolution handling, our solution will demonstrate how to create and update table definitions in the AWS Glue Data Catalog, incorporate Avro schema literals to handle schema changes, and use the Athena partition projection for efficient querying across schema versions. The data steward or admin needs to know when and how the schema is updated so that the admin can manually change the columns in the UpdateTable API call. For validation and querying, we use Amazon Athena queries to verify table definitions and partition details and demonstrate successful querying of data across different schema versions. By simulating these components, our solution addresses the key requirements outlined in the introduction:

  • Handling frequent schema changes (as often as hourly)
  • Managing data from thousands of sensors updating independently
  • Maintaining historical data compatibility while accommodating new schemas
  • Enabling querying across multiple time periods with different schemas
  • Minimizing query failures due to schema mismatches

Although in a production environment this would be integrated into a sophisticated IoT data processing application, our simulation using AWS CLI and Boto3 scripts effectively demonstrates the principles and techniques for managing schema evolution in large-scale IoT deployments.

The following diagram illustrates the solution architecture.

Prerequisites:

To perform the solution, you need to have the following prerequisites:

Create the base table

In this section, we simulate the initial setup of a data pipeline for IoT sensor data. This step is crucial because it establishes the foundation for our schema evolution demonstration. This initial table serves as the starting point from which our schema will evolve, and it allows us to demonstrate how to handle schema changes over time. In this scenario, the base table contains three key fields: customerID (bigint), sentiment (a struct containing customerrating), and dt (string) as a partition column. The table definition also includes the Avro schema literal ('avro.schema.literal') along with other configurations. Follow these steps:

  1. Create a new file named `CreateTableAPI.py` with the following content. Replace 'Location': 's3://amzn-s3-demo-bucket/' with your S3 bucket details and <AWS Account ID> with your AWS account ID:
import boto3
import time

if __name__ == '__main__':
    database_name = "blogpostdatabase"
    table_name = "blogpost_table_test"
    catalog_id = '<AWS Account ID>'
    client = boto3.client('glue')

    response = client.create_table(
        CatalogId=catalog_id,
        DatabaseName=database_name,
        TableInput={
            'Name': table_name,
            'Description': 'sampletable',
            'Owner': 'root',
            'TableType': 'EXTERNAL_TABLE',
            'LastAccessTime': int(time.time()),
            'LastAnalyzedTime': int(time.time()),
            'Retention': 0,
            'Parameters' : {
                'avro.schema.literal': '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 }] } ], "default" : 0 }]}'
            },
            'StorageDescriptor': {
                'Columns': [
                    {
                        'Name': 'customerID',
                        'Type': 'bigint',
                        'Comment': 'from deserializer'
                    },
                    {
                        'Name': 'sentiment',
                        'Type': 'struct<customerrating:bigint>',
                        'Comment': 'from deserializer'
                    }
                ],
                'Location': 's3://amzn-s3-demo-bucket/',
                'InputFormat': 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat',
                'OutputFormat': 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat',
                'SerdeInfo': {
                    'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe',
                    'Parameters': {
                        'avro.schema.literal': '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 } ] } ], "default" : 0 }]}'
                    }
                }
            },
            'PartitionKeys': [
                {
                    'Name': 'dt',
                    'Type': 'string'
                }
            ]
        }
    )

    print(response)
  2. Run the script using the command:
python3 CreateTableAPI.py

The schema literal serves as a form of metadata, providing a clear description of your data structure. In Amazon Athena, the Avro Serializer/Deserializer (SerDe) properties on the table are essential for making sure that the schema is compatible with the data stored in the files, so that query engines can translate the data accurately. These properties enable precise interpretation of Avro-formatted data, allowing query engines to correctly read and process the information during execution.

The Avro schema literal provides a detailed description of the data structure at the partition level. It defines the fields, their data types, and any nested structures within the Avro data. Amazon Athena uses this schema to correctly interpret the Avro data stored in Amazon S3. It makes sure that each field in the Avro file is mapped to the correct column in the Athena table.

The schema information helps Athena optimize query execution by understanding the data structure in advance. It can make informed decisions about how to process and retrieve data efficiently. When the Avro schema changes (for example, when new fields are added), updating the schema literal allows Athena to recognize and work with the new structure. This is crucial for maintaining query compatibility as your data evolves over time. The schema literal provides explicit type information, which is essential for Avro’s type system. This enables accurate data type conversion between Avro and Athena SQL types.

For complex Avro schemas with nested structures, the schema literal informs Athena how to navigate and query these nested elements. The Avro schema can specify default values for fields, which Athena can use when querying data where certain fields might be missing. Athena can use the schema to perform compatibility checks between the table definition and the actual data, helping to identify potential issues. In the SerDe properties, the schema literal tells the Avro SerDe how to deserialize the data when reading it from Amazon S3.

It’s crucial for the SerDe to correctly interpret the binary Avro format into a form Athena can query. The detailed schema information aids in query planning, allowing Athena to make informed decisions about how to execute queries efficiently. The Avro schema literal specified in the table’s SerDe properties provides Athena with the exact field mappings, data types, and physical structure of the Avro file. Because Avro is a row-oriented format, each record is still read as a whole, but the schema lets the SerDe decode every field correctly and map it to the matching table column.

'Parameters' : {
                'avro.schema.literal': '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 }] } ], "default" : 0 }]}'
            },
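To make the mapping between the Avro schema literal and the table columns concrete, the following sketch derives Glue-style column definitions from a schema literal. It's a simplified illustration under stated assumptions: the helper names `to_hive_type` and `glue_columns` are hypothetical, and only primitive types, nullable unions, and nested records are handled (arrays, maps, enums, and logical types are out of scope).

```python
import json

# Avro primitive types and their Hive/Athena column type equivalents.
PRIMITIVES = {
    "long": "bigint", "int": "int", "string": "string", "boolean": "boolean",
    "float": "float", "double": "double", "bytes": "binary",
}

def to_hive_type(avro_type):
    """Translate a (simplified) Avro type into a Hive type string."""
    if isinstance(avro_type, str):
        return PRIMITIVES[avro_type]
    if isinstance(avro_type, list):
        # Nullable union such as ["null", {...}]: map the single non-null branch.
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError("only nullable unions are handled in this sketch")
        return to_hive_type(non_null[0])
    if avro_type["type"] == "record":
        inner = ",".join(
            f'{f["name"]}:{to_hive_type(f["type"])}' for f in avro_type["fields"]
        )
        return f"struct<{inner}>"
    # A wrapped primitive such as {"type": "long"}.
    return to_hive_type(avro_type["type"])

def glue_columns(schema_literal):
    """Build a Columns list (Name/Type pairs) from an Avro schema literal."""
    schema = json.loads(schema_literal)
    return [{"Name": f["name"], "Type": to_hive_type(f["type"])}
            for f in schema["fields"]]

literal = ('{"type":"record","name":"customerdata","fields":['
           '{"name":"customerID","type":"long","default":-1},'
           '{"name":"sentiment","type":["null",{"type":"record","name":"sentiment",'
           '"fields":[{"name":"customerrating","type":"long","default":0}]}]}]}')
for column in glue_columns(literal):
    print(column)
```

For the base schema, this yields customerID as bigint and sentiment as struct<customerrating:bigint>, which matches the Columns list passed to the CreateTable call.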
  3. After creating the table, verify its structure by running SHOW CREATE TABLE blogpost_table_test in Athena. The output should resemble the following:
CREATE EXTERNAL TABLE `blogpost_table_test`(
  `customerid` bigint COMMENT 'from deserializer', 
  `sentiment` struct<customerrating:bigint> COMMENT 'from deserializer')
PARTITIONED BY ( 
  `dt` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
WITH SERDEPROPERTIES ( 
  'avro.schema.literal'='{\"type\" : \"record\", \"name\" : \"customerdata\", \"namespace\" : \"com.data.test.avro\", \"fields\" : [{ \"name\" : \"customerID\", \"type\" : \"long\", \"default\" : -1 },{ \"name\" : \"sentiment\", \"type\" : [ \"null\", { \"type\" : \"record\", \"name\" : \"sentiment\", \"doc\" : \"***** CoreETL ******\", \"fields\" : [ { \"name\" : \"customerrating\", \"type\" : \"long\", \"default\" : 0 } ] } ], \"default\" : 0 }]}') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/'
TBLPROPERTIES (
  'avro.schema.literal'='{\"type\" : \"record\", \"name\" : \"customerdata\", \"namespace\" : \"com.data.test.avro\", \"fields\" : [{ \"name\" : \"customerID\", \"type\" : \"long\", \"default\" : -1 },{ \"name\" : \"sentiment\", \"type\" : [ \"null\", { \"type\" : \"record\", \"name\" : \"sentiment\", \"doc\" : \"***** CoreETL ******\", \"fields\" : [ { \"name\" : \"customerrating\", \"type\" : \"long\", \"default\" : 0 } ] } ], \"default\" : 0 }]}')

Note that the table is created with the initial schema as described below:

[
  {
    "Name": "customerid",
    "Type": "bigint",
    "Comment": "from deserializer"
  },
  {
    "Name": "sentiment",
    "Type": "struct<customerrating:bigint>",
    "Comment": "from deserializer"
  },
  {
    "Name": "dt",
    "Type": "string",
    "PartitionKey": "Partition (0)"
  }
]

With the table structure in place, you can load the first set of IoT sensor data and establish the initial partition. This step is crucial for setting up the data pipeline that will handle incoming sensor data.

  1. Download the example sensor data from the following S3 bucket
s3://aws-blogs-artifacts-public/artifacts/BDB-4745

Download the sample file with the initial schema from the first partition to the current directory:

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4745/dt=2024-03-21/initial_schema_sample1.avro .

Download the sample file with the second schema from the second partition:

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4745/dt=2024-03-22/second_schema_sample2.avro .

Download the sample file with the third schema from the third partition:

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4745/dt=2024-03-23/third_scehama_sample3avro .
  2. Upload the Avro-formatted sensor data to your partitioned S3 location. This represents your first day of sensor readings, organized in the date-based partition structure. Replace the bucket name amzn-s3-demo-bucket with your S3 bucket name and add a partitioned folder for the dt field.
s3://amzn-s3-demo-bucket/dt=2024-03-21/
  3. Register this partition in the AWS Glue Data Catalog to make it discoverable. This tells AWS Glue where to find your sensor data for this specific date:
ALTER TABLE blogpost_table_test ADD PARTITION (dt='2024-03-21');
  4. Validate your sensor data ingestion by querying the newly loaded partition. This query helps verify that your sensor readings are correctly loaded and accessible:
SELECT * FROM "blogpostdatabase"."blogpost_table_test" WHERE dt='2024-03-21';

The following screenshot shows the query results.

This initial data load establishes the foundation for the IoT data pipeline, which means you can begin tracking sensor measurements while preparing for future schema evolution as sensor capabilities expand or change.
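If you register partitions manually for more than a few dates, it can help to generate the statements. The following sketch only builds the S3 prefix and the Athena DDL string for a given date. The function names are hypothetical, and the Hive-style dt=<value>/ layout under the bucket root is assumed from the upload step in this post.

```python
def partition_prefix(bucket, dt):
    """Hive-style S3 prefix for one date partition."""
    return f"s3://{bucket}/dt={dt}/"

def add_partition_sql(table, bucket, dt):
    """Athena DDL that registers one date partition at its S3 location."""
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS PARTITION (dt='{dt}') "
        f"LOCATION '{partition_prefix(bucket, dt)}';"
    )

print(add_partition_sql("blogpost_table_test", "amzn-s3-demo-bucket", "2024-03-21"))
```

The IF NOT EXISTS clause makes the statement safe to rerun when the same date is processed twice.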

Now, we demonstrate how the IoT data pipeline handles evolving sensor capabilities by introducing a schema change in the second data batch. As sensors receive firmware updates or new monitoring features, their data structure needs to adapt accordingly. To show this evolution, we add data from sensors that now include visibility measurements:

  1. Examine the evolved schema structure that accommodates the new sensor capability:
{
  "fields": [
    {
      "Name": "customerid",
      "Type": "bigint",
      "Comment": "from deserializer"
    },
    {
      "Name": "sentiment",
      "Type": "struct<customerrating:bigint,visibility:bigint>",
      "Comment": "from deserializer"
    },
    {
      "Name": "dt",
      "Type": "string",
      "PartitionKey": "Partition (0)"
    }
  ]
}

Note the addition of the visibility field within the sentiment structure, representing the sensor’s enhanced monitoring capability.

  2. Upload this enhanced sensor data to a new date partition:
s3://amzn-s3-demo-bucket/dt=2024-03-22/
  3. Verify data consistency across both the original and enhanced sensor readings:
SELECT * FROM "blogpostdatabase"."blogpost_table_test" LIMIT 10;

This demonstrates how the pipeline can handle sensor upgrades while maintaining compatibility with historical data. In the next section, we explore how to update the table definition to properly manage this schema evolution, providing seamless querying across all sensor data regardless of when the sensors were upgraded. This approach is particularly valuable in IoT environments where sensor capabilities frequently evolve, which means you can maintain historical data while accommodating new monitoring features.

Update the AWS Glue table

To accommodate evolving sensor capabilities, you need to update the AWS Glue table schema. Although traditional methods such as MSCK REPAIR TABLE or ALTER TABLE ADD PARTITION work for small datasets for updating partition information, you can use an alternate method to handle tables with more than 100K partitions efficiently.

We use Athena partition projection, which eliminates the need to process extensive partition metadata, a step that can be time-consuming for large datasets. Instead, it dynamically infers partition existence and location, allowing for more efficient data management. This method also speeds up query planning by quickly identifying relevant partitions, leading to faster query execution. Additionally, it reduces the number of API calls to the metadata store, potentially lowering costs associated with these operations. Perhaps most importantly, this solution maintains performance as the number of partitions grows, providing scalability for evolving datasets. These benefits combine to create a more efficient and cost-effective way of handling schema evolution in large-scale data environments.
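Conceptually, partition projection computes partition values and locations from the table properties instead of looking them up. The following sketch mimics that idea for a daily date range. It's an illustration only, not how Athena implements projection: the function name is hypothetical, and it assumes a one-day interval and the Hive-style dt=<value>/ layout under the table location.

```python
from datetime import date, timedelta

def projected_partitions(start, end, table_root):
    """Yield (dt value, S3 location) for every day from start to end inclusive."""
    day = start
    while day <= end:
        value = day.strftime("%Y-%m-%d")  # corresponds to the 'yyyy-MM-dd' format
        yield value, f"{table_root}dt={value}/"
        day += timedelta(days=1)

parts = list(projected_partitions(
    date(2024, 3, 21), date(2024, 3, 23), "s3://amzn-s3-demo-bucket/"))
for value, location in parts:
    print(value, location)
```

Because the values are derived from the range, no partition entries need to exist in the AWS Glue Data Catalog for these dates to be queryable.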

To update your table schema to handle the new sensor data, follow these steps:

  1. Copy the following code into the UpdateTableAPI.py file:
import boto3

client = boto3.client('glue')

db = 'blogpostdatabase'
tb = 'blogpost_table_test'

response = client.get_table(
    DatabaseName=db,
    Name=tb
)

print(response)


table_input = {
    'Description': response['Table'].get('Description', ''),
    'Name': response['Table'].get('Name', ''),
    'Owner': response['Table'].get('Owner', ''),
    'Parameters': response['Table'].get('Parameters', {}),
    'PartitionKeys': response['Table'].get('PartitionKeys', []),
    'Retention': response['Table'].get('Retention'),
    'StorageDescriptor': response['Table'].get('StorageDescriptor', {}),
    'TableType': response['Table'].get('TableType', ''),
    'ViewExpandedText': response['Table'].get('ViewExpandedText', ''),
    'ViewOriginalText': response['Table'].get('ViewOriginalText', '')

}

# Update the Glue column type so that it matches the evolved Avro record
for col in table_input['StorageDescriptor']['Columns']:
    if col['Name'] == 'sentiment':
        col['Type'] = 'struct<customerrating:bigint,visibility:bigint>'


table_input['StorageDescriptor']['SerdeInfo']['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0}] } ], "default" : 0 }]}'
table_input['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0} ] } ], "default" : 0 }]}'
table_input['Parameters']['projection.dt.type'] = 'date'
table_input['Parameters']['projection.dt.format'] = 'yyyy-MM-dd'
table_input['Parameters']['projection.enabled'] = 'true'
table_input['Parameters']['projection.dt.range'] = '2024-03-21,NOW'

response = client.update_table(
    DatabaseName=db,
    TableInput=table_input
)

This Python script demonstrates how to update an AWS Glue table to accommodate schema evolution and enable partition projection:

  1. It uses Boto3 to interact with AWS Glue API.
  2. Retrieves the current table definition from the AWS Glue Data Catalog.
  3. Updates the 'sentiment' column structure to include new fields.
  4. Modifies the Avro schema literal to reflect the updated structure.
  5. Adds partition projection parameters for the partition column dt:
    table_input['Parameters']['projection.dt.type'] = 'date'
    table_input['Parameters']['projection.dt.format'] = 'yyyy-MM-dd'
    table_input['Parameters']['projection.enabled'] = 'true'
    table_input['Parameters']['projection.dt.range'] = '2024-03-21,NOW'

    1. projection.dt.type='date' sets the projection type of the partition column to date
    2. projection.dt.format='yyyy-MM-dd' defines the date format of the partition column
    3. projection.enabled='true' enables partition projection
    4. projection.dt.range='2024-03-21,NOW' sets the range of the partition column from 2024-03-21 to the current time
  2. Run the script using the following command:
python3 UpdateTableAPI.py

The script applies all changes back to the AWS Glue table using the UpdateTable API call. The following screenshot shows the table property with the new Avro schema literal and the partition projection.
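The GetTable response contains read-only attributes (such as creation metadata) that UpdateTable doesn't accept, which is why the script copies selected keys into table_input. The following sketch factors that pattern into two small helpers that work on plain dictionaries. The helper names and the chosen key list are assumptions for illustration, and the sample dictionary stands in for response['Table'].

```python
import copy

# Keys copied from the GetTable response into TableInput in this sketch.
TABLE_INPUT_KEYS = ("Name", "Description", "Owner", "Retention",
                    "StorageDescriptor", "PartitionKeys", "TableType", "Parameters")

def to_table_input(table):
    """Copy only the writable keys so the original response stays untouched."""
    return {k: copy.deepcopy(table[k]) for k in TABLE_INPUT_KEYS if k in table}

def with_schema(table_input, column_name, column_type, schema_literal):
    """Apply a new column type and Avro schema literal to a TableInput dict."""
    for col in table_input["StorageDescriptor"]["Columns"]:
        if col["Name"] == column_name:
            col["Type"] = column_type
    serde_params = table_input["StorageDescriptor"]["SerdeInfo"]["Parameters"]
    serde_params["avro.schema.literal"] = schema_literal
    table_input["Parameters"]["avro.schema.literal"] = schema_literal
    return table_input

# Stand-in for response['Table'] returned by get_table.
sample_table = {
    "Name": "blogpost_table_test",
    "DatabaseName": "blogpostdatabase",   # not copied into TableInput here
    "Parameters": {},
    "StorageDescriptor": {
        "Columns": [{"Name": "sentiment", "Type": "struct<customerrating:bigint>"}],
        "SerdeInfo": {"Parameters": {}},
    },
}
updated = with_schema(to_table_input(sample_table), "sentiment",
                      "struct<customerrating:bigint,visibility:bigint>", "{...}")
print(updated["StorageDescriptor"]["Columns"])
```

The resulting dictionary can then be passed as TableInput to update_table, as the script in this section does.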

After the table property is updated, you don’t need to add the partitions manually using the MSCK REPAIR TABLE or ALTER TABLE command. You can validate the result by running the query in the Athena console.

SELECT * FROM "blogpostdatabase"."blogpost_table_test" LIMIT 10;

The following screenshot shows the query results.

This schema evolution strategy efficiently handles new data fields across different time periods. Consider the 'visibility' field introduced on 2024-03-22. For data from 2024-03-21, where this field doesn’t exist, the solution automatically returns a default value of 0. This approach makes the query consistent across all partitions, regardless of their schema version.

Here’s the Avro schema configuration that enables this flexibility:

{
  "type": "record",
  "name": "customerdata",
  "fields": [
    {"name": "customerID", "type": "long", "default": -1},
    {"name": "sentiment", "type": ["null", {
      "type": "record",
      "name": "sentiment",
      "fields": [
        {"name": "customerrating", "type": "long", "default": 0},
        {"name": "visibility", "type": "long", "default": 0}
      ]
    }], "default": null}
  ]
}

Using this configuration, you can run queries across all partitions without modifications, maintain backward compatibility without data migration, and support gradual schema evolution without breaking existing queries.
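The effect of the default values can be illustrated with a small resolver that fills in fields missing from a record written with an older schema. This is a simplified sketch of the idea behind Avro schema resolution, not the SerDe's actual code: the function name `resolve` is hypothetical, records are modeled as plain dictionaries, and only nested records and nullable unions are considered.

```python
def resolve(record, reader_schema):
    """Fill fields missing from an older record with the reader schema's defaults."""
    resolved = {}
    for field in reader_schema["fields"]:
        name, ftype = field["name"], field["type"]
        if name not in record:
            # Field was added after this record was written: use its default.
            resolved[name] = field["default"]
            continue
        value = record[name]
        branches = ftype if isinstance(ftype, list) else [ftype]
        nested = next((t for t in branches
                       if isinstance(t, dict) and t.get("type") == "record"), None)
        if nested is not None and isinstance(value, dict):
            resolved[name] = resolve(value, nested)  # recurse into nested records
        else:
            resolved[name] = value
    return resolved

READER_SCHEMA = {
    "type": "record", "name": "customerdata",
    "fields": [
        {"name": "customerID", "type": "long", "default": -1},
        {"name": "sentiment", "type": ["null", {
            "type": "record", "name": "sentiment",
            "fields": [
                {"name": "customerrating", "type": "long", "default": 0},
                {"name": "visibility", "type": "long", "default": 0},
            ]}], "default": None},
    ],
}

old_record = {"customerID": 7, "sentiment": {"customerrating": 4}}
print(resolve(old_record, READER_SCHEMA))
# {'customerID': 7, 'sentiment': {'customerrating': 4, 'visibility': 0}}
```

A record from the 2024-03-21 partition has no visibility value, so the resolver returns the default of 0, which is the behavior described above. A field without a default would raise a KeyError in this sketch.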

Building on the schema evolution example, we now introduce a third enhancement to the sensor data structure. This new iteration adds a text-based classification capability through a 'category' field (string type) to the sentiment structure. This represents a real-world scenario where sensors receive updates that add new classification capabilities, requiring the data pipeline to handle both numeric measurements and textual categorizations.

The following is the enhanced schema structure:

{
  "fields": [
    {
      "Name": "customerid",
      "Type": "bigint"
    },
    {
      "Name": "sentiment",
      "Type": "struct<customerrating:bigint,visibility:bigint,category:string>"
    },
    {
      "Name": "dt",
      "Type": "string"
    }
  ]
}

This evolution demonstrates how the solution flexibly accommodates different data types as sensor capabilities expand while maintaining compatibility with historical data.

To implement this latest schema evolution for the new partition (dt=2024-03-23), we update the table definition to include the ‘category’ field. Here’s the modified UpdateTableAPI.py script that handles this change:

  1. Update the file UpdateTableAPI.py:
import boto3

client = boto3.client('glue')

db = 'blogpostdatabase'
tb = 'blogpost_table_test'

response = client.get_table(
    DatabaseName=db,
    Name=tb
)

print(response)


table_input = {
    'Description': response['Table'].get('Description', ''),
    'Name': response['Table'].get('Name', ''),
    'Owner': response['Table'].get('Owner', ''),
    'Parameters': response['Table'].get('Parameters', {}),
    'PartitionKeys': response['Table'].get('PartitionKeys', []),
    'Retention': response['Table'].get('Retention'),
    'StorageDescriptor': response['Table'].get('StorageDescriptor', {}),
    'TableType': response['Table'].get('TableType', ''),
    'ViewExpandedText': response['Table'].get('ViewExpandedText', ''),
    'ViewOriginalText': response['Table'].get('ViewOriginalText', '')

}

# Update the Glue column type so that it matches the evolved Avro record
for col in table_input['StorageDescriptor']['Columns']:
    if col['Name'] == 'sentiment':
        col['Type'] = 'struct<customerrating:bigint,visibility:bigint,category:string>'


table_input['StorageDescriptor']['SerdeInfo']['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0},{"name":"category","type":"string","default":"null"} ] } ], "default" : 0 }]}'
table_input['Parameters']['avro.schema.literal'] = '{"type" : "record", "name" : "customerdata", "namespace" : "com.data.test.avro", "fields" : [{ "name" : "customerID", "type" : "long", "default" : -1 },{ "name" : "sentiment", "type" : [ "null", { "type" : "record", "name" : "sentiment", "doc" : "***** CoreETL ******", "fields" : [ { "name" : "customerrating", "type" : "long", "default" : 0 },{"name":"visibility","type":"long","default":0},{"name":"category","type":"string","default":"null"} ] } ], "default" : 0 }]}'
table_input['Parameters']['projection.dt.type'] = 'date'
table_input['Parameters']['projection.dt.format'] = 'yyyy-MM-dd'
table_input['Parameters']['projection.enabled'] = 'true'
table_input['Parameters']['projection.dt.range'] = '2024-03-21,NOW'

response = client.update_table(
    DatabaseName=db,
    TableInput=table_input
)
  2. Verify the changes by running the following query:
SELECT * FROM "blogpostdatabase"."blogpost_table_test" LIMIT 10;

The following screenshot shows the query results.

There are three key changes in this update:

  1. Added 'category' field (string type) to the sentiment structure
  2. Set default value "null" for the category field
  3. Maintained existing partition projection settings

To support the latest sensor data enhancement, we updated the table definition to include a new text-based 'category' field in the sentiment structure. The modified UpdateTableAPI.py script adds this capability while maintaining the established schema evolution patterns. It achieves this by updating both the AWS Glue table schema and the Avro schema literal, setting a default value of "null" for the category field.

This provides backward compatibility. Older data (before 2024-03-23) shows "null" for the category field, and new data includes actual category values. The script maintains the partition projection settings, enabling efficient querying across all time periods.

You can verify this update by querying the table in Athena, which will now show the complete data structure, including numeric measurements (customerrating, visibility) and text categorization (category) across all partitions. This enhancement demonstrates how the solution can seamlessly incorporate different data types while preserving historical data integrity and query performance.

Cleanup

To avoid incurring future costs, delete your Amazon S3 data if you no longer need it.

Conclusion

By combining Avro’s schema evolution capabilities with the power of AWS Glue APIs, we’ve created a robust framework for managing diverse, evolving datasets. This approach not only simplifies data integration but also enhances the agility and effectiveness of your analytics pipeline, paving the way for more sophisticated predictive and prescriptive analytics.

This solution offers several key advantages. It’s flexible, adapting to changing data structures without disrupting existing analytics processes. It’s scalable, able to handle growing volumes of data and evolving schemas efficiently. You can automate it and reduce the manual overhead in schema management and updates. Finally, because it minimizes data movement and transformation costs, it’s cost-effective.

About the authors

Mohammad Sabeel is a Senior Cloud Support Engineer at Amazon Web Services (AWS) with over 14 years of experience in Information Technology (IT). As a member of the Technical Field Community (TFC) Analytics team, he is a subject matter expert in the analytics services AWS Glue, Amazon Managed Workflows for Apache Airflow (MWAA), and Amazon Athena. Sabeel provides expert guidance and technical support to enterprise and strategic customers, helping them optimize their data analytics solutions and overcome complex challenges. With deep subject matter expertise, he enables organizations to build scalable, efficient, and cost-effective data processing pipelines.

Indira Balakrishnan is a Principal Solutions Architect in the Amazon Web Services (AWS) Analytics Specialist Solutions Architect (SA) team. She helps customers build cloud-based data and AI/ML solutions to address business challenges. With over 25 years of experience in Information Technology (IT), Indira actively contributes to the AWS Analytics Technical Field Community, supporting customers across various domains and industries. Indira participates in Women in Engineering and Women at Amazon tech groups to encourage girls to pursue STEM paths and enter careers in IT. She also volunteers in early career mentoring circles.

How Stifel built a modern data platform using AWS Glue and an event-driven domain architecture

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/how-stifel-built-a-modern-data-platform-using-aws-glue-and-an-event-driven-domain-architecture/

Stifel Financial Corp. is an American multinational independent investment bank and financial services company, founded in 1890 and headquartered in downtown St. Louis, Missouri. Stifel offers securities-related financial services in the United States and Europe through several wholly owned subsidiaries. Stifel provides both equity and fixed income research and is the largest provider of US equity research.

In this post, we show you how Stifel implemented a modern data platform using AWS services and open data standards, building an event-driven architecture for domain data products while centralizing the metadata to facilitate discovery and sharing of data products.

Stifel’s modern data platform use case

Stifel envisioned a data platform that delivers accurate, timely, and properly governed data, providing consistency throughout the organization whenever users access the information. The existing approach showed limitations as data complexity increased, data volumes grew, and demand for quick, business-driven insights rose. Financial institutions worldwide encounter these challenges, which is leading to a reassessment of traditional data management practices. Under the federated governance model, Stifel developed a modern data strategy based on the following objectives:

  • Managing ingestion and metadata
  • Creating source-aligned data products complying with Stifel business streams
  • Integrating source-aligned data products from other domains (Stifel business units)
  • Producing consumer-aligned data products for specific business purposes
  • Publishing data products to a centralized data catalog

Some of the Stifel challenges highlighted in the preceding list required building a data platform that can:

  • Boost agility by democratizing data, thus reducing time to market and enhancing the customer experience
  • Improve data quality and trust in the data
  • Standardize tools and eliminate the shadow information technology (IT) culture to increase scalability, reduce risk, and minimize operational inefficiencies

Following the federated governance model, Stifel has organized its domain structure to provide autonomy to various functional teams while preserving the core values of data mesh. The following diagram depicts a high-level architecture of the data mesh implementation at Stifel.

Each data domain has the flexibility to create data products that can be published to the centralized catalog, while maintaining the autonomy for teams to develop data products that are exclusively accessible to teams within the domain. These products aren’t available to others until they are deemed ready for broader enterprise use. Domains have the freedom to decide which data they want to share. They can either:

  • Make their data products visible to everyone through the central catalog
  • Keep their data products visible only within their own domain

By implementing an event-driven domain architecture, organizations can achieve significant business advantages while positioning themselves for future growth and innovation. Stifel’s data product refreshes depended on data assets with variable cadences. Event-driven architecture enables real-time or near real-time updates by allowing data products to automatically respond to changes in underlying data assets as they occur, rather than relying on fixed batch schedules that might miss critical updates or waste resources on unnecessary refreshes. The key is to plan the implementation carefully and ensure alignment with business objectives while considering both technical and organizational factors. This architecture style particularly suits organizations that:

  • Need real-time processing capabilities
  • Have complex domain interactions
  • Require high scalability
  • Want to improve business agility
  • Need better system integration
  • Are pursuing digital transformation

The following are some of the key AWS Services that helped Stifel to build their modern data platform.

  • AWS Glue is a serverless data integration service that’s used for data processing to build data assets and data products in the domains. Data is also cataloged in the AWS Glue Data Catalog, making it straightforward to discover and query with supported engines.
  • Amazon EventBridge provides a scalable and flexible serverless event bus that facilitates seamless communication between different domains and services. By using EventBridge, Stifel was able to implement a publish-subscribe model where domain events can be emitted, filtered, and routed to appropriate consumers based on configurable rules. EventBridge supports custom event buses for domain-specific events, enabling clear separation of concerns and improved manageability.
  • AWS Lake Formation helped in providing centralized security, governance, and catalog capabilities while preserving domain autonomy in data product creation and management. With Lake Formation, data domains were able to maintain their independent data products within a federated structure while enforcing consistent access controls, data quality standards, and metadata management across the organization.
  • Apache Hudi on Amazon Simple Storage Service (Amazon S3) offers an optimized way to store data assets and products and promotes interoperability across other services.
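The publish-subscribe routing described for EventBridge can be illustrated with a simplified pattern matcher. This is a sketch of the basic matching idea only (a field matches when its value appears in the pattern's list of allowed values, and nested objects are matched recursively); it isn't the EventBridge implementation and doesn't cover advanced operators. The rule names, sources, and detail types are hypothetical and aren't taken from Stifel's platform.

```python
def matches(pattern, event):
    """Return True if every field in the pattern matches the event."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: recurse into the nested event object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf pattern: a list of allowed values.
            return False
    return True

def route(event, rules):
    """Return the names of all rules whose pattern matches the event."""
    return [name for name, pattern in rules.items() if matches(pattern, event)]

# Hypothetical rules: each consumer subscribes to the events it cares about.
RULES = {
    "refresh-consumer-product": {
        "source": ["domain.trading"],
        "detail-type": ["DataAssetUpdated"],
    },
    "audit-all-failures": {"detail": {"status": ["FAILED"]}},
}

event = {
    "source": "domain.trading",
    "detail-type": "DataAssetUpdated",
    "detail": {"status": "SUCCEEDED", "table": "positions"},
}
print(route(event, RULES))  # ['refresh-consumer-product']
```

Each domain emits events without knowing its consumers, and each consumer only declares the patterns it is interested in.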

Stifel’s solution architecture

The following diagram illustrates the data mesh architecture that Stifel uses to build a domain-driven architecture. In this system, various domains create data products and share them with other domains through a central governance account that uses Lake Formation.

Let’s look at some of the key design components used to implement the data mesh and event-driven design.

Data ingestion framework

The data ingestion framework consists of several processor modules that are built using several AWS services and a metadata-driven architecture. The following diagram shows the architecture of the raw data ingestion framework.

The framework gets raw data files from both internal Stifel systems and third-party data sources. These files are processed and stored in a raw data ingestion account on Amazon S3 in the Apache Hudi open table format. This stored data is then shared with different parts of the organization, called data domains. Each domain can use this shared data to create its own data products.

When a file (in CSV, XML, JSON, or a custom format) lands in the landing bucket, an Amazon S3 event notification is created and placed in an Amazon Simple Queue Service (Amazon SQS) queue. The Amazon SQS queue invokes an AWS Lambda function, which saves the file metadata (such as the name of the file, the date and time the file was received, and the file size) to a file audit data store (Amazon Aurora PostgreSQL-Compatible Edition).
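As a rough illustration of the metadata capture step, the following Python sketch parses an SQS-delivered batch of S3 event notifications into audit rows. The function name and the row field names are assumptions made for this example, not Stifel’s implementation, and the write to the Aurora audit store is left out.

```python
import json
from urllib.parse import unquote_plus


def extract_file_metadata(sqs_event):
    """Return one metadata dict per S3 object referenced in an SQS batch."""
    rows = []
    for sqs_record in sqs_event.get("Records", []):
        # Each SQS message body is the JSON text of an S3 event notification.
        s3_event = json.loads(sqs_record["body"])
        # S3 test messages carry no "Records" key, so they yield no rows.
        for s3_record in s3_event.get("Records", []):
            obj = s3_record["s3"]["object"]
            rows.append({
                "bucket": s3_record["s3"]["bucket"]["name"],
                # Object keys arrive URL-encoded in S3 notifications.
                "file_name": unquote_plus(obj["key"]),
                "received_at": s3_record["eventTime"],
                "size_bytes": obj.get("size", 0),
            })
    return rows
```

A Lambda handler could call this function with its incoming event and insert the returned rows into the audit table.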

An EventBridge time scheduler invokes an AWS Step Functions workflow at pre-determined intervals. The Step Functions workflow orchestrates the batch ingestion from raw to staging layer.

  1. The Step Functions workflow orchestrates a set of Lambda functions to get the list of unprocessed raw files from the audit data store and create batches of raw files to process them in parallel. The Step Functions workflow then triggers parallel AWS Glue jobs that process each batch of raw files.
  2. Each raw file is validated against data quality checks and the data is saved to staging tables in Hudi format. Any errors encountered are logged in an audit table and a notification is generated for the support team. For all successfully processed raw files, the file status is updated to PROCESSED and logged in an audit table.
  3. After the Hudi table is updated, a data refresh event is sent to EventBridge and then passed to the Central Mesh Account. The Central Mesh Account forwards these events to the data domains to notify them that the raw tables are refreshed, allowing the data domains to use this data for creating their own data products.
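The batching in step 1 and the refresh event in step 3 can be sketched in a few lines of Python. This is a minimal sketch under stated assumptions: the function names, the event source, the detail type, and the bus name are all hypothetical, and only the entry keys (Source, DetailType, Detail, EventBusName) follow the shape that the EventBridge PutEvents API accepts.

```python
import json


def make_batches(files, batch_size):
    """Split the list of unprocessed files into fixed-size batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [files[i:i + batch_size] for i in range(0, len(files), batch_size)]


def build_refresh_event(database, table, bus_name="central-mesh-bus"):
    """Build one data refresh entry in the shape PutEvents accepts."""
    return {
        "Source": "raw.ingestion",      # hypothetical source name
        "DetailType": "DataRefresh",    # hypothetical detail type
        # Detail must be a JSON string, not a nested dict.
        "Detail": json.dumps({"database": database, "table": table}),
        "EventBusName": bus_name,       # hypothetical bus name
    }
```

Each batch could be handed to one AWS Glue job run, and an entry could be sent with the boto3 events client using put_events(Entries=[entry]).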

Event-driven data product refresh

The Stifel data lake is based on a data mesh architecture where several data producers share data across data domains. A mechanism is needed to alert consumers who depend on other data producers’ data products when those source data products are refreshed, so that the consumers can update their own data products accordingly. The following diagram describes the technical architecture of event-based data processing. The central governance account acts as the central event bus, which receives all data refresh events from all data producers. The central event bus forwards the events to consumer accounts. Each consumer account then filters for the producer events that it needs for its own data processing.
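To make the consumer-side filtering concrete, the following Python sketch mimics a small subset of EventBridge event pattern matching: a list in the pattern means that the event value must equal one of the listed values, and nested objects are matched recursively. It is a simplified illustration, not the full EventBridge matching rules, and the source, detail type, and table names in the usage note are hypothetical.

```python
def matches(pattern, event):
    """Return True when every field in the pattern matches the event.

    Simplified semantics: a list means "the event value must equal one of
    these", and a nested dict is matched recursively.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True
```

For example, a consumer domain interested only in refreshes of a trades table might use the pattern {"source": ["raw.ingestion"], "detail-type": ["DataRefresh"], "detail": {"table": ["trades"]}} and ignore every event for which matches returns False.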

Orchestration design

Stifel designed and implemented an event-based data pipeline orchestration system that triggers data pipelines when specific events occur. This system processes data immediately after receiving all required dependency events, enabling efficient workflow management.

The following diagram describes the logical architecture of the domain data pipeline orchestration framework.

The orchestration framework includes the components described in the following list. The data dependencies and data pipeline state management metadata are hosted in an Aurora PostgreSQL database.

  1. Data refresh processor: Receives data refresh events from the central mesh and the local data domain and evaluates whether the data dependencies of the domain’s data products are met.
  2. Data product dependency processor: Retrieves metadata for the product, kicks off the corresponding data domain AWS Glue job, and updates the metadata with the job information.
  3. Data pipeline state change processor: Monitors the domain data jobs and takes actions based on each job’s final status (SUCCEEDED or FAILED), creating incident tickets for failed jobs.
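The dependency check performed by the data refresh processor can be sketched as follows. This is a minimal in-memory sketch: the class and method names are hypothetical, and a dictionary stands in for the dependency and state metadata that the post says is hosted in Aurora PostgreSQL.

```python
class DependencyTracker:
    """Tracks which upstream refresh events each data product still waits for."""

    def __init__(self, dependencies):
        # dependencies maps a product name to the tables it depends on.
        self._dependencies = {p: set(d) for p, d in dependencies.items()}
        self._received = {p: set() for p in dependencies}

    def record_refresh(self, table):
        """Record one refresh event and return the products that are now ready."""
        ready = []
        for product, needed in self._dependencies.items():
            if table not in needed:
                continue
            self._received[product].add(table)
            if self._received[product] == needed:
                ready.append(product)
                # Reset so the next cycle waits for a fresh set of events.
                self._received[product] = set()
        return ready
```

Each product name returned by record_refresh would be handed to the dependency processor, which starts the matching AWS Glue job.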

Conclusion

Stifel has improved its data management and reduced data silos by adopting a data product approach. This strategy has positioned Stifel to become a data-driven, customer-centric organization. The company combines federated platform practices with AWS and open standards. As a result, Stifel is achieving its decentralization objectives through a scalable data platform. This platform empowers domain teams to make informed decisions, drive innovation, and maintain a competitive edge. The following are some of the advantages Stifel gained from an event-driven domain architecture (EDDA):

  • Business agility: Rapid market response, new business capability integration, scalable domains, quicker feature deployment, and flexible process modification
  • Customer experience: Real-time processing, responsive interactions, personalized services, consistent omnichannel presence, and enhanced service availability
  • Operational efficiency: Reduced system coupling, optimal resource use, scalable systems, lower maintenance overhead, and efficient data processing
  • Cost benefits: Lower development costs, reduced infrastructure expenses, decreased maintenance costs, efficient resource usage, and a better ROI on technology investments

In this post, we demonstrated how Stifel is building a modern data platform by recognizing the critical importance of data in today’s financial landscape. This strategic approach not only enhances operational efficiency but also positions Stifel at the forefront of technological innovation in the financial services industry. To learn more and get started, see the following resources:


About the authors

Amit Maindola is a Senior Data Architect focused on data engineering, analytics, and AI/ML at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Srinivas Kandi is a Senior Architect at Stifel focusing on delivering the next generation of cloud data platform on AWS. Prior to joining Stifel, Srini was a delivery specialist in cloud data analytics at AWS helping several customers in their transformational journey into AWS cloud. In his free time, Srini likes to explore cooking, travel and learn new trends and innovations in AI and cloud computing.

Hossein Johari is a seasoned data and analytics leader with over 25 years of experience architecting enterprise-scale platforms. As Lead and Senior Architect at Stifel Financial Corp. in St. Louis, Missouri, he spearheads initiatives in Data Platforms and Strategic Solutions, driving the design and implementation of innovative frameworks that support enterprise-wide analytics, strategic decision-making, and digital transformation. Known for aligning technical vision with business objectives, he works closely with cross-functional teams to deliver scalable, forward-looking solutions that advance organizational agility and performance.

Ahmad Rawashdeh is a Senior Architect at Stifel Financial. He supports Stifel and its clients in designing, implementing, and building scalable and reliable data architectures on Amazon Web Services (AWS), with a strong focus on data lake strategies, database services, and efficient data ingestion and transformation pipelines.

Lei Meng is a data architect at Stifel. His focus is working in designing and implementing scalable and secure data solutions on the AWS and helping Stifel’s cloud migration from on-premises systems.

AWS Weekly Roundup: New AWS Heroes, Amazon Q Developer, EC2 GPU price reduction, and more (June 9, 2025)

Post Syndicated from Betty Zheng (郑予彬) original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-new-aws-heroes-amazon-q-developer-ec2-gpu-price-reduction-and-more-june-9-2025/

The AWS Heroes program recognizes a vibrant, worldwide group of AWS experts whose enthusiasm for knowledge-sharing has a real impact within the community. Heroes go above and beyond to share knowledge in a variety of ways in the developer community. We’re introducing our newest AWS Heroes in the second quarter of 2025.

To find and connect with more AWS Heroes near you, visit the categories in which they specialize: Community Heroes, Container Heroes, Data Heroes, DevTools Heroes, Machine Learning Heroes, Security Heroes, and Serverless Heroes.

Last week’s launches
In addition to the inspiring celebrations, here are some AWS launches that caught my attention.

For a full list of AWS announcements, be sure to keep an eye on What’s New at AWS.

Other AWS news
Here are some additional projects and blog posts that you might find interesting:

  • Up to 45 percent price reduction for Amazon EC2 NVIDIA GPU-accelerated instances – AWS is reducing the price of NVIDIA GPU-accelerated Amazon EC2 instances (P4d, P4de, P5, and P5en) by up to 45 percent for On-Demand and Savings Plan usage. We are also making the very new P6-B200 instances available through Savings Plans to support large-scale deployments.
  • Introducing public AWS API models – AWS now provides daily updates of Smithy API models on GitHub, enabling developers to build custom SDK clients, understand AWS API behaviors, and create developer tools for better AWS service integration.
  • The AWS Asia Pacific (Taipei) Region is now open – The new Region lets customers with data residency requirements securely store data in Taiwan while providing even lower latency. Customers across industries can benefit from the secure, scalable, and reliable cloud infrastructure to drive digital transformation and innovation.
  • Amazon EC2 has simplified the AMI cleanup workflow – Amazon EC2 now supports automatically deleting underlying Amazon Elastic Block Store (Amazon EBS) snapshots when deregistering Amazon Machine Images (AMIs).
  • The Lab where AWS designs custom chips – Visit Annapurna Labs in Austin, Texas—a combination of offices, workshops, and even a mini data center—where Amazon Web Services (AWS) engineers are designing the future of computing.

Upcoming AWS events
Check your calendars and sign up for these upcoming AWS events.

  • Join re:Inforce from anywhere – If you aren’t able to make it to Philadelphia (June 16–18), tune in remotely. Get free access to the re:Inforce keynote and innovation talks live as they happen.
  • AWS Summits – Join free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Register in your nearest city: Shanghai (June 19 – 20), Milano (June 18), Mumbai (June 19) and Japan (June 25 – 26).
  • AWS re:Invent – Mark your calendars for AWS re:Invent (December 1 – 5) in Las Vegas. Registration is now open.
  • AWS Community Days – Join community-led conferences that feature technical discussions, workshops, and hands-on labs led by expert AWS users and industry leaders from around the world: Mexico (June 14), Nairobi, Kenya (June 14), and Colombia (June 28).

That’s all for this week. Check back next Monday for another Weekly Roundup!

Betty

Introducing managed query results for Amazon Athena

Post Syndicated from Guy Bachar original https://aws.amazon.com/blogs/big-data/introducing-managed-query-results-for-amazon-athena/

Amazon Athena makes it simple to analyze data without having to set up and manage data processing infrastructure. However, traditionally, you needed to set up an Amazon Simple Storage Service (Amazon S3) bucket to store query results before you could run queries with Athena. The need arose to make it even simpler to start using Athena, with fewer setup steps.

That’s why we’re thrilled to introduce managed query results, a new Athena feature that automatically stores, secures, and manages the lifecycle of query result data for you at no additional cost. Managed query results simplifies your user experience by removing the need to create or choose an S3 bucket in your account to hold results before you run queries. It helps reduce your monthly cost by shifting temporary storage of query results from your S3 bucket to Athena, and eliminates the need for separate processes to delete query result data from your S3 bucket after it’s no longer needed. Now, Athena offers both service managed, temporary result storage and customer managed Amazon S3 storage options to meet different needs.

What’s more, using managed query results doesn’t require complex changes to applications that read query results from existing Athena interfaces, and increases data security. Access to managed query result data is now associated with AWS Identity and Access Management (IAM) permissions scoped to individual Athena workgroups, instead of S3 buckets. Additionally, you can automatically encrypt result data with AWS Key Management Service (AWS KMS) using AWS owned or customer managed keys.

In this post, we demonstrate how to get started with managed query results and, by removing the undifferentiated effort spent on query result management, how Athena helps you get insights from your data in fewer steps than before.

Solution overview

When you use managed query results, you no longer need to create and choose S3 buckets to store query results, or manage lifecycle rules to make sure the result data is eventually cleaned up. The following are some scenarios where this is beneficial:

  • Financial analysts working in teams analyzing market data, each covering different investment areas or financial instruments, might use different workgroups for different kinds of analyses or projects. Now, analysts don’t need to spend time setting up S3 buckets or worry about cleaning up query results when their work is done.
  • Compliance teams can run audit queries on transaction data for regulatory reporting while making sure only authorized team members can access sensitive query results through IAM permissions. Because query results are cleaned up automatically, the compliance team no longer requires separate processes to delete query result data.
  • Data and analytics and platform automation teams who are responsible for streamlined onboarding of new users and teams no longer need to configure individual S3 buckets and permissions for different users and teams, simplifying their automation code.

The following are some of the key features of managed query results in Athena:

  • It removes the need to choose an S3 bucket location before you run queries.
  • There is no additional cost to store your query results, and query results are automatically deleted after a period of time, reducing management overhead from separate bucket cleanup processes.
  • It’s straightforward to get started: new and preexisting workgroups can be seamlessly configured to use managed query results. You can have a mix of Athena managed and customer managed query results in your AWS account.
  • You can use streamlined IAM permissions with access to read results using GetQueryResults and GetQueryResultsStream tied to individual workgroups.
  • Query results are automatically encrypted with your choice of AWS owned or customer managed KMS keys.

Let’s walk through how to get started with managed query results.

Configure your workgroup

Complete the following steps to configure your workgroup:

  1. On the Athena console, choose Workgroups in the navigation pane.
  2. Choose Create workgroup.

Alternatively, you can select an existing workgroup and choose Edit.

  3. For Query result configuration, select Athena managed. This option appears in the Query result configuration section whether you are creating a new workgroup or editing an existing one.
  4. For Encrypt query results, choose your preferred encryption method.

Figure 1: Query result configuration

Step 2: Configure Encryption

Choose your preferred encryption method for query results:

    1. Encrypt using an AWS owned key – This is the default option. It indicates that you want query results to be encrypted and decrypted by an AWS owned key.
    2. Encrypt using a customer managed key – Choose this option if you want to encrypt and decrypt query results with your own key. To have Athena use your customer managed key, specify the Athena service in the Principal elements of the key policy. For more information, see Setup an AWS KMS key policy for managed storage. To run queries, the user querying data needs permission to access your key.
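The same workgroup settings can also be prepared programmatically. The following Python sketch only builds the request arguments. It assumes that the workgroup configuration exposes a ManagedQueryResultsConfiguration block with Enabled and EncryptionConfiguration.KmsKey fields, so verify those names against the current Athena API reference before relying on them. The helper name is hypothetical.

```python
def managed_results_workgroup(name, kms_key_arn=None):
    """Build keyword arguments for an Athena CreateWorkGroup request."""
    # Field names here are assumptions; check the Athena API reference.
    managed = {"Enabled": True}
    if kms_key_arn:
        # Without this block, results are encrypted with an AWS owned key.
        managed["EncryptionConfiguration"] = {"KmsKey": kms_key_arn}
    return {
        "Name": name,
        "Configuration": {"ManagedQueryResultsConfiguration": managed},
    }
```

The returned dictionary could be passed to the boto3 Athena client as create_work_group(**managed_results_workgroup("demo-workgroup")).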

Query your data

After you’ve configured your workgroup for managed query results, you can immediately start running queries. Let’s run a sample query against the AWS Cost and Usage Report.

The Athena console banner indicates that our workgroup, demo-workgroup, was updated to use managed query results. Our query ran successfully, and we didn’t need to set up an S3 bucket. To download these results, choose Download results CSV.

Figure 2: Running a query against the Cost and Usage report in the Athena console

You can access these results through the Athena console and using the Athena APIs.

Figure 3: Accessing the query results via the Athena API
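For readers who prefer code to screenshots, here is a minimal Python sketch of reading results through the Athena API. It takes an Athena client (for example, one created with boto3) as a parameter, and it passes no result location because the workgroup is assumed to be configured for managed query results. The function name and polling interval are choices made for this example.

```python
import time


def run_query(athena, sql, workgroup, poll_seconds=1.0):
    """Run a query in a workgroup and return its rows as lists of strings."""
    # No ResultConfiguration is passed: the workgroup decides where results go.
    started = athena.start_query_execution(QueryString=sql, WorkGroup=workgroup)
    query_id = started["QueryExecutionId"]

    # Poll until the query reaches a terminal state.
    while True:
        execution = athena.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} ended in state {state}")

    # First page only; follow NextToken for larger result sets.
    page = athena.get_query_results(QueryExecutionId=query_id)
    return [
        [cell.get("VarCharValue") for cell in row["Data"]]
        for row in page["ResultSet"]["Rows"]
    ]
```

The caller needs IAM permission for GetQueryResults on the workgroup, which is how access to managed query results is controlled.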

Conclusion

In this post, we introduced managed query results, a new Athena feature that streamlines the query experience through automated storage of query results, provides automatic cleanup, and limits query result access with IAM permissions. Managed query results reduces operational overhead, empowering both data analysts running interactive queries and teams building complex analytics pipelines to focus on deriving insights rather than managing infrastructure. We demonstrated how to configure workgroups for managed storage and effectively use this feature in query scenarios.

To start using managed query results with Athena, simply configure your workgroups through the Athena console or APIs. For more information, see Managed query results.


About the Authors

Guy Bachar is a Sr. Solutions Architect at AWS. He specializes in assisting capital markets and FinTech customers with their cloud transformation journeys. His expertise encompasses identity management, security, and unified communication.

Sayan Chakraborty is a Sr. Solutions Architect at AWS. He helps large enterprises build secure, scalable, and performant solutions on AWS. With a background in enterprise and technology architecture, he has experience delivering large-scale digital transformation programs across a wide range of industry verticals.

Darshit Thakkar is a Technical Product Manager at AWS and works out of Boston, Massachusetts. He works closely with customers to understand how they use data, and drives product innovations that make data more actionable at scale.

Build a secure serverless streaming pipeline with Amazon MSK Serverless, Amazon EMR Serverless and IAM

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/build-a-secure-serverless-streaming-pipeline-with-amazon-msk-serverless-amazon-emr-serverless-and-iam/

The exponential growth and vast volume of streaming data have made it a vital resource for organizations worldwide. To unlock its full potential, real-time analytics are essential for extracting actionable insights. Derived from a wide range of sources, including social media, Internet of Things (IoT) sensors, and user interactions, streaming data empowers businesses to respond promptly to emerging trends and events, make informed decisions, and stay ahead of the competition.

Streaming applications commonly use Apache Kafka for data ingestion and Apache Spark Structured Streaming for processing. However, integrating and securing these components poses considerable challenges for users. The complexity of managing certificates, keystores, and TLS configurations to connect Spark Streaming to Kafka brokers demands specialized expertise. A managed, serverless framework would greatly simplify this process, alleviating the need for manual configuration and streamlining the integration of these critical components.

To simplify the management and security of traditional streaming architectures, you can use Amazon Managed Streaming for Apache Kafka (Amazon MSK). This fully managed service simplifies data ingestion and processing. Amazon MSK Serverless alleviates the need for cluster management and scaling, and further enhances security by integrating AWS Identity and Access Management (IAM) for authentication and authorization. This consolidated approach replaces the complex certificate and key management required by TLS client authentication through AWS Certificate Manager, streamlining operations and bolstering data protection. For instance, when a client attempts to write data to the cluster, MSK Serverless verifies both the client’s identity and its permissions using IAM.

For efficient data processing, you can use Amazon EMR Serverless with a Spark application built on the Spark Structured Streaming framework, enabling near real-time data processing. This setup seamlessly handles large volumes of data from MSK Serverless, using IAM authentication for secure and swift data processing.

This post demonstrates a comprehensive, end-to-end solution for processing data from MSK Serverless using an EMR Serverless Spark Streaming job, secured with IAM authentication. Additionally, it demonstrates how to query the processed data using Amazon Athena, providing a seamless and integrated workflow for data processing and analysis. This solution enables near real-time querying of the latest data processed from MSK Serverless and EMR Serverless using Athena, providing instant insights and analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this post.

The workflow consists of the following steps:

  1. The architecture begins with an MSK Serverless cluster set up with IAM authentication. An Amazon Elastic Compute Cloud (Amazon EC2) instance runs a Python script producer.py that acts as a data producer, sending sample data to a Kafka topic within the cluster.
  2. The Spark Streaming job retrieves data from the Kafka topic, stores it in Amazon Simple Storage Service (Amazon S3), and creates a corresponding table in the AWS Glue Data Catalog. As it continuously consumes data from the Kafka topic, the job stays up-to-date with the latest streaming data. With checkpointing enabled, the job tracks processed records, allowing it to resume from where it left off in case of a failure, providing seamless data processing.
  3. To analyze this data, users can use Athena, a serverless query service. Athena enables interactive SQL-based exploration of data directly in Amazon S3 without the need for complex infrastructure management.

Prerequisites

Before getting started, make sure you have the following:

  • An active AWS account with billing enabled
  • An IAM user with administrator access (AdministratorAccess policy) or specific permissions to create and manage resources such as a virtual private cloud (VPC), subnet, security group, IAM roles, NAT gateway, internet gateway, EC2 client, MSK Serverless, EMR Serverless, Amazon EMR Studio, and S3 buckets
  • Sufficient VPC capacity in your chosen AWS Region

Although using an IAM user with administrator access will work, it’s recommended to follow the principle of least privilege in production environments by creating custom IAM policies with only the necessary permissions. The IAM user we create has the AdministratorAccess policy attached to it. However, you might not need such elevated access.

For this post, we create the solution resources in the us-east-2 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Create MSK Serverless and EMR Serverless resources

The vpc-msk-emr-serverless-studio.yaml stack creates a VPC, subnet, security group, IAM roles, NAT gateway, internet gateway, EC2 client, MSK Serverless, EMR Serverless, EMR Studio, and S3 buckets. To create the solution resources, complete the following steps:

  1. Launch the stack vpc-msk-emr-serverless-studio using the CloudFormation template:

  2. Provide the parameter values as listed in the following table.

Parameter | Description | Sample value
EnvironmentName | An environment name that is prefixed to resource names. | msk-emr-serverless-pipeline
InstanceType | Amazon MSK client EC2 instance type. | t2.micro
LatestAmiId | Latest AMI ID of Amazon Linux 2023 for the EC2 instance. You can use the default value. | /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64
VpcCIDR | IP range (CIDR notation) for this VPC. | 10.192.0.0/16
PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone. | 10.192.10.0/24
PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone. | 10.192.11.0/24
PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | 10.192.20.0/24
PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | 10.192.21.0/24

The stack creation process can take approximately 10 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

Next, you set up the data ingestion to the Kafka topic from the Kafka EC2 instance.

Produce records to Kafka topic

Complete the following steps to set up data ingestion:

  1. On the Amazon EC2 console, go to the EC2 instance that you created using the CloudFormation template.
  2. Log in to the EC2 instance using Session Manager, a capability of AWS Systems Manager.
  3. Choose the instance msk-emr-serverless-blog and then choose Connect.
  4. Create a Kafka topic in MSK Serverless from the EC2 instance.
    1. In the following export command, replace the placeholder with the MSKBootstrapServers value from the CloudFormation stack output (for example, boot-xxxxxx.yy.kafka-serverless.us-east-2.amazonaws.com:9098):
      $ sudo su - ec2-user
      $ export BS=<your-msk-serverless-endpoint>

    2. Run the following command on the EC2 instance to create a topic called sales_data_topic:

/home/ec2-user/kafka_2.12-2.8.1/bin/kafka-topics.sh \
--bootstrap-server $BS \
--command-config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
--create --topic sales_data_topic \
--partitions 10

Created topic sales_data_topic.

The Kafka client is already installed in the ec2-user home directory (/home/ec2-user), together with the MSK IAM authentication JAR. A client configuration file containing the IAM authentication properties has also been created at /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties.

The following code shows the contents of client.properties:

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

  5. Run the following command to produce records to the Kafka topic using the syntheticSalesDataProducer.py Python script that is present on the EC2 instance. Update the Region accordingly.
nohup python3 -u syntheticSalesDataProducer.py --num_records 1000 \
--sales_data_topic sales_data_topic --bootstrap_server $BS \
--region=us-east-2 > syntheticSalesDataProducer.log &

Understanding Amazon MSK IAM authentication with EMR Serverless

Amazon MSK IAM authentication enables secure authentication and authorization for Kafka clusters (MSK Serverless) using IAM roles. When integrating with EMR Serverless Spark Streaming, Amazon MSK IAM authentication allows Spark jobs to access Kafka topics securely, using IAM roles for fine-grained access control. This provides secure data processing and streaming.

IAM policy configuration

To enable EMR Serverless jobs to authenticate with an MSK Serverless cluster using IAM, you need to attach specific Kafka-related IAM permissions to the EMR Serverless job execution role. These permissions allow the job to perform essential operations on the Kafka cluster, topics, and consumer groups. The following IAM policy must be attached to the EMR Serverless job execution role to grant the necessary permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:cluster/<SERVERLESS_CLUSTER_NAME>/<ID>"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:topic/<SERVERLESS_CLUSTER_NAME>/*/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:group/<SERVERLESS_CLUSTER_NAME>/*/*"
            ],
            "Effect": "Allow"
        }
    ]
}

This code refers to the following actions:

  • Connect, DescribeCluster – Required to initiate a secure connection and obtain metadata
  • DescribeTopic, ReadData, WriteData – Enables data consumption and production
  • CreateTopic (optional) – Allows dynamic topic creation
  • AlterGroup, DescribeGroup – Needed for consumer group management in streaming jobs

These permissions make sure that the Spark Streaming job can securely authenticate and interact with MSK Serverless resources using its IAM role.
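If you manage several clusters, it can help to generate this policy instead of editing placeholders by hand. The following Python sketch builds the same three statements from parameters. The function name and the allow_create_topic flag are assumptions made for this example, and the flag reflects that CreateTopic is optional.

```python
def msk_iam_policy(region, account_id, cluster_name, cluster_id,
                   allow_create_topic=False):
    """Build the MSK IAM policy document for one MSK Serverless cluster."""
    prefix = f"arn:aws:kafka:{region}:{account_id}"
    topic_actions = [
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData",
    ]
    if allow_create_topic:
        # CreateTopic is optional; grant it only for dynamic topic creation.
        topic_actions.insert(0, "kafka-cluster:CreateTopic")

    def allow(actions, resource):
        return {"Effect": "Allow", "Action": actions, "Resource": [resource]}

    return {
        "Version": "2012-10-17",
        "Statement": [
            allow(["kafka-cluster:Connect", "kafka-cluster:DescribeCluster"],
                  f"{prefix}:cluster/{cluster_name}/{cluster_id}"),
            allow(topic_actions, f"{prefix}:topic/{cluster_name}/*/*"),
            allow(["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
                  f"{prefix}:group/{cluster_name}/*/*"),
        ],
    }
```

Serializing the result with json.dumps(policy, indent=4) gives a document that can be attached to the job execution role.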

Required dependencies

To enable Amazon MSK IAM authentication in Spark (especially on EMR Serverless), specific JAR dependencies must be included in your Spark Streaming job using sparkSubmitParameters:

  • spark-sql-kafka-0-10_2.12 – This is the Kafka connector for Spark Structured Streaming. It provides the DataFrame API to read from and write to Kafka.
  • aws-msk-iam-auth – This JAR provides the IAM authentication mechanism required to connect to MSK Serverless using the AWS_MSK_IAM SASL mechanism.

You can include these dependencies directly by specifying them in the --packages argument when submitting the EMR Serverless job. For example:

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,software.amazon.msk:aws-msk-iam-auth:2.2.0

When the job is submitted, EMR Serverless will automatically download these JARs from Maven Central (or another configured repository) at runtime. You don’t need to bundle them manually unless offline usage or specific versions are required.
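When the job is submitted from a script, the sparkSubmitParameters string can be assembled from a list of Maven coordinates. The following helper is a small sketch with a hypothetical function name. It only joins strings and doesn’t validate that the coordinates exist.

```python
def spark_submit_parameters(packages, conf=None):
    """Join Maven coordinates and Spark settings into one parameter string."""
    parts = []
    if packages:
        # --packages takes a single comma-separated list of coordinates.
        parts.append("--packages " + ",".join(packages))
    for key, value in (conf or {}).items():
        parts.append(f"--conf {key}={value}")
    return " ".join(parts)
```

Calling it with the two coordinates shown earlier reproduces the --packages argument, and extra Spark settings can be appended through the conf dictionary.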

Spark Streaming job configuration for Amazon MSK IAM authentication

In your Spark Streaming application, configure the Kafka source with SASL properties to enable IAM based authentication. The following code shows the relevant configuration:

topic_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", topic_input)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol","SASL_SSL")
    .option("kafka.sasl.mechanism","AWS_MSK_IAM")
    .option("kafka.sasl.jaas.config","software.amazon.msk.auth.iam.IAMLoginModule required;")
    .option("kafka.sasl.client.callback.handler.class","software.amazon.msk.auth.iam.IAMClientCallbackHandler")
    .load()
    .selectExpr("CAST(value AS STRING)")
    )

Key properties include:

  • kafka.security.protocol = SASL_SSL – Enables encrypted communication over SSL with SASL authentication
  • kafka.sasl.mechanism = AWS_MSK_IAM – Tells Kafka to use the IAM based SASL mechanism
  • kafka.sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required; – Specifies the login module provided by AWS for IAM integration
  • kafka.sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler – Handles the actual signing and authentication using the IAM role

With these settings, Spark uses the IAM credentials attached to the EMR Serverless job execution role to authenticate to MSK Serverless without needing additional credentials, certificates, or secrets.
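
The four SASL properties above are the same for every MSK source and sink in a job, so it can help to keep them in one place. The following is a minimal sketch, not part of the original script; the helper names msk_iam_kafka_options and read_topic are hypothetical, and read_topic assumes an existing SparkSession.

```python
def msk_iam_kafka_options(bootstrap_servers: str) -> dict:
    """Return the Kafka options needed for IAM authentication to Amazon MSK."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


def read_topic(spark, bootstrap_servers: str, topic: str):
    """Build a streaming DataFrame over `topic`; `spark` is an existing SparkSession."""
    return (
        spark.readStream.format("kafka")
        .options(**msk_iam_kafka_options(bootstrap_servers))
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value AS STRING)")
    )
```

The same dictionary can be passed to a writeStream Kafka sink, which keeps the authentication settings of readers and writers from drifting apart.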

Data processing using an EMR Serverless streaming job with Amazon MSK IAM authentication

Complete the following steps to run a Spark Streaming job to process the data from MSK Serverless:

  1. Submit the Spark Streaming job to EMR Serverless using the AWS Command Line Interface (AWS CLI), which is already installed on the EC2 instance.
  2. Log in to the EC2 instance using Session Manager. Choose the instance msk-emr-serverless-blog and then choose Connect.
  3. Run the following command to submit the streaming job. Provide the parameters from the CloudFormation stack output.
sudo su - ec2-user

aws emr-serverless start-job-run \
--application-id <APPLICATION ID> \
--execution-role-arn <EXECUTION ROLE ARN> \
--mode 'STREAMING' \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<EMR BLOG SCRIPT BUCKET>/emr_pyspark_streaming_script/pysparkStreamingBlog.py",
"entryPointArguments":["--topic_input","sales_data_topic","--kafka_bootstrap_servers","<BOOTSTRAP URL WITH PORT>","--output_s3_path","s3://<EMR STREAMING OUTPUT BUCKET>/output/sales-order-data/","--checkpointLocation","s3://<EMR STREAMING OUTPUT BUCKET>/checkpointing/checkpoint-sales-order-data/","--database_name","emrblog","--table_name","sales_order_data"],
"sparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.executor.cores=2 --conf spark.executor.memory=5g --conf spark.driver.cores=2 --conf spark.driver.memory=5g --conf spark.executor.instances=5 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,software.amazon.msk:aws-msk-iam-auth:2.2.0"
}}'
  1. After you submit the job, log in to EMR Studio using the URL in the EmrServerlessStudioURL value from the CloudFormation stack output.
  2. In the navigation pane, choose Applications under Serverless.
  3. Choose the application ID in the EmrServerlessSparkApplicationID value from the CloudFormation stack output.
  4. On the Streaming job runs tab, verify that the job has been submitted and wait for it to begin running.
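
If you prefer to submit the job from Python instead of the AWS CLI, the start-job-run command shown earlier maps onto the boto3 EMR Serverless client. The following sketch is an illustration only: the function names are hypothetical, the placeholder values must be replaced with the CloudFormation stack outputs, and the entry point arguments are abbreviated.

```python
KAFKA_PACKAGES = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
    "software.amazon.msk:aws-msk-iam-auth:2.2.0",
]


def build_streaming_job_request(application_id, execution_role_arn,
                                entry_point, entry_point_args, spark_conf=None):
    """Assemble the keyword arguments for start_job_run in STREAMING mode."""
    conf_flags = [f"--conf {key}={value}" for key, value in (spark_conf or {}).items()]
    submit_params = " ".join(conf_flags + ["--packages " + ",".join(KAFKA_PACKAGES)])
    return {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "mode": "STREAMING",
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": entry_point,
                "entryPointArguments": list(entry_point_args),
                "sparkSubmitParameters": submit_params,
            }
        },
    }


def submit(request):
    """Send the request to EMR Serverless and return the job run ID."""
    import boto3  # imported lazily so the builder works without boto3 installed
    return boto3.client("emr-serverless").start_job_run(**request)["jobRunId"]


request = build_streaming_job_request(
    application_id="<APPLICATION ID>",
    execution_role_arn="<EXECUTION ROLE ARN>",
    entry_point="s3://<EMR BLOG SCRIPT BUCKET>/emr_pyspark_streaming_script/pysparkStreamingBlog.py",
    entry_point_args=["--topic_input", "sales_data_topic"],  # abbreviated
    spark_conf={"spark.executor.cores": "2", "spark.executor.memory": "5g"},
)
```

Calling submit(request) requires AWS credentials with permission to start job runs on the application.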

Validate the data in Athena

After the EMR Serverless Spark Streaming job has run and created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, open the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database emrblog that the streaming job created.
  4. To validate the data, run the following query:
SELECT 
    DATE_TRUNC('minute', date) AS minute_window, 
    ROUND(SUM(total_amount), 2) AS total_amount
FROM 
    emrblog.sales_order_data
WHERE 
    DATE_TRUNC('day', date) = CURRENT_DATE
GROUP BY 
    DATE_TRUNC('minute', date)
ORDER BY 
    minute_window DESC;
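
To make the aggregation concrete, the same per-minute roll-up can be sketched in plain Python. The sample rows are made up for illustration and do not come from the pipeline; the function name is hypothetical.

```python
from collections import defaultdict
from datetime import date, datetime


def total_amount_per_minute(rows, day):
    """Sum amounts per minute for one day, newest minute first.

    rows: iterable of (timestamp, amount) pairs.
    """
    totals = defaultdict(float)
    for ts, amount in rows:
        if ts.date() != day:
            continue  # mirrors WHERE DATE_TRUNC('day', date) = CURRENT_DATE
        minute = ts.replace(second=0, microsecond=0)  # DATE_TRUNC('minute', date)
        totals[minute] += amount
    # ORDER BY minute_window DESC, with ROUND(SUM(total_amount), 2)
    return sorted(((m, round(t, 2)) for m, t in totals.items()), reverse=True)


sample_rows = [
    (datetime(2025, 1, 1, 10, 0, 5), 10.0),
    (datetime(2025, 1, 1, 10, 0, 40), 5.5),
    (datetime(2025, 1, 1, 10, 1, 2), 1.0),
    (datetime(2024, 12, 31, 23, 59, 0), 99.0),  # previous day, filtered out
]
result = total_amount_per_minute(sample_rows, date(2025, 1, 1))
```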

Clean up

To clean up your resources, complete the following steps:

  1. Log in to EMR Studio using the URL from the EmrServerlessStudioURL value in the CloudFormation stack output.
  2. In the navigation pane, choose Applications under Serverless.
  3. Choose the application ID from the EmrServerlessSparkApplicationID value in the CloudFormation stack output.
  4. On the Streaming job runs tab, select the job that has been running and cancel the job run.
  5. On the AWS CloudFormation console, delete the CloudFormation stack vpc-msk-emr-serverless-studio.

Conclusion

In this post, we showcased a serverless pipeline for streaming data with IAM authentication, so you can focus on deriving insights from your analytics. You can customize the EMR Serverless Spark Streaming code to apply transformations and filters, so only valid data is loaded into Amazon S3. This solution combines EMR Serverless Spark Streaming with MSK Serverless, securely integrated through IAM authentication. You can now streamline your streaming pipelines without the complexity of managing the integration between Amazon MSK and Spark Streaming on Amazon EMR.


About the Authors

Shubham Purwar is an AWS Analytics Specialist Solution Architect. He helps organizations unlock the full potential of their data by designing and implementing scalable, secure, and high-performance analytics solutions on the AWS platform. With deep expertise in AWS analytics services, he collaborates with customers to uncover their distinct business requirements and create customized solutions that deliver actionable insights and drive business growth. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specialized in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Prashanthi Chinthala is a Cloud Engineer (DIST) at AWS. She helps customers overcome EMR challenges and develop scalable data processing and analytics pipelines on AWS.

Configure cross-account access of Amazon SageMaker Lakehouse multi-catalog tables using AWS Glue 5.0 Spark

Post Syndicated from Aarthi Srinivasan original https://aws.amazon.com/blogs/big-data/configure-cross-account-access-of-amazon-sagemaker-lakehouse-multi-catalog-tables-using-aws-glue-5-0-spark/

Many organizations build and operate enterprise-wide data mesh architectures using the AWS Glue Data Catalog and AWS Lake Formation for their Amazon Simple Storage Service (Amazon S3) based data lakes. Now, with Amazon SageMaker Lakehouse, these organizations can unify their data analytics and AI/ML workflows while maintaining secure cross-account access without data replication. By centralizing access to a single copy of data and using the secure fine-grained permissions of Lake Formation, enterprises can accelerate their analytics initiatives while reducing operational complexity across business units.

SageMaker Lakehouse organizes data using logical containers called catalogs, enabling teams to seamlessly query and analyze data across their entire ecosystem—from S3 data lakes to Amazon Redshift warehouses—using familiar Apache Iceberg compatible tools. Organizations can either mount their existing data warehouse to the lakehouse or create new catalogs using Amazon Redshift managed storage. Built-in zero-ETL connectors reduce data silos by integrating various data sources, enabling unified analytics across teams. This seamless integration particularly benefits existing AWS customers who already use the Data Catalog and Lake Formation, because they can immediately take advantage of SageMaker Lakehouse capabilities.

AWS Glue is a serverless service that makes data integration simpler, faster, and cheaper. We launched AWS Glue 5.0 with upgraded Apache Spark 3.5.4 and Python 3.11. AWS Glue 5.0 adds support for SageMaker Lakehouse to unify your data across S3 data lakes and Redshift data warehouses.

In our previous blog post, we demonstrated the process of creating tables in both the Amazon Redshift managed catalog and Amazon Redshift federated catalog within a single AWS account. In this post, we show you how to share a Redshift table and Amazon S3 based Iceberg table from the account that owns the data to another account that consumes the data. In the recipient account, we run a join query on the shared data lake and data warehouse tables using Spark in AWS Glue 5.0. We walk you through the complete cross-account setup and provide the Spark configuration in a Python notebook.

Solution overview

To demonstrate the functionality of SageMaker Lakehouse multi-catalog tables using AWS Glue 5.0 Spark, let’s assume the retail company Example Retail Corp launches a campaign to understand their market and drive growth by country of operation. Their infrastructure consists of a Redshift data warehouse for structured data and an S3 data lake for structured and semi-structured data. The marketing team realizes that customer data is spread across those two systems and wants to use the support of their data engineering and analysts to analyze and provide insights. As a company, they prefer unified governance for managing data access while enabling a secure sharing mechanism for business and engineering teams.

Let’s see how they can achieve the goal using SageMaker Lakehouse. The solution is represented in the following diagram.

001-BDB 5089

The setup could be extended to enterprise data meshes where a data producer account will own the Redshift clusters, catalog the tables in a central governance account, and share with any number of consumer accounts from the central account. Multiple consumer accounts could analyze the shared Redshift tables using the SageMaker Lakehouse integrated analytics engines.

The solution also works for cross-Region table access. You would create a resource link for the catalog tables in an AWS Region where you want to run your analyses and create dashboards. For cross-Region resource link setup, refer to Setting up cross-Region table access.

Prerequisites

To implement this solution, you need the following prerequisites:

  • Two AWS accounts with Lake Formation cross-account sharing version 4 and Lake Formation administrator configured. Refer to the Lake Formation data administrator permissions and initial setup of Lake Formation.
  • Permissions from Prerequisites for managing Amazon Redshift namespaces in the AWS Glue Data Catalog granted to the Lake Formation administrator role on both accounts.
  • An S3 bucket in the producer account to host the sample Iceberg table data.
  • An AWS Identity and Access Management (IAM) role, LakeFormationS3Registration_custom, in the producer account to register your Iceberg table’s Amazon S3 location with Lake Formation. For details, refer to Registering an Amazon S3 location and Requirements for roles used to register locations.
  • An Amazon Redshift Serverless namespace in the producer account. Follow the instructions in Creating a data warehouse with Amazon Redshift Serverless to launch a serverless namespace with default settings.
  • Two sample datasets, orders and returns, in CSV format. This is Example Retail Corp’s data on their customer purchase and return trends. Their marketing team has collected these data in a Redshift table and Amazon S3 from various systems. The instructions to create these tables are provided in the appendix at the end of this post. After completing the steps in the appendix, you should have customerdb.returnstbl_iceberg in your default catalog and ordersdb.orderstbl in your Redshift Serverless application default namespace.
  • An IAM role, Glue-execution-role, in the consumer account, with the following policies:
    1. AWS managed policies AWSGlueServiceRole and AmazonRedshiftDataFullAccess.
    2. A new inline policy with the following permissions:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "LFandRSserverlessAccess",
                  "Effect": "Allow",
                  "Action": [
                      "lakeformation:GetDataAccess",
                      "redshift-serverless:GetCredentials"
                  ],
                  "Resource": "*"
              },
              {
                  "Effect": "Allow",
                  "Action": "iam:PassRole",
                  "Resource": "*",
                  "Condition": {
                      "StringEquals": {
                          "iam:PassedToService": "glue.amazonaws.com"
                      }
                  }
              }
          ]
      }

    3. Add the following trust policy to Glue-execution-role, allowing AWS Glue to assume this role:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Principal": {
                      "Service": [
                          "glue.amazonaws.com"
                      ]
                  },
                  "Action": "sts:AssumeRole"
              }
          ]
      }
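
As an alternative to the console, the role described above could be created with the boto3 IAM client. The following is a sketch under assumptions: the inline policy name GlueLakehouseInline and the helper names are hypothetical, and the managed policy ARNs use the standard AWS paths for the two policies named above.

```python
import json

ROLE_NAME = "Glue-execution-role"

TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": ["glue.amazonaws.com"]},
        "Action": "sts:AssumeRole",
    }],
}

INLINE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "LFandRSserverlessAccess",
            "Effect": "Allow",
            "Action": ["lakeformation:GetDataAccess", "redshift-serverless:GetCredentials"],
            "Resource": "*",
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*",
            "Condition": {"StringEquals": {"iam:PassedToService": "glue.amazonaws.com"}},
        },
    ],
}

MANAGED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/AmazonRedshiftDataFullAccess",
]


def role_setup_calls():
    """Return the IAM API calls as (operation, kwargs) pairs, in execution order."""
    calls = [("create_role", {
        "RoleName": ROLE_NAME,
        "AssumeRolePolicyDocument": json.dumps(TRUST_POLICY),
    })]
    calls += [("attach_role_policy", {"RoleName": ROLE_NAME, "PolicyArn": arn})
              for arn in MANAGED_POLICY_ARNS]
    calls.append(("put_role_policy", {
        "RoleName": ROLE_NAME,
        "PolicyName": "GlueLakehouseInline",  # hypothetical name
        "PolicyDocument": json.dumps(INLINE_POLICY),
    }))
    return calls


def apply_calls(calls, iam_client):
    """Execute the planned calls against a boto3 IAM client."""
    for operation, kwargs in calls:
        getattr(iam_client, operation)(**kwargs)
```

To run it, pass boto3.client("iam") from the consumer account to apply_calls(role_setup_calls(), ...). Separating the plan from its execution makes the calls easy to review before anything is created.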

    Steps for producer account setup

    For the producer account setup, you can either use your IAM administrator role added as Lake Formation administrator or use a Lake Formation administrator role with permissions added as discussed in the prerequisites. For illustration purposes, we use the IAM admin role Admin added as Lake Formation administrator.

    002-BDB 5089

    Configure your catalog

    Complete the following steps to set up your catalog:

    1. Log in to AWS Management Console as Admin.
    2. On the Amazon Redshift console, follow the instructions in Registering Amazon Redshift clusters and namespaces to the AWS Glue Data Catalog.
    3. After the registration is initiated, you will see the invite from Amazon Redshift on the Lake Formation console.
    4. Select the pending catalog invitation and choose Approve and create catalog.

    003-BDB 5089

    1. On the Set catalog details page, configure your catalog:
      1. For Name, enter a name (for this post, redshiftserverless1-uswest2).
      2. Select Access this catalog from Apache Iceberg compatible engines.
      3. Choose the IAM role you created for the data transfer.
      4. Choose Next.

      004-BDB 5089

    2. On the Grant permissions – optional page, choose Add permissions.
      1. Grant the Admin user Super user permissions for Catalog permissions and Grantable permissions.
      2. Choose Add.

      005-BDB 5089

    3. Verify the granted permission on the next page and choose Next.
      006-BDB 5089
    4. Review the details on the Review and create page and choose Create catalog.
      007-BDB 5089

    Wait a few seconds for the catalog to show up.

    1. Choose Catalogs in the navigation pane and verify that the redshiftserverless1-uswest2 catalog is created.
      008-BDB 5089
    2. Explore the catalog detail page to verify the ordersdb.public database.
      009-BDB 5089
    3. On the database View dropdown menu, view the table and verify that the orderstbl table shows up.
      010-BDB 5089

    As the Admin role, you can also query the orderstbl in Amazon Athena and confirm the data is available.

    011-BDB 5089

    Grant permissions on the tables from the producer account to the consumer account

    In this step, we share the Amazon Redshift federated catalog database redshiftserverless1-uswest2:ordersdb.public and table orderstbl as well as the Amazon S3 based Iceberg table returnstbl_iceberg and its database customerdb from the default catalog to the consumer account. We can’t share the entire catalog to external accounts as a catalog-level permission; we just share the database and table.

    1. On the Lake Formation console, choose Data permissions in the navigation pane.
    2. Choose Grant.
      012-BDB 5089
    3. Under Principals, select External accounts.
    4. Provide the consumer account ID.
    5. Under LF-Tags or catalog resources, select Named Data Catalog resources.
    6. For Catalogs, choose the account ID that represents the default catalog.
    7. For Databases, choose customerdb.
      013-BDB 5089
    8. Under Database permissions, select Describe for both Database permissions and Grantable permissions.
    9. Choose Grant.
      014-BDB 5089
    10. Repeat these steps and grant table-level Select and Describe permissions on returnstbl_iceberg.
    11. Repeat these steps again to grant database- and table-level permissions for the orderstbl table of the federated catalog database redshiftserverless1-uswest2/ordersdb.

    The following screenshots show the configuration for database-level permissions.

    015-BDB 5089

    016-BDB 5089

    The following screenshots show the configuration for table-level permissions.

    017-BDB 5089

    018-BDB 5089

    1. Choose Data permissions in the navigation pane and verify that the consumer account has been granted database- and table-level permissions for both orderstbl from the federated catalog and returnstbl_iceberg from the default catalog.
      019-BDB 5089
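
The console grants in this section correspond to calls to the Lake Formation GrantPermissions API. The following sketch builds the four requests for boto3's grant_permissions; it is an illustration, not the method used in this post. The function name is hypothetical, the account IDs in the example are placeholders, and granting every permission with the grant option is an assumption based on the console steps.

```python
FEDERATED_CATALOG = "redshiftserverless1-uswest2/ordersdb"


def cross_account_grants(producer_account_id, consumer_account_id):
    """Return grant_permissions kwargs for the shared databases and tables."""
    principal = {"DataLakePrincipalIdentifier": consumer_account_id}
    default_catalog = producer_account_id
    federated_catalog = f"{producer_account_id}:{FEDERATED_CATALOG}"

    def grant(resource, permissions):
        return {
            "Principal": principal,
            "Resource": resource,
            "Permissions": permissions,
            "PermissionsWithGrantOption": permissions,
        }

    return [
        grant({"Database": {"CatalogId": default_catalog, "Name": "customerdb"}},
              ["DESCRIBE"]),
        grant({"Table": {"CatalogId": default_catalog, "DatabaseName": "customerdb",
                         "Name": "returnstbl_iceberg"}},
              ["SELECT", "DESCRIBE"]),
        grant({"Database": {"CatalogId": federated_catalog, "Name": "public"}},
              ["DESCRIBE"]),
        grant({"Table": {"CatalogId": federated_catalog, "DatabaseName": "public",
                         "Name": "orderstbl"}},
              ["SELECT", "DESCRIBE"]),
    ]


grants = cross_account_grants("111122223333", "444455556666")  # placeholder IDs
```

Each dictionary can be passed to boto3.client("lakeformation").grant_permissions(**request) while signed in as the Lake Formation administrator of the producer account.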

    Register the Amazon S3 location of returnstbl_iceberg with Lake Formation

    In this step, we register the Amazon S3 based Iceberg table returnstbl_iceberg data location with Lake Formation to be managed by Lake Formation permissions. Complete the following steps:

    1. On the Lake Formation console, choose Data lake locations in the navigation pane.
    2. Choose Register location.
      020-BDB 5089
    3. For Amazon S3 path, enter the path for your S3 bucket that you provided while creating the Iceberg table returnstbl_iceberg.
    4. For IAM role, provide the user-defined role LakeFormationS3Registration_custom that you created as a prerequisite.
    5. For Permission mode, select Lake Formation.
    6. Choose Register location.
      021-BDB 5089
    7. Choose Data lake locations in the navigation pane to verify the Amazon S3 registration.
      022-BDB 5089
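
The same registration can be expressed as a Lake Formation RegisterResource request. This is a sketch under assumptions: the function name, bucket, and prefix are hypothetical, and mapping the Lake Formation permission mode to HybridAccessEnabled=False is our reading of the API rather than something this post states.

```python
def register_location_request(bucket, prefix, producer_account_id,
                              role_name="LakeFormationS3Registration_custom"):
    """Build kwargs for the boto3 Lake Formation register_resource call."""
    return {
        "ResourceArn": f"arn:aws:s3:::{bucket}/{prefix.strip('/')}",
        "UseServiceLinkedRole": False,  # use the user-defined role instead
        "RoleArn": f"arn:aws:iam::{producer_account_id}:role/{role_name}",
        "HybridAccessEnabled": False,   # assumption: Lake Formation permission mode
    }


# Hypothetical bucket and prefix; use the location of returnstbl_iceberg.
location_request = register_location_request(
    "example-iceberg-bucket", "/returnstbl_iceberg/", "111122223333"
)
```

Pass the result to boto3.client("lakeformation").register_resource(**location_request) in the producer account.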

    With this step, the producer account setup is complete.

    Steps for consumer account setup

    For the consumer account setup, we use the IAM admin role Admin, added as a Lake Formation administrator.

    The steps in the consumer account are quite involved. In the consumer account, a Lake Formation administrator will accept the AWS Resource Access Manager (AWS RAM) shares and create the required resource links that point to the shared catalog, database, and tables. The Lake Formation admin verifies that the shared resources are accessible by running test queries in Athena. The admin further grants permissions to the role Glue-execution-role on the resource links, database, and tables. The admin then runs a join query in AWS Glue 5.0 Spark using Glue-execution-role.

    Accept and verify the shared resources

    Lake Formation uses AWS RAM shares to enable cross-account sharing with Data Catalog resource policies in the AWS RAM policies. To view and verify the shared resources from the producer account, complete the following steps:

    1. Log in to the consumer AWS console and set the AWS Region to match the producer’s shared resource Region. For this post, we use us-west-2.
    2. Open the Lake Formation console. You will see a message indicating there is a pending invite and asking you to accept it on the AWS RAM console.
      023-BDB 5089
    3. Follow the instructions in Accepting a resource share invitation from AWS RAM to review and accept the pending invites.
    4. When the invite status changes to Accepted, choose Shared resources under Shared with me in the navigation pane.
    5. Verify that the Redshift Serverless federated catalog redshiftserverless1-uswest2, the default catalog database customerdb, the table returnstbl_iceberg, and the producer account ID under Owner ID column display correctly.
      024-BDB 5089
    6. On the Lake Formation console, under Data Catalog in the navigation pane, choose Databases.
    7. Search by the producer account ID.
      You should see the customerdb and public databases. You can further select each database, choose View tables on the Actions dropdown menu, and verify the table names.

    025-BDB 5089

    You will not see an AWS RAM share invite for the catalog level on the Lake Formation console, because catalog-level sharing isn’t possible. You can review the shared federated catalog and Amazon Redshift managed catalog names on the AWS RAM console, or using the AWS Command Line Interface (AWS CLI) or SDK.

    Create a catalog link container and resource links

    A catalog link container is a Data Catalog object that references a local or cross-account federated database-level catalog from other AWS accounts. For more details, refer to Accessing a shared federated catalog. Catalog link containers are essentially Lake Formation resource links at the catalog level that reference or point to a Redshift cluster federated catalog or Amazon Redshift managed catalog object from other accounts.

    In the following steps, we create a catalog link container that points to the producer shared federated catalog redshiftserverless1-uswest2. Inside the catalog link container, we create a database. Inside the database, we create a resource link for the table that points to the shared federated catalog table <<producer account id>>:redshiftserverless1-uswest2/ordersdb.public.orderstbl.

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Catalogs.
    2. Choose Create catalog.

    026-BDB 5089

    1. Provide the following details for the catalog:
      1. For Name, enter a name for the catalog (for this post, rl_link_container_ordersdb).
      2. For Type, choose Catalog Link container.
      3. For Source, choose Redshift.
      4. For Target Redshift Catalog, enter the Amazon Resource Name (ARN) of the producer federated catalog (arn:aws:glue:us-west-2:<<producer account id>>:catalog/redshiftserverless1-uswest2/ordersdb).
      5. Under Access from engines, select Access this catalog from Apache Iceberg compatible engines.
      6. For IAM role, provide the Redshift-S3 data transfer role that you created in the prerequisites.
      7. Choose Next.

    027-BDB 5089
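
The Target Redshift Catalog value follows a fixed pattern, so it can be assembled from its parts instead of typed by hand. A small sketch (the function name is hypothetical and the account ID is a placeholder):

```python
def federated_catalog_arn(region, producer_account_id, catalog, database):
    """Build the Glue ARN of a federated catalog database, as used in step 4."""
    return f"arn:aws:glue:{region}:{producer_account_id}:catalog/{catalog}/{database}"


target_arn = federated_catalog_arn(
    "us-west-2", "111122223333", "redshiftserverless1-uswest2", "ordersdb"
)
```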

    1. On the Grant permissions – optional page, choose Add permissions.
      1. Grant the Admin user Super user permissions for Catalog permissions and Grantable permissions.
      2. Choose Add and then choose Next.

    028-BDB 5089

    1. Review the details on the Review and create page and choose Create catalog.

    Wait a few seconds for the catalog to show up.

    029-BDB 5089

    1. In the navigation pane, choose Catalogs.
    2. Verify that rl_link_container_ordersdb is created.

    030-BDB 5089

    Create a database under rl_link_container_ordersdb

    Complete the following steps:

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Databases.
    2. On the Choose catalog dropdown menu, choose rl_link_container_ordersdb.
    3. Choose Create database.

    Alternatively, you can choose the Create dropdown menu and then choose Database.

    1. Provide details for the database:
      1. For Name, enter a name (for this post, public_db).
      2. For Catalog, choose rl_link_container_ordersdb.
      3. Leave Location – optional as blank.
      4. Under Default permissions for newly created tables, deselect Use only IAM access control for new tables in this database.
      5. Choose Create database.

    031-BDB 5089

    1. Choose Catalogs in the navigation pane to verify that public_db is created under rl_link_container_ordersdb.

    032-BDB 5089

    Create a table resource link for the shared federated catalog table

    A resource link to a shared federated catalog table can reside only inside the database of a catalog link container. A resource link for such tables will not work if created inside the default catalog. For more details on resource links, refer to Creating a resource link to a shared Data Catalog table.

    Complete the following steps to create a table resource link:

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Tables.
    2. On the Create dropdown menu, choose Resource link.

    033-BDB 5089

    1. Provide details for the table resource link:
      1. For Resource link name, enter a name (for this post, rl_orderstbl).
      2. For Destination catalog, choose rl_link_container_ordersdb.
      3. For Database, choose public_db.
      4. For Shared table’s region, choose US West (Oregon).
      5. For Shared table, choose orderstbl.
      6. After the Shared table is selected, Shared table’s database and Shared table’s catalog ID should get automatically populated.
      7. Choose Create.

    034-BDB 5089

    1. In the navigation pane, choose Databases to verify that rl_orderstbl is created under public_db, inside rl_link_container_ordersdb.

    035-BDB 5089

    036-BDB 5089

    Create a database resource link for the shared default catalog database

    Now we create a database resource link in the default catalog to query the Amazon S3 based Iceberg table shared from the producer. For details on database resource links, refer to Creating a resource link to a shared Data Catalog database.

    Though we are able to see the shared database in the default catalog of the consumer, a resource link is required to query from analytics engines, such as Athena, Amazon EMR, and AWS Glue. When using AWS Glue with Lake Formation tables, the resource link needs to be named identically to the source account’s resource. For additional details on using AWS Glue with Lake Formation, refer to Considerations and limitations.

    Complete the following steps to create a database resource link:

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Databases.
    2. On the Choose catalog dropdown menu, choose the account ID to choose the default catalog.
    3. Search for customerdb.

    You should see the shared database name customerdb with the Owner account ID as that of your producer account ID.

    1. Select customerdb, and on the Create dropdown menu, choose Resource link.
    2. Provide details for the resource link:
      1. For Resource link name, enter a name (for this post, customerdb).
      2. The rest of the fields should be already populated.
      3. Choose Create.
    3. In the navigation pane, choose Databases and verify that customerdb is created under the default catalog. Resource link names will show in italicized font.

    037-BDB 5089

    Verify access as Admin using Athena

    Now you can verify your access using Athena. Complete the following steps:

    1. Open the Athena console.
    2. Make sure an S3 bucket is provided to store the Athena query results. For details, refer to Specify a query result location using the Athena console.
    3. In the navigation pane, verify both the default catalog and federated catalog tables by previewing them.
    4. You can also run a join query as follows. Pay attention to the three-part notation (catalog.database.table) for referring to the tables from two different catalogs:
    SELECT
    returns_tb.market as Market,
    sum(orders_tb.quantity) as Total_Quantity
    FROM rl_link_container_ordersdb.public_db.rl_orderstbl as orders_tb
    JOIN awsdatacatalog.customerdb.returnstbl_iceberg as returns_tb
    ON orders_tb.order_id = returns_tb.order_id
    GROUP BY returns_tb.market;

    038-BDB 5089

    This verifies the new capability of SageMaker Lakehouse, which enables accessing Redshift cluster tables and Amazon S3 based Iceberg tables in the same query, across AWS accounts, through the Data Catalog, using Lake Formation permissions.
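
Each table reference in the join query has three parts: catalog, database, and table. If you generate such queries from code, a small helper keeps the parts explicit. The following sketch only builds the SQL text; the function names are hypothetical.

```python
def qualified(catalog, database, table):
    """Return a three-part table name: catalog.database.table."""
    return ".".join((catalog, database, table))


def market_quantity_query(orders_table, returns_table):
    """Build the cross-catalog join used to verify access in Athena."""
    return (
        "SELECT returns_tb.market AS Market, "
        "sum(orders_tb.quantity) AS Total_Quantity "
        f"FROM {orders_table} AS orders_tb "
        f"JOIN {returns_table} AS returns_tb "
        "ON orders_tb.order_id = returns_tb.order_id "
        "GROUP BY returns_tb.market"
    )


orders = qualified("rl_link_container_ordersdb", "public_db", "rl_orderstbl")
returns = qualified("awsdatacatalog", "customerdb", "returnstbl_iceberg")
query = market_quantity_query(orders, returns)
```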

    Grant permissions to Glue-execution-role

    Now we will share the resources from the producer account with additional IAM principals in the consumer account. Usually, the data lake admin grants permissions to data analysts, data scientists, and data engineers in the consumer account to do their job functions, such as processing and analyzing the data.

    We set up Lake Formation permissions on the catalog link container, databases, tables, and resource links to the AWS Glue job execution role Glue-execution-role that we created in the prerequisites.

    Resource links allow only Describe and Drop permissions. You need to use the Grant on target configuration to provide database Describe and table Select permissions.

    Complete the following steps:

    1. On the Lake Formation console, choose Data permissions in the navigation pane.
    2. Choose Grant.
    3. Under Principals, select IAM users and roles.
    4. For IAM users and roles, enter Glue-execution-role.
    5. Under LF-Tags or catalog resources, select Named Data Catalog resources.
    6. For Catalogs, choose rl_link_container_ordersdb and the consumer account ID, which indicates the default catalog.
    7. Under Catalog permissions, select Describe for Catalog permissions.
    8. Choose Grant.

    039-BDB 5089

    040-BDB 5089

    1. Repeat these steps for the catalog rl_link_container_ordersdb:
      1. On the Databases dropdown menu, choose public_db.
      2. Under Database permissions, select Describe.
      3. Choose Grant.
    2. Repeat these steps again, but after choosing rl_link_container_ordersdb and public_db, on the Tables dropdown menu, choose rl_orderstbl.
      1. Under Resource link permissions, select Describe.
      2. Choose Grant.
    3. Repeat these steps to grant additional permissions to Glue-execution-role.
      1. For this iteration, grant Describe permissions on the default catalog databases public and customerdb.
      2. Grant Describe permission on the resource link customerdb.
      3. Grant Select permission on the tables returnstbl_iceberg and orderstbl.

    The following screenshots show the configuration for database public and customerdb permissions.

    041-BDB 5089

    042-BDB 5089

    The following screenshots show the configuration for resource link customerdb permissions.

    043-BDB 5089

    044-BDB 5089

    The following screenshots show the configuration for table returnstbl_iceberg permissions.

    045-BDB 5089

    046-BDB 5089

    The following screenshots show the configuration for table orderstbl permissions.

    047-BDB 5089

    048-BDB 5089

    1. In the navigation pane, choose Data permissions and verify permissions on Glue-execution-role.

    049-BDB 5089
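
The difference between granting on a resource link and granting on its target is easier to see in API terms. The following sketch covers only the customerdb resource link and the Iceberg table, and rests on an assumption: that Grant on target corresponds to a grant_permissions request whose CatalogId is the producer account that owns the shared resource. The function name is hypothetical and the account IDs are placeholders.

```python
def glue_role_grants(producer_account_id, consumer_account_id,
                     role_name="Glue-execution-role"):
    """Return grant_permissions kwargs for the role in the consumer account."""
    principal = {
        "DataLakePrincipalIdentifier":
            f"arn:aws:iam::{consumer_account_id}:role/{role_name}"
    }
    return [
        # Resource link in the consumer's default catalog: Describe only.
        {"Principal": principal,
         "Resource": {"Database": {"CatalogId": consumer_account_id, "Name": "customerdb"}},
         "Permissions": ["DESCRIBE"]},
        # Target database, owned by the producer account.
        {"Principal": principal,
         "Resource": {"Database": {"CatalogId": producer_account_id, "Name": "customerdb"}},
         "Permissions": ["DESCRIBE"]},
        # Target table, owned by the producer account.
        {"Principal": principal,
         "Resource": {"Table": {"CatalogId": producer_account_id,
                                "DatabaseName": "customerdb",
                                "Name": "returnstbl_iceberg"}},
         "Permissions": ["SELECT"]},
    ]


role_grants = glue_role_grants("111122223333", "444455556666")  # placeholder IDs
```

The requests would be issued by the Lake Formation administrator of the consumer account, which is possible because the producer shared the resources with grantable permissions.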

    Run a PySpark job in AWS Glue 5.0

    Download the PySpark script LakeHouseGlueSparkJob.py. This AWS Glue PySpark script runs Spark SQL by joining the producer shared federated orderstbl table and Amazon S3 based returns table in the consumer account to analyze the data and identify the total orders placed per market.
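
The downloadable script is the authoritative source for the Spark configuration and is not reproduced here. As a rough sketch of the kind of settings involved, the following helper builds Spark properties for an Iceberg catalog backed by the Data Catalog. The property names (type, glue.id, client.region) come from Apache Iceberg's AWS integration; the alias orders_cat, the helper name, and the glue.id value format for a catalog link container are assumptions.

```python
def iceberg_glue_catalog_conf(alias, glue_id, region):
    """Return Spark properties that register an Iceberg catalog under `alias`."""
    prefix = f"spark.sql.catalog.{alias}"
    return {
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "glue",
        f"{prefix}.glue.id": glue_id,        # which Data Catalog to resolve against
        f"{prefix}.client.region": region,
    }


# Assumed glue.id format: <account id>:<catalog name>
catalog_conf = iceberg_glue_catalog_conf(
    "orders_cat", "<<consumer_account_id>>:rl_link_container_ordersdb", "us-west-2"
)
```

Each key and value can be applied with SparkSession.builder.config(key, value) before the session is created, after which tables are addressed as orders_cat.public_db.rl_orderstbl.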

    Replace <<consumer_account_id>> in the script with your consumer account ID. Complete the following steps to create and run an AWS Glue job:

    1. On the AWS Glue console, in the navigation pane, choose ETL jobs.
    2. Choose Create job, then choose Script editor.

    050-BDB 5089

    3. For Engine, choose Spark.
    4. For Options, choose Start fresh.
    5. Choose Upload script.
    6. Browse to the location where you downloaded and edited the script, select the script, and choose Open.
    7. On the Job details tab, provide the following information:
      1. For Name, enter a name (for this post, LakeHouseGlueSparkJob).
      2. Under Basic properties, for IAM role, choose Glue-execution-role.
      3. For Glue version, select Glue 5.0.
      4. Under Advanced properties, for Job parameters, choose Add new parameter.
      5. Add the parameters --datalake-formats = iceberg and --enable-lakeformation-fine-grained-access = true.
    8. Save the job.
    9. Choose Run to execute the AWS Glue job, and wait for the job to complete.
    10. Review the job run details from the Output logs.
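
The console settings above could also be captured as a job definition in code. The following is a minimal sketch, not part of the original walkthrough: it only builds the keyword arguments that the boto3 Glue `create_job` call accepts. The script location in Amazon S3 is an assumption, because the post uploads the script through the console.

```python
def build_glue_job_definition(script_s3_uri: str) -> dict:
    """Return keyword arguments that mirror the console settings in the steps above."""
    return {
        "Name": "LakeHouseGlueSparkJob",
        "Role": "Glue-execution-role",
        "GlueVersion": "5.0",
        "Command": {
            "Name": "glueetl",  # Spark ETL job type
            "ScriptLocation": script_s3_uri,  # assumed location, not given in the post
            "PythonVersion": "3",
        },
        "DefaultArguments": {
            "--datalake-formats": "iceberg",
            "--enable-lakeformation-fine-grained-access": "true",
        },
    }


# Placeholder bucket and prefix; replace with wherever you store the edited script.
job = build_glue_job_definition("s3://<your-s3-bucket>/scripts/LakeHouseGlueSparkJob.py")

# With boto3 installed and credentials configured, the definition could be submitted as:
#   boto3.client("glue").create_job(**job)
print(job["DefaultArguments"])
```

Keeping the definition as plain data makes it easy to review the two job parameters before anything is created in the account.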

    Clean up

    To avoid incurring costs on your AWS accounts, clean up the resources you created:

    1. Delete the Lake Formation permissions, catalog link container, database, and tables in the consumer account.
    2. Delete the AWS Glue job in the consumer account.
    3. Delete the federated catalog, database, and table resources in the producer account.
    4. Delete the Redshift Serverless namespace in the producer account.
    5. Delete the S3 buckets you created as part of data transfer in both accounts and the Athena query results bucket in the consumer account.
    6. Clean up the IAM roles you created for the SageMaker Lakehouse setup as part of the prerequisites.

    Conclusion

    In this post, we illustrated how to bring your existing Redshift tables to SageMaker Lakehouse and share them securely with external AWS accounts. We also showed how to query the shared data warehouse and data lakehouse tables in the same Spark session, from a recipient account, using Spark in AWS Glue 5.0.

    We hope you find this useful to integrate your Redshift tables with an existing data mesh and access the tables using AWS Glue Spark. Test this solution in your accounts and share feedback in the comments section. Stay tuned for more updates and feel free to explore the features of SageMaker Lakehouse and AWS Glue versions.

    Appendix: Table creation

    Complete the following steps to create a returns table in the Amazon S3 based default catalog and an orders table in Amazon Redshift:

    1. Download the CSV format datasets orders and returns.
    2. Upload them to your S3 bucket under the corresponding table prefix path.
    3. Use the following SQL statements in Athena. First-time users of Athena should refer to Specify a query result location.
    CREATE DATABASE customerdb;
    CREATE EXTERNAL TABLE customerdb.returnstbl_csv(
      `returned` string, 
      `order_id` string, 
      `market` string)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY '\;' 
    LOCATION
      's3://<your-S3-bucket>/<prefix-for-returns-table-data>/'
    TBLPROPERTIES (
      'skip.header.line.count'='1'
    );
    
    select * from customerdb.returnstbl_csv limit 10; 
    

    4. Create an Iceberg format table in the default catalog and insert data from the CSV format table:
    CREATE TABLE customerdb.returnstbl_iceberg(
      `returned` string, 
      `order_id` string, 
      `market` string)
    LOCATION 's3://<your-producer-account-bucket>/returnstbl_iceberg/' 
    TBLPROPERTIES (
      'table_type'='ICEBERG'
    );
    
    INSERT INTO customerdb.returnstbl_iceberg
    SELECT *
    FROM customerdb.returnstbl_csv;
    
    SELECT * FROM customerdb.returnstbl_iceberg LIMIT 10; 
    

    5. To create the orders table in the Redshift Serverless namespace, open the Query Editor v2 on the Amazon Redshift console.
    6. Connect to the default namespace using your database admin user credentials.
    7. Run the following commands in the SQL editor to create the database ordersdb and table orderstbl in it. Copy the data from your S3 location of the orders data to the orderstbl:
    create database ordersdb;
    use ordersdb;
    
    create table orderstbl(
      row_id int, 
      order_id VARCHAR, 
      order_date VARCHAR, 
      ship_date VARCHAR, 
      ship_mode VARCHAR, 
      customer_id VARCHAR, 
      customer_name VARCHAR, 
      segment VARCHAR, 
      city VARCHAR, 
      state VARCHAR, 
      country VARCHAR, 
      postal_code int, 
      market VARCHAR, 
      region VARCHAR, 
      product_id VARCHAR, 
      category VARCHAR, 
      sub_category VARCHAR, 
      product_name VARCHAR, 
      sales VARCHAR, 
      quantity bigint, 
      discount VARCHAR, 
      profit VARCHAR, 
      shipping_cost VARCHAR, 
      order_priority VARCHAR
      );
    
    copy orderstbl
    from 's3://<your-s3-bucket>/ordersdatacsv/orders.csv' 
    iam_role 'arn:aws:iam::<producer-account-id>:role/service-role/<your-Redshift-Role>'
    CSV 
    DELIMITER ';'
    IGNOREHEADER 1
    ;
    
    select * from ordersdb.public.orderstbl limit 5;
    


    About the Authors

    Aarthi Srinivasan is a Senior Big Data Architect with Amazon SageMaker Lakehouse. She collaborates with the service team to enhance product features, works with AWS customers and partners to architect lakehouse solutions, and establishes best practices for data governance.

    Subhasis Sarkar is a Senior Data Engineer with Amazon. Subhasis thrives on solving complex technological challenges with innovative solutions. He specializes in AWS data architectures, particularly data mesh implementations using AWS CDK components.

How BMW Group built a serverless terabyte-scale data transformation architecture with dbt and Amazon Athena

Post Syndicated from Philipp Karg original https://aws.amazon.com/blogs/big-data/how-bmw-group-built-a-serverless-terabyte-scale-data-transformation-architecture-with-dbt-and-amazon-athena/

Businesses increasingly require scalable, cost-efficient architectures to process and transform massive datasets. At the BMW Group, our Cloud Efficiency Analytics (CLEA) team has developed a FinOps solution to optimize costs across over 10,000 cloud accounts. While enabling organization-wide efficiency, the team also applied these principles to the data architecture, making sure that CLEA itself operates frugally. After evaluating various tools, we built a serverless data transformation pipeline using Amazon Athena and dbt.

This post explores our journey, from the initial challenges to our current architecture, and details the steps we took to achieve a highly efficient, serverless data transformation setup.

Challenges: Starting from a rigid and costly setup

In our early stages, we encountered several inefficiencies that made scaling difficult. We were managing complex schemas with wide tables that required significant maintenance effort. Initially, we used Terraform to create tables and views in Athena, allowing us to manage our data infrastructure as code (IaC) and automate deployments through continuous integration and delivery (CI/CD) pipelines. However, this method slowed us down whenever we changed data models or handled schema changes, resulting in high development effort.

As our solution grew, we faced challenges with query performance and costs. Each query scanned large amounts of raw data, resulting in increased processing time and higher Athena costs. We used views to provide a clean abstraction layer, but this masked underlying complexity because seemingly simple queries against these views scanned large volumes of raw data, and our partitioning strategy wasn’t optimized for these access patterns. As our datasets grew, the lack of modularity in our data design increased complexity, making scalability and maintenance increasingly difficult. We needed a solution for pre-aggregating, computing, and storing query results of computationally intensive transformations. The absence of robust testing and lineage solutions made it challenging to identify the root causes of data inconsistencies when they occurred.

As part of our business intelligence (BI) solution, we used Amazon QuickSight to build our dashboards, providing visual insights into our cloud cost data. However, our initial data architecture led to challenges. We were building dashboards on top of large, wide datasets, with some hitting the QuickSight per-dataset SPICE limit of 1 TB. Additionally, during SPICE ingest, our largest datasets required 4–5 hours of processing time because each ingest performed a full scan, often of over a terabyte of data. This architecture limited our agility as we scaled. The long processing times and storage limitations hindered our ability to provide timely insights and expand our analytics capabilities.

To address these issues, we enhanced the data architecture with AWS Lambda, AWS Step Functions, AWS Glue, and dbt. This tool stack significantly enhanced our development agility, empowering us to quickly modify and introduce new data models. At the same time, we improved our overall data processing efficiency with incremental loads and better schema management.

Solution overview

Our current architecture consists of a serverless and modular pipeline coordinated by GitHub Actions workflows. We chose Athena as our primary query engine for several strategic reasons: it aligns perfectly with our team’s SQL expertise, excels at querying Parquet data directly in our data lake, and alleviates the need for dedicated compute resources. This makes Athena an ideal fit for CLEA’s architecture, where we process around 300 GB daily from a data lake of 15 TB, with our largest dataset containing 50 billion rows across up to 400 columns. The capability of Athena to efficiently query large-scale Parquet data, combined with its serverless nature, enables us to focus on writing efficient transformations rather than managing infrastructure.

The following diagram illustrates the solution architecture.

Using this architecture, we’ve streamlined our data transformation process using dbt. In dbt, a data model represents a single SQL transformation that creates either a table or a view—essentially a building block of our data transformation pipeline. Our implementation includes around 400 such models, 50 data sources, and around 100 data tests. This setup enables seamless updates—whether creating new models, updating schemas, or modifying views—triggered simply by creating a pull request in our source code repository, with the rest handled automatically.

Our workflow automation includes the following features:

  • Pull request – When we create a pull request, it’s deployed to our testing environment first. After it passes validation and is approved and merged, it’s deployed to production using GitHub workflows.
  • Cron scheduler – For nightly runs or multiple daily runs to reduce data latency, we use scheduled GitHub workflows. This setup allows us to configure specific models with different update strategies based on data needs. We can set models to update incrementally (processing only new or changed data), as views (querying without materializing data), or as full loads (completely refreshing the data). This flexibility optimizes processing time and resource usage. We can target only specific folders—like source, prepared, or semantic layers—and run the dbt test afterward to validate model quality.
  • On demand – When adding new columns or changing business logic, we need to update historical data to maintain consistency. For this, we use a backfill process, which is a custom GitHub workflow created by our team. The workflow allows us to select specific models, include their upstream dependencies, and set parameters like start and end dates. This makes sure that changes are applied accurately across the entire historical dataset, maintaining data consistency and integrity.
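
As an illustration of the on-demand backfill described above, the following minimal sketch assembles a dbt command from the selected models and a date range. It is not the team's actual workflow: the model name `cost_daily` and the variable names `start_date` and `end_date` are hypothetical. The `+model` selector (include upstream dependencies) and the `--vars` option are standard dbt CLI features.

```python
import json
from datetime import date


def build_backfill_command(models, start, end, include_upstream=True):
    """Build a dbt invocation that reruns the given models for a date range."""
    if start > end:
        raise ValueError("start date must not be after end date")
    # A leading "+" asks dbt to also run everything upstream of the model.
    selectors = [f"+{name}" if include_upstream else name for name in models]
    # Hypothetical variable names that the models would read to bound the backfill.
    variables = {"start_date": start.isoformat(), "end_date": end.isoformat()}
    return ["dbt", "run", "--select", *selectors, "--vars", json.dumps(variables)]


command = build_backfill_command(["cost_daily"], date(2024, 1, 1), date(2024, 1, 31))
print(" ".join(command))
```

A workflow could pass the resulting argument list to a process runner. Validating the date range up front avoids launching a backfill that selects no data.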

Our pipeline is organized into three primary stages—Source, Prepared, and Semantic—each serving a specific purpose in our data transformation journey. The Source stage maintains raw data in its original form. The Prepared stage cleanses and standardizes this data, handling tasks like deduplication and data type conversions. The Semantic stage transforms this prepared data into business-ready models aligned with our analytical needs. An additional QuickSight step handles visualization requirements. To achieve low cost and high performance, we use dbt models and SQL code to manage all transformations and schema changes. By implementing incremental processing strategies, our models process only new or changed data rather than reprocessing the entire dataset with each run.
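
The idea behind incremental processing can be sketched in a few lines. This is a simplified illustration in plain Python, not the dbt implementation: it assumes daily partitions and a stored high-water mark (the latest partition already processed), and the optional lookback window for late-arriving changes is an assumption added for the example.

```python
from datetime import date, timedelta


def partitions_to_process(available, high_water_mark, lookback_days=0):
    """Return the daily partitions an incremental run should (re)process."""
    if high_water_mark is None:
        # First run: nothing has been processed yet, so this is a full load.
        return sorted(available)
    # Reprocess a small trailing window to pick up changed data, if requested.
    cutoff = high_water_mark - timedelta(days=lookback_days)
    return sorted(day for day in available if day > cutoff)


available = [date(2024, 1, day) for day in range(1, 6)]
print(partitions_to_process(available, date(2024, 1, 3)))
print(partitions_to_process(available, date(2024, 1, 3), lookback_days=1))
```

Only the partitions after the cutoff are scanned, which is what keeps both the processing time and the amount of data read small compared with a full reload.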

The Semantic stage (not to be confused with dbt’s semantic layer feature) introduces business logic, transforming data into aggregated datasets that are directly consumable by BMW’s Cloud Data Hub, internal CLEA dashboards, data APIs, or In-Console Cloud Assistant (ICCA) chatbot. The QuickSight step further optimizes data by selecting only necessary columns by using a column-level lineage solution and setting a dynamic date filter with a sliding window to ingest only relevant hot data into SPICE, avoiding unused data in dashboards or reports.

This approach aligns with BMW Group’s broader data strategy, which includes streamlining data access using AWS Lake Formation for fine-grained access control.

Overall, as a high-level structure, we’ve fully automated schema changes, data updates, and testing through GitHub pull requests and dbt commands. This approach enables controlled deployment with robust version control and change management. Continuous testing and monitoring workflows uphold data accuracy, reliability, and quality across transformations, supporting efficient, collaborative model iteration.

Key benefits of the dbt-Athena architecture

To design and manage dbt models effectively, we use a multi-layered approach combined with cost and performance optimizations. In this section, we discuss how our approach has yielded significant benefits in five key areas.

SQL-based, developer-friendly environment

Our team already had strong SQL skills, so dbt’s SQL-centric approach was a natural fit. Instead of learning a new language or framework, developers could immediately start writing transformations using familiar SQL syntax with dbt. This familiarity aligns well with the SQL interface of Athena and, combined with dbt’s added functionality, has increased our team’s productivity.

Behind the scenes, dbt automatically handles synchronization between Amazon Simple Storage Service (Amazon S3), the AWS Glue Data Catalog, and our models. When we need to change a model’s materialization type (for example, from a view to a table), it’s as simple as updating a configuration parameter rather than rewriting code. This flexibility has reduced our development time dramatically, allowing us to focus on building better data models rather than managing infrastructure.

Agility in modeling and deployment

Documentation is crucial for any data platform’s success. We use dbt’s built-in documentation capabilities by publishing them to GitHub Pages, which creates an accessible, searchable repository of our data models. This documentation includes table schemas, relationships between models, and usage examples, enabling team members to understand how models interconnect and how to use them effectively.

We use dbt’s built-in testing capabilities to implement comprehensive data quality checks. These include schema tests that verify column uniqueness, referential integrity, and null constraints, as well as custom SQL tests that validate business logic and data consistency. The testing framework runs automatically on every pull request, validating data transformations at each step of our pipeline. Additionally, dbt’s dependency graph provides a visual representation of how our models interconnect, helping us understand the upstream and downstream impacts of any changes before we implement them. When stakeholders need to modify models, they can submit changes through pull requests, which, after they’re approved and merged, automatically trigger the necessary data transformations through our CI/CD pipeline. This streamlined process enabled us to create new data products within days rather than weeks and reduced ongoing maintenance work by catching issues early in the development cycle.
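
To make the schema tests concrete, the following sketch shows what a uniqueness check and a not-null check verify, expressed in plain Python over in-memory rows. It is an illustration only: in dbt these checks are declared in configuration and compiled to SQL, and the `order_id` column is a hypothetical example.

```python
from collections import Counter


def not_null_failures(rows, column):
    """Rows where the column is missing or null (what a not-null test flags)."""
    return [row for row in rows if row.get(column) is None]


def unique_failures(rows, column):
    """Non-null values that occur more than once (what a uniqueness test flags)."""
    counts = Counter(row[column] for row in rows if row.get(column) is not None)
    return sorted(value for value, count in counts.items() if count > 1)


rows = [{"order_id": "A1"}, {"order_id": "A2"}, {"order_id": "A2"}, {"order_id": None}]
print(unique_failures(rows, "order_id"))
print(len(not_null_failures(rows, "order_id")))
```

A test passes when its failure list is empty, so a pull request check can simply fail the build whenever either function returns anything.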

Athena workgroup separation

We use Athena workgroups to isolate different query patterns based on their execution triggers and purposes. Each workgroup has its own configuration and metric reporting, allowing us to monitor and optimize separately. The dbt workgroup handles our scheduled nightly transformations and on-demand updates triggered by pull requests through our Source, Prepared, and Semantic stages. The dbt-test workgroup executes automated data quality checks during pull request validation and nightly builds. The QuickSight workgroup manages SPICE data ingestion queries, and the Ad-hoc workgroup supports interactive data exploration by our team.

Each workgroup can be configured with specific data usage quotas, enabling teams to implement granular governance policies. This separation provides several benefits: it enables clear cost allocation, provides isolated monitoring of query patterns across different use cases, and helps enforce data governance through custom workgroup settings. Amazon CloudWatch monitoring per workgroup helps us track usage patterns, identify query performance issues, and adjust configurations based on actual needs.
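
Workgroup separation makes cost allocation a matter of grouping query statistics by workgroup. The sketch below assumes a hypothetical query log of `(workgroup, bytes_scanned)` pairs, for example collected from per-workgroup metrics; the figures are made up for illustration.

```python
from collections import defaultdict


def scanned_bytes_by_workgroup(query_log):
    """Sum the bytes scanned per workgroup from (workgroup, bytes_scanned) pairs."""
    totals = defaultdict(int)
    for workgroup, bytes_scanned in query_log:
        totals[workgroup] += bytes_scanned
    return dict(totals)


# Illustrative values only, not measurements from the CLEA platform.
query_log = [
    ("dbt", 120_000_000_000),
    ("QuickSight", 40_000_000_000),
    ("dbt-test", 5_000_000_000),
    ("dbt", 80_000_000_000),
    ("Ad-hoc", 10_000_000_000),
]
print(scanned_bytes_by_workgroup(query_log))
```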

Using QuickSight SPICE

QuickSight SPICE (Super-fast, Parallel, In-memory Calculation Engine) provides powerful in-memory processing capabilities that we’ve optimized for our specific use cases. Rather than loading entire tables into SPICE, we create specialized views on top of our materialized semantic models. These views are carefully crafted to include only the necessary columns, relevant metadata joins, and appropriate time filtering to have only recent data available in dashboards.

We’ve implemented a hybrid refresh strategy for these SPICE datasets: daily incremental updates keep the data fresh, and weekly full refreshes maintain data consistency. This approach strikes a balance between data freshness and processing efficiency. The result is responsive dashboards that maintain high performance while keeping processing costs under control.
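
The hybrid refresh strategy and the sliding date window can be sketched as a small planning function. The choice of Sunday for the weekly full refresh and the 90-day default window are assumptions for this example, not values stated in the post.

```python
from datetime import date, timedelta

FULL_REFRESH_WEEKDAY = 6  # Sunday in date.weekday() numbering; an assumed schedule


def plan_refresh(run_date, window_days=90):
    """Pick the refresh type for a run and the start of the sliding date window."""
    is_full = run_date.weekday() == FULL_REFRESH_WEEKDAY
    return {
        "refresh": "full" if is_full else "incremental",
        # Only data on or after this date would be ingested into SPICE.
        "window_start": run_date - timedelta(days=window_days),
    }


print(plan_refresh(date(2024, 1, 7)))
print(plan_refresh(date(2024, 1, 8), window_days=7))
```

Because the window start moves forward with each run, older data ages out of the dataset without any manual cleanup.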

Scalability and cost-efficiency

The serverless architecture of Athena eliminates manual infrastructure management, automatically scaling based on query demand. Because costs are based solely on the amount of data scanned by queries, optimizing queries to scan as little data as possible directly reduces our costs. We use the distributed query execution capabilities of Athena through our dbt model structure, enabling parallel processing across data partitions. By implementing effective partitioning strategies and using Parquet file format, we minimize the amount of data scanned while maximizing query performance.
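
Because cost scales with data scanned, a rough estimate is a single multiplication. The sketch below is illustrative only: the price per terabyte is an assumed placeholder that you should replace with the current rate for your Region, and the resulting dollar amounts are not figures reported by the CLEA team. The 15 TB and 300 GB inputs are the data lake size and daily volume mentioned earlier.

```python
TB = 1024 ** 4  # bytes per terabyte (binary convention, an assumption)
GB = 1024 ** 3


def estimate_scan_cost(bytes_scanned, price_per_tb=5.0):
    """Estimate query cost from bytes scanned; price_per_tb is an assumed rate."""
    return bytes_scanned / TB * price_per_tb


full_scan = estimate_scan_cost(15 * TB)          # scanning the whole data lake
daily_increment = estimate_scan_cost(300 * GB)   # scanning one day of new data
print(f"full scan: ${full_scan:.2f}, daily increment: ${daily_increment:.2f}")
```

The gap between the two numbers is the motivation for partition pruning and incremental models: a query that touches only the new partitions pays for only those bytes.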

Our architecture offers flexibility in how we materialize data through views, full tables, and incremental tables. With dbt’s incremental models and partitioning strategy, we process only new or modified data instead of entire datasets. This approach has proven highly effective—we’ve observed significant reductions in data processing volume as well as data scanning, particularly in our QuickSight workgroup.

The effectiveness of these optimizations implemented at the end of 2023 is visible in the following diagram, showing costs by Athena workgroups.

The workgroups are illustrated as follows:

  • Green (QuickSight): Shows reduced data scanning post-optimization.
  • Light blue (Ad-hoc): Varies based on analysis needs.
  • Dark blue (dbt): Maintains consistent processing patterns.
  • Orange (dbt-test): Shows regular, efficient test execution.

The increased dbt workload costs directly correlate with decreased QuickSight costs, reflecting our architectural shift from using complex views in QuickSight workgroups (which previously masked query complexity but led to repeated computations) to using dbt for materializing these transformations. Although this increased the dbt workload, the overall cost-efficiency improved significantly because materialized tables reduced redundant computations in QuickSight. This demonstrates how our optimization strategies successfully manage growing data volumes while achieving net cost reduction through efficient data materialization patterns.

Conclusion

Our data architecture uses dbt and Athena to provide a scalable, cost-efficient, and flexible framework for building and managing data transformation pipelines. Athena’s ability to query data directly in Amazon S3 alleviates the need to move or copy data into a separate data warehouse, and its serverless model and dbt’s incremental processing minimize both operational overhead and processing costs. Given our team’s strong SQL expertise, expressing these transformations in SQL through dbt and Athena was a natural choice, enabling rapid model development and deployment. With dbt’s automatic documentation and lineage, troubleshooting and identifying data issues is simplified, and the system’s modularity allows for quick adjustments to meet evolving business needs.

Starting with this architecture is quick and straightforward: all that is needed is the dbt-core and dbt-athena libraries, and Athena itself requires no setup, because it’s a fully serverless service with seamless integration with Amazon S3. This architecture is ideal for teams looking to rapidly prototype, test, and deploy data models, optimizing resource usage, accelerating deployment, and providing high-quality, accurate data processing.

For those interested in a managed solution from dbt, see From data lakes to insights: dbt adapter for Amazon Athena now supported in dbt Cloud.


About the Authors

Philipp Karg is a Lead FinOps Engineer at BMW Group and has a strong background in data engineering, AI, and FinOps. He focuses on driving cloud efficiency initiatives and fostering a cost-aware culture within the company to leverage the cloud sustainably.

Selman Ay is a Data Architect specializing in end-to-end data solutions, architecture, and AI on AWS. Outside of work, he enjoys playing tennis and engaging in outdoor activities.

Cizer Pereira is a Senior DevOps Architect at AWS Professional Services. He works closely with AWS customers to accelerate their journey to the cloud. He has a deep passion for cloud-based and DevOps solutions, and in his free time, he also enjoys contributing to open source projects.

Amazon SageMaker Lakehouse now supports attribute-based access control

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/amazon-sagemaker-lakehouse-now-supports-attribute-based-access-control/

Amazon SageMaker Lakehouse now supports attribute-based access control (ABAC) with AWS Lake Formation, using AWS Identity and Access Management (IAM) principals and session tags to simplify data access, grant creation, and maintenance. With ABAC, you can manage business attributes associated with user identities and enable organizations to create dynamic access control policies that adapt to the specific context.

SageMaker Lakehouse is a unified, open, and secure data lakehouse that now supports ABAC to provide unified access to general purpose Amazon S3 buckets, Amazon S3 Tables, Amazon Redshift data warehouses, and data sources such as Amazon DynamoDB or PostgreSQL. You can then query, analyze, and join the data using Redshift, Amazon Athena, Amazon EMR, and AWS Glue. You can secure and centrally manage your data in the lakehouse by defining fine-grained permissions with Lake Formation that are consistently applied across all analytics and machine learning (ML) tools and engines. In addition to its support for role-based and tag-based access control, Lake Formation extends support to attribute-based access to simplify data access management for SageMaker Lakehouse, with the following benefits:

  • Flexibility – ABAC policies are flexible and can be updated to meet changing business needs. Instead of creating new rigid roles, ABAC systems allow access rules to be modified by simply changing user or resource attributes.
  • Efficiency – Managing a smaller number of roles and policies is more straightforward than managing a large number of roles, reducing administrative overhead.
  • Scalability – ABAC systems are more scalable for larger enterprises because they can handle a large number of users and resources without requiring a large number of roles.

Attribute-based access control overview

Previously, within SageMaker Lakehouse, Lake Formation granted access to resources based on the identity of a requesting user. Our customers were requesting the capability to express the full complexity required for access control rules in organizations. ABAC allows for more flexible and nuanced access policies that can better reflect real-world needs. Organizations can now grant permissions on a resource based on user attributes, making access context-driven. This allows administrators to grant permissions on a resource with conditions that specify user attribute keys and values. IAM principals with matching IAM or session tag key-value pairs will gain access to the resource.

Instead of creating a separate role for each team member’s access to a specific project, you can set up ABAC policies to grant access based on attributes like membership and user role, reducing the number of roles required. For instance, without ABAC, a company with an account manager role that covers five different geographical territories needs to create five different IAM roles and grant each role data access for only its own territory. With ABAC, the company can add those territory attributes as key-value pairs to the principal tag and provide data access grants based on those attributes. If the value of the attribute for a user changes so that it no longer matches a grant, access to that dataset is automatically revoked.

With ABAC, you can use attributes such as department or country and use IAM or session tags to determine access to data, making it more straightforward to create and maintain data access grants. Administrators can define fine-grained access permissions with ABAC to limit access to databases, tables, rows, columns, or table cells.
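
The matching rule behind an attribute-based grant can be sketched in plain Python. This is a conceptual model, not the Lake Formation implementation: a grant lists the allowed values per tag key, and a principal qualifies only if every key matches. The tag names `Role` and `Territory` and their values are hypothetical, loosely following the account manager example above.

```python
def is_granted(principal_tags, required):
    """True if, for every required tag key, the principal's value is allowed."""
    return all(
        principal_tags.get(key) in allowed_values
        for key, allowed_values in required.items()
    )


# Hypothetical grant: account managers tagged for the EMEA territory.
grant = {"Role": {"AccountManager"}, "Territory": {"EMEA"}}
manager = {"Role": "AccountManager", "Territory": "EMEA"}
print(is_granted(manager, grant))

# Changing the attribute changes the outcome without touching the grant.
manager["Territory"] = "APAC"
print(is_granted(manager, grant))
```

One grant covers every principal that carries the matching tags, which is why five territories no longer require five separate roles.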

In this post, we demonstrate how to get started with ABAC in SageMaker Lakehouse and use it with various analytics services.

Solution overview

To illustrate the solution, we are going to consider a fictional company called Example Retail Corp. Example Retail’s leadership is interested in analyzing sales data in Amazon S3 to determine in-demand products, understand customer behavior, and identify trends, for better decision-making and increased profitability. The sales department sets up a team for sales analysis with the following data access requirements:

  • All data analysts in the Sales department in the US get access to only sales-specific data in only US regions.
  • All BI analysts in the Sales department have full access to data in only US regions.
  • All scientists in the Sales department get access to only sales-specific data across all regions.
  • Anyone outside of the Sales department has no access to sales data.

For this post, we consider the database salesdb, which contains the store_sales table that has store sales details. The table store_sales has the following schema.

To demonstrate the product sales analysis use case, we will consider the following personas from the Example Retail Corp:

  • Ava is a data administrator in Example Retail Corp who is responsible for supporting team members with specific data permission policies.
  • Alice is a data analyst who should be able to access sales-specific US store data to perform product sales analysis.
  • Bob is a BI analyst who should be able to access all data from US store sales to generate reports.
  • Charlie is a data scientist who should be able to access sales-specific data across all regions to explore and find patterns for trend analysis.

Ava decides to use SageMaker Lakehouse to unify data across various data sources while setting up fine-grained access control using ABAC. Alice is excited about this decision as she can now build daily reports using her expertise with Athena. Bob now knows that he can quickly build Amazon QuickSight dashboards with queries that are optimized using Redshift’s cost-based optimizer. Charlie, being an open source Apache Spark contributor, is excited that he can build Spark based processing with Amazon EMR to build ML forecasting models.

Ava can define the user attributes either as static IAM tags, which can also include attributes stored in the identity provider (IdP), or dynamically as session tags that represent the user metadata. These tags are assigned to IAM users or roles and can be used to define or restrict access to specific resources or data. For more details, refer to Tags for AWS Identity and Access Management resources and Pass session tags in AWS STS.

For this post, Ava assigns static IAM tags to users to represent the user attributes, including their department membership, Region assignment, and current role relationship. The following table summarizes the tags that represent user attributes and user assignment.

User | Persona | Attributes | Access
Alice | Data Analyst | Department=sales, Region=US, Role=Analyst | Sales-specific data in US and no access to customer data
Bob | BI Analyst | Department=sales, Region=US, Role=BIAnalyst | All data in US
Charlie | Data Scientist | Department=sales, Region=ALL, Role=Scientist | Sales-specific data in all regions and no access to customer data

Ava then defines access control policies in Lake Formation that grant or restrict access to certain resources based on predefined criteria (user attributes defined using IAM tags) being satisfied. This allows for flexible and context-aware security policies where access privileges can be adjusted dynamically by modifying the user attribute assignment without changing the policy rules. The following table summarizes the policies in the Sales department.

Access | User Attributes | Policy
All analysts (including Alice) in US get access to sales-specific data in US regions | Department=sales, Region=US, Role=Analyst | Table: store_sales (store_id, transaction_date, product_name, country, sales_price, quantity columns); Row filter: country='US'
All BI analysts (including Bob) in US get access to all data in US regions | Department=sales, Region=US, Role=BIAnalyst | Table: store_sales (all columns); Row filter: country='US'
All scientists (including Charlie) get access to sales-specific data from all regions | Department=sales, Region=ALL, Role=Scientist | Table: store_sales (all rows); Column filter: store_id, transaction_date, product_name, country, sales_price, quantity
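
To see what each policy in the table returns, the following sketch applies the row and column filters to two sample rows in plain Python. It is a conceptual model only, not how Lake Formation enforces filters. The sample values and the `customer_id` column are hypothetical, standing in for the customer data that analysts and scientists must not see.

```python
SALES_COLUMNS = ["store_id", "transaction_date", "product_name",
                 "country", "sales_price", "quantity"]

# One entry per Role tag value; columns=None means all columns are visible.
POLICIES = {
    "Analyst": {"columns": SALES_COLUMNS, "row_filter": lambda row: row["country"] == "US"},
    "BIAnalyst": {"columns": None, "row_filter": lambda row: row["country"] == "US"},
    "Scientist": {"columns": SALES_COLUMNS, "row_filter": lambda row: True},
}


def visible_rows(role, rows):
    """Apply the role's row filter, then project the allowed columns."""
    policy = POLICIES[role]
    result = []
    for row in rows:
        if policy["row_filter"](row):
            columns = policy["columns"] or list(row)
            result.append({name: row[name] for name in columns})
    return result


# Hypothetical sample data; customer_id is an assumed customer-related column.
store_sales = [
    {"store_id": 1, "transaction_date": "2024-01-05", "product_name": "Lamp",
     "country": "US", "sales_price": 20.0, "quantity": 2, "customer_id": "C-100"},
    {"store_id": 7, "transaction_date": "2024-01-06", "product_name": "Desk",
     "country": "DE", "sales_price": 150.0, "quantity": 1, "customer_id": "C-200"},
]

for role in POLICIES:
    print(role, visible_rows(role, store_sales))
```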

The following diagram illustrates the solution architecture.

Implementing this solution consists of the following high-level steps. For Example Retail, Ava, as the data administrator, performs these steps:

  1. Define the user attributes and assign them to the principal.
  2. Grant permission on the resources (database and table) to the principal based on user attributes.
  3. Verify the permissions by querying the data using various analytics services.

Prerequisites

To follow the steps in this post, you must complete the following prerequisites:

  1. AWS account with access to the following AWS services:
    • Amazon S3
    • AWS Lake Formation and AWS Glue Data Catalog
    • Amazon Redshift
    • Amazon Athena
    • Amazon EMR
    • AWS Identity and Access Management (IAM)
  2. Set up an admin user for Ava. For instructions, see Create a user with administrative access.
  3. Set up an S3 bucket for uploading the script.
  4. Set up a data lake admin. For instructions, see Create a data lake administrator.
  5. Create an IAM user named Alice and attach permissions for Athena access. For instructions, refer to Data analyst permissions.
  6. Create an IAM user named Bob and attach permissions for Redshift access.
  7. Create an IAM user named Charlie and attach permissions for EMR Serverless access.
  8. Create a job runtime role named scientist_role for Charlie to use. For instructions, refer to Job runtime roles for Amazon EMR Serverless.
  9. Set up an EMR Serverless application with Lake Formation enabled. For instructions, refer to Using EMR Serverless with AWS Lake Formation for fine-grained access control.
  10. Have an existing AWS Glue database and table, and an Amazon Simple Storage Service (Amazon S3) bucket that holds the table data. For this post, we use salesdb as our database and store_sales as our table, with the data stored in an S3 bucket.

Define attributes for the IAM principals Alice, Bob, Charlie

Ava completes the following steps to define the attributes for the IAM principal:

  1. Log in as an admin user and navigate to the IAM console.
  2. Choose Users under Access management in the navigation pane and search for the user Alice.
  3. Choose the user and choose the Tags tab.
  4. Choose Add new tag and provide the following key-value pairs:
    • Key: Department and value: sales
    • Key: Region and value: US
    • Key: Role and value: Analyst
  5. Choose Save changes.
  6. Repeat the process for the user Bob and provide the following key-value pairs:
    • Key: Department and value: sales
    • Key: Region and value: US
    • Key: Role and value: BIAnalyst
  7. Repeat the process for the user Charlie and the IAM role scientist_role, and provide the following key-value pairs:
    • Key: Department and value: sales
    • Key: Region and value: ALL
    • Key: Role and value: Scientist
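The same tags could also be applied programmatically. The following is a minimal sketch, not part of the original walkthrough: it assumes `iam_client` is a boto3 IAM client and that the users and the role already exist. The helper names `to_iam_tags` and `apply_tags` are hypothetical.

```python
# Attribute sets from the steps above, keyed by principal name.
PRINCIPAL_ATTRIBUTES = {
    "Alice": {"Department": "sales", "Region": "US", "Role": "Analyst"},
    "Bob": {"Department": "sales", "Region": "US", "Role": "BIAnalyst"},
    "Charlie": {"Department": "sales", "Region": "ALL", "Role": "Scientist"},
    "scientist_role": {"Department": "sales", "Region": "ALL", "Role": "Scientist"},
}

# Principals that are IAM roles rather than IAM users.
ROLE_PRINCIPALS = {"scientist_role"}


def to_iam_tags(attributes):
    """Convert a plain dict into the Key/Value list that IAM tagging calls expect."""
    return [{"Key": key, "Value": value} for key, value in attributes.items()]


def apply_tags(iam_client):
    """Tag every principal. Assumption: iam_client is boto3.client("iam")."""
    for name, attributes in PRINCIPAL_ATTRIBUTES.items():
        tags = to_iam_tags(attributes)
        if name in ROLE_PRINCIPALS:
            iam_client.tag_role(RoleName=name, Tags=tags)
        else:
            iam_client.tag_user(UserName=name, Tags=tags)
```

Keeping the attributes in one dictionary makes it easier to review who carries which Department, Region, and Role values before any call is made.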

Grant permissions to Alice, Bob, Charlie using ABAC

Ava now grants database and table permissions to users with ABAC.

Grant database permissions

Complete the following steps:

  1. Log in as the data lake admin and navigate to the Lake Formation console.
  2. In the navigation pane, under Permissions, choose Data lake permissions.
  3. Choose Grant.
  4. On the Grant permissions page, choose Principals by attribute.
  5. Specify the following attributes:
    • Key: Department and value: sales
    • Key: Role and value: Analyst,Scientist
  6. Review the resulting policy expression.
  7. For Permission scope, select This account.
  8. Next, choose the catalog resources to grant access:
    • For Catalogs, enter the account ID.
    • For Databases, enter salesdb.
  9. For Database permissions, select Describe.
  10. Choose Grant.

Ava now verifies the database permission by navigating to the Databases tab under the Data Catalog and searching for salesdb. Select salesdb and choose View under Actions.

Grant table permissions to Alice

Complete the following steps to create a data filter to view sales-specific columns in store_sales records whose country=US:

  1. On the Lake Formation console, choose Data filters under Data Catalog in the navigation pane.
  2. Choose Create new filter.
  3. Provide the data filter name as us_sales_salesonlydata.
  4. For Target catalog, enter the account ID.
  5. For Target database, choose salesdb.
  6. For Target table, choose store_sales.
  7. For column-level access, choose Include columns: store_id, item_code, transaction_date, product_name, country, sales_price, and quantity.
  8. For Row-level access, choose Filter rows and enter the row filter country='US'.
  9. Choose Create data filter.

Complete the following steps to grant table permissions to Alice:

  1. On the Grant permissions page, choose Principals by attribute.
  2. Specify the attributes:
    • Key: Department and value: sales
    • Key: Role and value: Analyst
    • Key: Region and value: US
  3. Review the resulting policy expression.
  4. For Permission scope, select This account.
  5. Choose the catalog resources to grant access:
    • Catalogs: Account ID
    • Databases: salesdb
    • Table: store_sales
    • Data filters: us_sales_salesonlydata
  6. For Data filter permissions, select Select.
  7. Choose Grant.

Grant table permissions to Bob

Complete the following steps to create a data filter to view only store_sales records whose country=US:

  1. On the Lake Formation console, choose Data filters under Data Catalog in the navigation pane.
  2. Choose Create new filter.
  3. Provide the data filter name as us_sales.
  4. For Target catalog, enter the account ID.
  5. For Target database, choose salesdb.
  6. For Target table, choose store_sales.
  7. Leave Column-level access as Access to all columns.
  8. For Row-level access, enter the row filter country='US'.
  9. Choose Create data filter.
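Both data filters (us_sales_salesonlydata for Alice and us_sales for Bob) could also be described as request payloads. The sketch below builds the `TableData` structure that the Lake Formation `CreateDataCellsFilter` operation accepts. The helper name `build_data_filter` and the account ID `111122223333` are placeholders, and nothing is sent to AWS.

```python
def build_data_filter(catalog_id, name, row_filter, columns=None):
    """Build the TableData argument for create_data_cells_filter.

    columns=None means "access to all columns" (expressed with ColumnWildcard).
    """
    table_data = {
        "TableCatalogId": catalog_id,
        "DatabaseName": "salesdb",
        "TableName": "store_sales",
        "Name": name,
        "RowFilter": {"FilterExpression": row_filter},
    }
    if columns is None:
        table_data["ColumnWildcard"] = {}
    else:
        table_data["ColumnNames"] = list(columns)
    return table_data


ACCOUNT_ID = "111122223333"  # placeholder account ID, replace with your own

# Filter for Alice: US rows, sales-specific columns only.
alice_filter = build_data_filter(
    ACCOUNT_ID,
    "us_sales_salesonlydata",
    "country='US'",
    ["store_id", "item_code", "transaction_date", "product_name",
     "country", "sales_price", "quantity"],
)

# Filter for Bob: US rows, all columns.
bob_filter = build_data_filter(ACCOUNT_ID, "us_sales", "country='US'")
```

With a boto3 Lake Formation client, each payload would be passed as `create_data_cells_filter(TableData=...)`.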

Complete the following steps to grant table permissions to Bob:

  1. On the Grant permissions page, choose Principals by attribute.
  2. Specify the attributes:
    • Key: Department and value: sales
    • Key: Role and value: BIAnalyst
    • Key: Region and value: US
  3. Review the resulting policy expression.
  4. For Permission scope, select This account.
  5. Choose the catalog resources to grant access:
    • Catalogs: Account ID
    • Databases: salesdb
    • Table: store_sales
    • Data filters: us_sales
  6. For Data filter permissions, select Select.
  7. Choose Grant.

Grant table permissions to Charlie

Complete the following steps to grant table permissions to Charlie:

  1. On the Grant permissions page, choose Principals by attribute.
  2. Specify the attributes:
    1. Key: Department and value: sales
    2. Key: Role and value: Scientist
    3. Key: Region and value: ALL
  3. Review the resulting policy expression.
  4. For Permission scope, select This account.
  5. Choose the catalog resources to grant access:
    1. Catalogs: Account ID
    2. Databases: salesdb
    3. Table: store_sales
  6. For Table permissions, select Select.
  7. For Data permissions, specify the following columns: store_id, transaction_date, product_name, country, sales_price, and quantity.
  8. Choose Grant.

Ava now verifies the table permissions by navigating to the Tables tab under the Data Catalog and searching for store_sales. Select store_sales and choose View under Actions. The following screenshots show the details for both sets of permissions.
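To make the effect of the three grants concrete, the following sketch models attribute matching in plain Python. It is a simplified mental model, not Lake Formation's evaluation engine: a grant applies to a principal when every attribute in the grant equals the principal's tag of the same name. The names `GRANTS` and `matching_grants` are hypothetical.

```python
ALL_COLUMNS = None  # sentinel: no column restriction

# One entry per table grant configured above.
GRANTS = [
    {
        "attrs": {"Department": "sales", "Role": "Analyst", "Region": "US"},
        "row_filter": "country='US'",
        "columns": ["store_id", "item_code", "transaction_date", "product_name",
                    "country", "sales_price", "quantity"],
    },
    {
        "attrs": {"Department": "sales", "Role": "BIAnalyst", "Region": "US"},
        "row_filter": "country='US'",
        "columns": ALL_COLUMNS,
    },
    {
        "attrs": {"Department": "sales", "Role": "Scientist", "Region": "ALL"},
        "row_filter": None,  # all rows
        "columns": ["store_id", "transaction_date", "product_name",
                    "country", "sales_price", "quantity"],
    },
]


def matching_grants(principal_tags):
    """Return the grants whose attributes all match the principal's tags."""
    return [
        grant for grant in GRANTS
        if all(principal_tags.get(key) == value
               for key, value in grant["attrs"].items())
    ]


alice = matching_grants({"Department": "sales", "Role": "Analyst", "Region": "US"})
charlie = matching_grants({"Department": "sales", "Role": "Scientist", "Region": "ALL"})
```

In this model, a new analyst tagged with the same three attributes as Alice would receive the same access without any additional grant, which is the maintenance benefit ABAC aims for.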

Data Analyst uses Athena for building daily sales reports

Alice, the data analyst, logs in to the Athena console and runs the following query:

select * from "salesdb"."store_sales" limit 5

Alice has the user attributes Department=sales, Role=Analyst, and Region=US. This attribute combination gives her access to only the sales-specific columns of US sales data, without access to customer data, as shown in the following screenshot.

BI Analyst uses Redshift for building sales dashboards

Bob, the BI analyst, logs in to the Redshift console and runs the following query:

select * from "salesdb"."store_sales" limit 10

Bob has the user attributes Department=sales, Role=BIAnalyst, and Region=US. This attribute combination gives him access to all columns, including customer data, for US sales data.

Data Scientist uses Amazon EMR to process sales data

Finally, Charlie logs in to the EMR console and submits the EMR job with scientist_role as the runtime role. Charlie uses the script sales_analysis.py, which is uploaded to the S3 bucket created for the script. He chooses the EMR Serverless application created with Lake Formation enabled.

Charlie submits batch job runs by choosing the following values:

  • Name: sales_analysis_Charlie
  • Runtime role: scientist_role
  • Script location: <s3_script_path>/sales_analysis.py
  • Spark properties: Provide the key spark.emr-serverless.lakeformation.enabled and the value true.
  • Additional configurations: Under Metastore configuration, select Use AWS Glue Data Catalog as metastore. Charlie keeps the rest of the configuration as default.
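The same job run could be expressed as an EMR Serverless `StartJobRun` request. The sketch below only builds the request dictionary. The application ID, role ARN, and script path are placeholders, and mapping the console's Glue Data Catalog option to the `hive.metastore.client.factory.class` Spark property is our assumption rather than something stated in this post.

```python
def build_job_run_request(application_id, runtime_role_arn, script_path):
    """Build keyword arguments for the EMR Serverless start_job_run call."""
    spark_conf = {
        # Property named in the steps above.
        "spark.emr-serverless.lakeformation.enabled": "true",
        # Assumption: equivalent of "Use AWS Glue Data Catalog as metastore".
        "spark.hadoop.hive.metastore.client.factory.class":
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
    }
    submit_parameters = " ".join(
        f"--conf {key}={value}" for key, value in spark_conf.items()
    )
    return {
        "applicationId": application_id,
        "executionRoleArn": runtime_role_arn,
        "name": "sales_analysis_Charlie",
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": f"{script_path}/sales_analysis.py",
                "sparkSubmitParameters": submit_parameters,
            }
        },
    }


# Placeholder values for illustration only.
request = build_job_run_request(
    "your-application-id",
    "arn:aws:iam::111122223333:role/scientist_role",
    "s3://your-script-bucket/scripts",
)
```

With a boto3 `emr-serverless` client, the request would be submitted as `start_job_run(**request)`.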

Once the job run is completed, Charlie can view the output by selecting stdout under Driver log files.

Charlie uses scientist_role as the job runtime role with the attributes Department=sales, Role=Scientist, and Region=ALL. This attribute combination gives him access to selected columns of all sales data.

Clean up

Complete the following steps to delete the resources you created to avoid unexpected costs:

  1. Delete the IAM users created.
  2. Delete the AWS Glue database and table resources created for the post, if any.
  3. Delete the Athena, Redshift, and EMR resources created for the post.

Conclusion

In this post, we showcased how you can use SageMaker Lakehouse attribute-based access control, using IAM principals and session tags to simplify data access, grant creation, and maintenance. With attribute-based access control, you can manage permissions using dynamic business attributes associated with user identities and secure your data in the lakehouse by defining fine-grained permissions in Lake Formation that are enforced across analytics and ML tools and engines.

For more information, refer to the documentation. We encourage you to try out SageMaker Lakehouse with ABAC and share your feedback with us.


About the authors

Sandeep Adwankar is a Senior Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building data mesh solutions and sharing them with the community.

Read and write Apache Iceberg tables using AWS Lake Formation hybrid access mode

Post Syndicated from Aarthi Srinivasan original https://aws.amazon.com/blogs/big-data/read-and-write-apache-iceberg-tables-using-aws-lake-formation-hybrid-access-mode/

Enterprises are adopting Apache Iceberg table format for its multitude of benefits. The change data capture (CDC), ACID compliance, and schema evolution features cater to representing big datasets that receive new records at a fast pace. In an earlier blog post, we discussed how to implement fine-grained access control in Amazon EMR Serverless using AWS Lake Formation for reads. Lake Formation helps you centrally manage and scale fine-grained data access permissions and share data with confidence within and outside your organization.

In this post, we demonstrate how to use Lake Formation for read access while continuing to use AWS Identity and Access Management (IAM) policy-based permissions for write workloads that update the schema and upsert (insert and update combined) data records into the Iceberg tables. The bimodal permissions are needed to support existing data pipelines that use only IAM and Amazon Simple Storage Service (Amazon S3) bucket policy-based permissions and to support table operations that are not yet available in the analytics engines. The two-way permission is achieved by registering the Amazon S3 data location of the Iceberg table with Lake Formation in hybrid access mode. Lake Formation hybrid access mode allows you to onboard new users with Lake Formation permissions to access AWS Glue Data Catalog tables with minimal interruptions to existing IAM policy-based users. With this solution, organizations can use the Lake Formation permissions to scale the access of their existing Iceberg tables in Amazon S3 to new readers. You can extend the methodology to other open table formats, such as Linux Foundation Delta Lake tables and Apache Hudi tables.

Key use cases for Lake Formation hybrid access mode

Lake Formation hybrid access mode is useful in the following use cases:

  • Avoiding data replication – Hybrid access mode helps onboard new users with Lake Formation permissions on existing Data Catalog tables. For example, you can enable a subset of data access (coarse vs. fine-grained access) for various user personas, such as data scientists and data analysts, without making multiple copies of the data. This also helps maintain a single source of truth for production and business insights.
  • Minimal interruption to existing IAM policy-based user access – With hybrid access mode, you can add new Lake Formation managed users with minimal disruptions to your existing IAM and Data Catalog policy-based user access. Both access methods can coexist for the same catalog table, but each user can have only one mode of permissions.
  • Transactional table writes – Certain write operations like insert, update, and delete are not supported by Amazon EMR for Lake Formation managed Iceberg tables. Refer to Considerations and limitations for additional details. Although you could use Lake Formation permissions for Iceberg table read operations, you could manage the write operations as the table owners with IAM policy-based access.

Solution overview

An example Enterprise Corp has a large number of Iceberg tables based on Amazon S3. They are currently managing the Iceberg tables manually with IAM policy, Data Catalog resource policy, and S3 bucket policy-based access in their organization. They want to share their transactional data of Iceberg tables across different teams, such as data analysts and data scientists, asking for read access across a few lines of business. While maintaining the ownership of the table’s updates to their single team, they want to provide restricted read access to certain columns of their tables. This is achieved by using the hybrid access mode feature of Lake Formation.

In this post, we illustrate the scenario with a data engineer team and a new data analyst team. The data engineering team owns the extract, transform, and load (ETL) application that will process the raw data to create and maintain the Iceberg tables. The data analyst team will query the tables to gather business insights from those tables. The ETL application will use IAM role-based access to the Iceberg table, and the data analyst gets Lake Formation permissions to query the same tables.

The solution can be visually represented in the following diagram.

For ease of illustration, we use only one AWS account in this post. Enterprise use cases typically have multiple accounts or cross-account access requirements. The setup of the Iceberg tables, Lake Formation permissions, and IAM based permissions are similar for multiple and cross-account scenarios.

The high-level steps involved in the permissions setup are as follows:

  1. Make sure that IAMAllowedPrincipals has Super access to the database and tables in Lake Formation. IAMAllowedPrincipals is a virtual group that represents any IAM principal with IAM policy-based permissions. Super access for this virtual group is required so that IAM policy-based permissions for any IAM principal continue to work.
  2. Register the data location with Lake Formation in hybrid access mode.
  3. Grant DATA LOCATION permission to the IAM role that manages the table with IAM policy-based permissions. Without the DATA LOCATION permission, write workloads will fail. Test the access to the table by writing new records to the table as the IAM role.
  4. Add SELECT table permissions to the Data-Analyst role in Lake Formation.
  5. Opt in the Data-Analyst role to the Iceberg table, making the Lake Formation permissions effective for the analyst.
  6. Test access to the table as the Data-Analyst by running SELECT queries in Athena.
  7. Test the table write operations by adding new records to the table as ETL-application-role using EMR Serverless.
  8. Read the latest update, again, as Data-Analyst.
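The interplay between opt-in and IAMAllowedPrincipals in these steps can be summarized with a small model. This is a deliberately simplified sketch of the behavior described in this post, not Lake Formation's authorization logic. The function name and return labels are hypothetical.

```python
def effective_access_mode(principal, opted_in_principals, iam_allowed_has_super):
    """Return which permission model applies to a principal on a hybrid resource.

    - Opted-in principals are evaluated with Lake Formation permissions.
    - Everyone else relies on IAM policies, which keep working only while
      IAMAllowedPrincipals holds Super on the database and table (step 1).
    """
    if principal in opted_in_principals:
        return "lake_formation"
    if iam_allowed_has_super:
        return "iam_policy"
    return "iam_policy_unavailable"


opted_in = {"Data-Analyst"}
analyst_mode = effective_access_mode("Data-Analyst", opted_in, True)
etl_mode = effective_access_mode("ETL-application-role", opted_in, True)
```

In the scenario of this post, the Data-Analyst role reads through Lake Formation permissions while ETL-application-role keeps writing through its IAM policy.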

Prerequisites

You should have the following prerequisites:

  • An AWS account with a Lake Formation administrator configured. Refer to Data lake administrator permissions and Set up AWS Lake Formation. You can also refer to Simplify data access for your enterprise using Amazon SageMaker Lakehouse for the Lake Formation admin setup in your AWS account. For ease of demonstration, we have used an IAM admin role added as a Lake Formation administrator.
  • An S3 bucket to host the sample Iceberg table data and metadata.
  • An IAM role to register your Iceberg table Amazon S3 location with Lake Formation. Follow the policy and trust policy details for a user-defined role creation from Requirements for roles used to register locations.
  • An IAM role named ETL-application-role, which will be the runtime role to execute jobs in EMR Serverless. The minimum policy required is shown in the following code snippet. Replace the Amazon S3 data location of the Iceberg table, database name, and AWS Key Management Service (AWS KMS) key ID with your own. For additional details on the role setup, refer to Job runtime roles for Amazon EMR Serverless. This role can insert, update, and delete data in the table.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "IcebergDataAccessInS3",
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:Get*",
                    "s3:Put*",
                    "s3:Delete*"
                ],
                "Resource": [
                    "arn:aws:s3:::your-iceberg-data-bucket-name",
                    "arn:aws:s3:::your-iceberg-data-bucket-name/*"
                ]
            },
            {
                "Sid": "GlueCatalogApiPermissions",
                "Effect": "Allow",
                "Action": [
                    "glue:*"
                ],
                "Resource": [
                    "arn:aws:glue:your-Region:account-id:catalog",
                    "arn:aws:glue:your-Region:account-id:database/iceberg-database-name",
                    "arn:aws:glue:your-Region:account-id:database/default",
                    "arn:aws:glue:your-Region:account-id:table/*/*"
                ]
            },
            {
                "Sid": "KmsKeyPermissions",
                "Effect": "Allow",
                "Action": [
                    "kms:Encrypt",
                    "kms:Decrypt",
                    "kms:ReEncrypt*",
                    "kms:GenerateDataKey",
                    "kms:DescribeKey",
                    "kms:ListKeys",
                    "kms:ListAliases"
                ],
                "Resource": [
                    "arn:aws:kms:your-Region:account-id:key/your-key-id"
                ]
            }
        ]
    }

    Add the following trust policy to the role:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "emr-serverless.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

  • An IAM role called Data-Analyst, to represent the data analyst access. Use the following policy to create the role. Also attach the AWS managed policy arn:aws:iam::aws:policy/AmazonAthenaFullAccess to the role, to allow querying the Iceberg table using Amazon Athena. Refer to Data engineer permissions for additional details about this role.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFBasicUser",
                "Effect": "Allow",
                "Action": [
                    "glue:GetCatalog",
                    "glue:GetCatalogs",
                    "glue:GetTable",
                    "glue:GetTables",
                    "glue:GetTableVersion",
                    "glue:GetTableVersions",
                    "glue:GetDatabase",
                    "glue:GetDatabases",
                    "glue:GetPartition",
                    "glue:GetPartitions",
                    "lakeformation:GetDataAccess"
                ],
                "Resource": "*"
            },
            {
                "Sid": "AthenaResultsBucket",
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation",
                    "s3:Put*",
                    "s3:Get*",
                    "s3:Delete*"
                ],
                "Resource": [
                    "arn:aws:s3:::your-bucket-name-prefix",
                    "arn:aws:s3:::your-bucket-name-prefix/*"
                ]
            }
        ]
    }

    Add the following trust policy to the role:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<your_account_id>:root"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
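Because the ETL-application-role policy contains several placeholders, it can help to render it from parameters rather than editing the JSON by hand. The following sketch reproduces the policy shown earlier for that role. The function name `build_etl_policy` and the sample values are hypothetical.

```python
import json


def build_etl_policy(bucket, region, account_id, database, kms_key_id):
    """Render the ETL-application-role permissions policy with real values."""
    glue_prefix = f"arn:aws:glue:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "IcebergDataAccessInS3",
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation",
                           "s3:ListAllMyBuckets", "s3:Get*", "s3:Put*", "s3:Delete*"],
                "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
            },
            {
                "Sid": "GlueCatalogApiPermissions",
                "Effect": "Allow",
                "Action": ["glue:*"],
                "Resource": [
                    f"{glue_prefix}:catalog",
                    f"{glue_prefix}:database/{database}",
                    f"{glue_prefix}:database/default",
                    f"{glue_prefix}:table/*/*",
                ],
            },
            {
                "Sid": "KmsKeyPermissions",
                "Effect": "Allow",
                "Action": ["kms:Encrypt", "kms:Decrypt", "kms:ReEncrypt*",
                           "kms:GenerateDataKey", "kms:DescribeKey",
                           "kms:ListKeys", "kms:ListAliases"],
                "Resource": [f"arn:aws:kms:{region}:{account_id}:key/{kms_key_id}"],
            },
        ],
    }


# Sample values for illustration only.
policy = build_etl_policy("your-iceberg-data-bucket-name", "us-east-1",
                          "111122223333", "iceberg_db", "your-key-id")
policy_json = json.dumps(policy, indent=4)
```

The resulting `policy_json` string can be pasted into the IAM console or passed to tooling that creates the role.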

Create the Iceberg table

Complete the following steps to create the Iceberg table:

  1. Sign in to the Lake Formation console as the admin role.
  2. In the navigation pane under Data Catalog, choose Databases.
  3. From the Create dropdown menu, create a database named iceberg_db. You can leave the Amazon S3 location property empty for the database.
  4. On the Athena console, run the following provided queries. The queries perform the following operations:
    1. Create a table called customer_csv, pointing to the customer dataset in the public S3 bucket.
    2. Create an Iceberg table called customer_iceberg, pointing to your S3 bucket location that will host the Iceberg table data and metadata.
    3. Insert data from the CSV table to the Iceberg table.
      CREATE EXTERNAL TABLE `iceberg_db`.`customer_csv`(
        `c_customer_sk` int,
        `c_customer_id` string,
        `c_current_cdemo_sk` int,
        `c_current_hdemo_sk` int,
        `c_current_addr_sk` int,
        `c_first_shipto_date_sk` int,
        `c_first_sales_date_sk` int,
        `c_salutation` string,
        `c_first_name` string,
        `c_last_name` string,
        `c_preferred_cust_flag` string,
        `c_birth_day` int,
        `c_birth_month` int,
        `c_birth_year` int,
        `c_birth_country` string,
        `c_login` string,
        `c_email_address` string,
        `c_last_review_date` string)
      ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '|'
      STORED AS INPUTFORMAT
        'org.apache.hadoop.mapred.TextInputFormat'
      OUTPUTFORMAT
        'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
      LOCATION
        's3://redshift-downloads/TPC-DS/2.13/10GB/customer/'
      TBLPROPERTIES (
        'classification'='csv');   
      
      SELECT * FROM iceberg_db.customer_csv LIMIT 5; -- verifies table data
      
      CREATE TABLE IF NOT EXISTS iceberg_db.customer_iceberg (
              c_customer_sk             int,
              c_customer_id             string,
              c_current_cdemo_sk        int,
              c_current_hdemo_sk        int,
              c_current_addr_sk         int,
              c_first_shipto_date_sk    int,
              c_first_sales_date_sk     int,
              c_salutation              string,
              c_first_name              string,
              c_last_name               string,
              c_preferred_cust_flag     string,
              c_birth_day               int,
              c_birth_month             int,
              c_birth_year              int,
              c_birth_country           string,
              c_login                   string,
              c_email_address           string,
              c_last_review_date        string
          )
      LOCATION 's3://your-iceberg-data-bucket-name/path/'
      TBLPROPERTIES ( 'table_type' = 'ICEBERG' );
      
      INSERT INTO iceberg_db.customer_iceberg
      SELECT *
      FROM iceberg_db.customer_csv;
      
      SELECT * FROM iceberg_db.customer_iceberg LIMIT 5; -- verifies table data
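Athena runs one statement per query, so the statements above are submitted one at a time. If you prefer to script this instead of using the console, the following sketch submits each statement and waits for it to finish. It assumes `athena_client` is a boto3 Athena client. The function name `run_statements` and the results location are placeholders.

```python
import time


def run_statements(athena_client, statements, output_location,
                   database="iceberg_db", poll_seconds=1.0):
    """Run SQL statements sequentially and return the final state of each.

    Stops at the first statement that does not succeed.
    """
    final_states = []
    for sql in statements:
        started = athena_client.start_query_execution(
            QueryString=sql,
            QueryExecutionContext={"Database": database},
            ResultConfiguration={"OutputLocation": output_location},
        )
        query_id = started["QueryExecutionId"]
        # Poll until Athena reports a terminal state for this query.
        while True:
            execution = athena_client.get_query_execution(QueryExecutionId=query_id)
            state = execution["QueryExecution"]["Status"]["State"]
            if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
                break
            time.sleep(poll_seconds)
        final_states.append(state)
        if state != "SUCCEEDED":
            break
    return final_states
```

Stopping at the first failure avoids running the INSERT when one of the CREATE TABLE statements did not succeed.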

Set up the Iceberg table as a hybrid access mode resource

Complete the following steps to set up the Iceberg table’s Amazon S3 data location as hybrid access mode in Lake Formation:

  1. Register your table location with Lake Formation:
    1. Sign in to the Lake Formation console as data lake administrator.
    2. In the navigation pane, choose Data lake Locations.
    3. For Amazon S3 path, provide the S3 prefix of your Iceberg table location that holds both the data and metadata of the table.
    4. For IAM role, provide the user-defined role that has permissions to your Iceberg table’s Amazon S3 location and that you created according to the prerequisites. For more details, refer to Registering an Amazon S3 location.
    5. For Permission mode, select Hybrid access mode.
    6. Choose Register location to register your Iceberg table Amazon S3 location with Lake Formation.

  2. Add data location permission to ETL-application-role:
    1. In the navigation pane, choose Data locations.
    2. For IAM users and roles, choose ETL-application-role.
    3. For Storage location, provide the S3 prefix of your Iceberg table.
    4. Choose Grant.

Data location permission is required for write operations to the Iceberg table location only if the Iceberg table’s S3 prefix is a child location of the database’s Amazon S3 location property.

  3. Grant Super access on the Iceberg database and table to IAMAllowedPrincipals:
    1. In the navigation pane, choose Data permissions.
    2. Choose IAM users and roles and choose IAMAllowedPrincipals.
    3. For LF-Tags or catalog resources, choose Named Data Catalog resources.
    4. Under Databases, select the name of your Iceberg table’s database.
    5. Under Database permissions, select Super.
    6. Choose Grant.

    7. Repeat the preceding steps and for Tables – optional, choose the Iceberg table.
    8. Under Table permissions, select Super.
    9. Choose Grant.

  4. Add database and table permissions to the Data-Analyst role:
    1. Repeat the steps in Step 3 to grant permissions for the Data-Analyst role, once for database-level permission and once for table-level permission.
    2. Select Describe permissions for the Iceberg database.
    3. Select Select permissions for the Iceberg table.
    4. Under Hybrid access mode, select Make Lake Formation permissions effective immediately.
    5. Choose Grant.

The following screenshots show the database permissions for Data-Analyst.

The following screenshots show the table permissions for Data-Analyst.

  5. Verify Lake Formation permissions on the Iceberg table and database for both Data-Analyst and IAMAllowedPrincipals:
    1. In the navigation pane, choose Data permissions.
    2. Filter by Table=customer_iceberg.
      You should see IAMAllowedPrincipals with All permission and Data-Analyst with Select permission.
    3. Similarly, verify permissions for the database by filtering by Database=iceberg_db.

You should see IAMAllowedPrincipals with All permission and Data-Analyst with Describe permission.

  6. Verify Lake Formation opt-in for Data-Analyst:
    1. In the navigation pane, choose Hybrid access mode.

You should see Data-Analyst opted in for both database and table level permissions.
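The console steps in this section correspond to a short sequence of Lake Formation API calls. The sketch below lists them in order as (operation, arguments) pairs. It assumes a boto3 Lake Formation client, that Super in the console maps to the `ALL` permission, and that the ARNs are placeholders you replace. The helper names are hypothetical.

```python
def hybrid_setup_calls(table_location_arn, register_role_arn,
                       etl_role_arn, analyst_role_arn):
    """Return the ordered Lake Formation calls for the hybrid access mode setup."""
    database = {"Database": {"Name": "iceberg_db"}}
    table = {"Table": {"DatabaseName": "iceberg_db", "Name": "customer_iceberg"}}
    iam_allowed = {"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"}
    etl = {"DataLakePrincipalIdentifier": etl_role_arn}
    analyst = {"DataLakePrincipalIdentifier": analyst_role_arn}
    location = {"DataLocation": {"ResourceArn": table_location_arn}}
    return [
        # Register the location in hybrid access mode.
        ("register_resource", {"ResourceArn": table_location_arn,
                               "RoleArn": register_role_arn,
                               "HybridAccessEnabled": True}),
        # Data location permission for the IAM-based writer.
        ("grant_permissions", {"Principal": etl, "Resource": location,
                               "Permissions": ["DATA_LOCATION_ACCESS"]}),
        # Super (ALL) for IAMAllowedPrincipals keeps IAM-based access working.
        ("grant_permissions", {"Principal": iam_allowed, "Resource": database,
                               "Permissions": ["ALL"]}),
        ("grant_permissions", {"Principal": iam_allowed, "Resource": table,
                               "Permissions": ["ALL"]}),
        # Lake Formation permissions for the reader.
        ("grant_permissions", {"Principal": analyst, "Resource": database,
                               "Permissions": ["DESCRIBE"]}),
        ("grant_permissions", {"Principal": analyst, "Resource": table,
                               "Permissions": ["SELECT"]}),
        # Opt the reader in so the Lake Formation permissions take effect.
        ("create_lake_formation_opt_in", {"Principal": analyst, "Resource": database}),
        ("create_lake_formation_opt_in", {"Principal": analyst, "Resource": table}),
    ]


def apply_calls(lakeformation_client, calls):
    """Execute each planned call on the given client."""
    for operation, kwargs in calls:
        getattr(lakeformation_client, operation)(**kwargs)
```

Building the plan as data first makes it possible to review or log the exact requests before anything changes in the account.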

Query the table as the Data-Analyst role in Athena

While you are logged in to the AWS Management Console as admin, switch to the Data-Analyst role and query the Iceberg table:

  1. On the console navigation bar, choose your user name.
  2. Choose Switch role to switch to the Data-Analyst role.
  3. Enter your account ID, IAM role name (Data-Analyst), and choose Switch Role.
  4. Now that you’re logged in as the Data-Analyst role, open the Athena console and set up the Athena query results bucket.
  5. Run the following query to read the Iceberg table. This verifies the Select permission granted to the Data-Analyst role in Lake Formation.
SELECT * FROM "iceberg_db"."customer_iceberg"
WHERE c_customer_sk = 247

Upsert data as ETL-application-role using Amazon EMR

To upsert data to Lake Formation enabled Iceberg tables, we will use Amazon EMR Studio, which is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug data engineering and data science applications written in R, Python, Scala, and PySpark. EMR Studio will be our web-based IDE to run our notebooks, and we will use EMR Serverless as the compute engine. EMR Serverless is a deployment option for Amazon EMR that provides a serverless runtime environment. For the steps to run an interactive notebook, see Submit a job run or interactive workload.

  1. Sign out of the AWS console as Data-Analyst and log back in as admin, or switch the role back to admin.
  2. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  3. Choose Get started.
  4. For first-time users, Amazon EMR allows creation of an EMR Studio without a virtual private cloud (VPC). Create an EMR Serverless application as follows:
    1. Provide a name for the EMR Serverless application, such as DemoHybridAccess.
    2. Under Application setup, choose Use default settings for interactive workloads.
    3. Choose Create and start application.

The next step is to create an EMR Studio.

  1. On the Amazon EMR console, choose Studio under EMR Studio in the navigation pane.
  2. Choose Create Studio.
  3. Select Interactive workloads.
  4. You should see a default pre-populated section. Keep these default settings and choose Create Studio and launch Workspace.

  5. After the workspace is launched, attach the EMR Serverless application created earlier and select ETL-application-role as the runtime role under Compute.

  6. Download the notebook Iceberg-hybridaccess_final.ipynb and upload it to EMR Studio workspace.

This notebook configures the metastore properties to work with Iceberg tables. (For more details, see Using Apache Iceberg with EMR Serverless.) Then it performs insert, update, and delete operations in the Iceberg table. It also verifies if the operations are successful by reading the newly added data.

  7. Select PySpark as the kernel and execute each cell in the notebook by choosing the run icon.

Refer to Submit a job run or interactive workload for further details about how to run an interactive notebook.
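The notebook's contents are not reproduced in this post. As an illustration of what an upsert against an Iceberg table can look like in Spark SQL, the following sketch builds a generic `MERGE INTO` statement. It is not taken from the notebook: the helper name and the source view `customer_updates` are hypothetical, and running the statement requires a Spark session configured with the Iceberg SQL extensions.

```python
def build_merge_sql(target, source, key, columns):
    """Build an Iceberg-style upsert: update matching rows, insert the rest."""
    assignments = ", ".join(f"t.{column} = s.{column}" for column in columns)
    return (
        f"MERGE INTO {target} t USING {source} s ON t.{key} = s.{key} "
        f"WHEN MATCHED THEN UPDATE SET {assignments} "
        "WHEN NOT MATCHED THEN INSERT *"
    )


# customer_updates is a hypothetical temporary view holding the changed rows.
merge_sql = build_merge_sql(
    "iceberg_db.customer_iceberg",
    "customer_updates",
    "c_customer_sk",
    ["c_first_name"],
)
```

In a PySpark notebook cell, the resulting string would be passed to `spark.sql(merge_sql)`.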

The following screenshot shows that the Iceberg table insert operation completed successfully.

The following screenshot illustrates running the update statement on the Iceberg table in the notebook.

The following screenshot shows that the Iceberg table delete operation completed successfully.

Query the table again as Data-Analyst using Athena

Complete the following steps:

  1. Switch your role to Data-Analyst on the AWS console.
  2. Run the following query on the Iceberg table and read the row that was updated by the EMR Serverless notebook:
    SELECT * FROM "iceberg_db"."customer_iceberg"
    WHERE c_customer_sk = 247

The following screenshot shows the results. As we can see, the c_first_name column is updated with the new value.

Clean up

To avoid incurring costs, clean up the resources you used for this post:

  1. Revoke the Lake Formation permissions and hybrid access mode opt-in granted to the Data-Analyst role and IAMAllowedPrincipals.
  2. Deregister the S3 location from Lake Formation.
  3. Delete the Athena query results from your S3 bucket.
  4. Delete the EMR Serverless resources.
  5. Delete Data-Analyst role and ETL-application-role from IAM.
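The Lake Formation part of the cleanup (steps 1 and 2) can be sketched as an ordered list of API calls. As before, this assumes a boto3 Lake Formation client and placeholder ARNs, and the helper name is hypothetical. The remaining steps (S3 objects, EMR Serverless resources, IAM roles) are not covered here.

```python
def lake_formation_cleanup_calls(table_location_arn, analyst_role_arn):
    """Return the ordered Lake Formation calls that undo the hybrid setup."""
    database = {"Database": {"Name": "iceberg_db"}}
    table = {"Table": {"DatabaseName": "iceberg_db", "Name": "customer_iceberg"}}
    analyst = {"DataLakePrincipalIdentifier": analyst_role_arn}
    iam_allowed = {"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"}
    return [
        # Remove the opt-ins first, then the permissions behind them.
        ("delete_lake_formation_opt_in", {"Principal": analyst, "Resource": table}),
        ("delete_lake_formation_opt_in", {"Principal": analyst, "Resource": database}),
        ("revoke_permissions", {"Principal": analyst, "Resource": table,
                                "Permissions": ["SELECT"]}),
        ("revoke_permissions", {"Principal": analyst, "Resource": database,
                                "Permissions": ["DESCRIBE"]}),
        ("revoke_permissions", {"Principal": iam_allowed, "Resource": table,
                                "Permissions": ["ALL"]}),
        ("revoke_permissions", {"Principal": iam_allowed, "Resource": database,
                                "Permissions": ["ALL"]}),
        # Finally, deregister the S3 location.
        ("deregister_resource", {"ResourceArn": table_location_arn}),
    ]


cleanup = lake_formation_cleanup_calls(
    "arn:aws:s3:::your-iceberg-data-bucket-name/path",
    "arn:aws:iam::111122223333:role/Data-Analyst",
)
```

Each pair can be executed with `getattr(client, operation)(**kwargs)` once you have confirmed the resources are no longer needed.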

Conclusion

In this post, we demonstrated how to scale the adoption and use of Iceberg tables using Lake Formation permissions for read workloads, while maintaining full control over table schema and data updates through IAM policy-based permissions for the table owners. The methodology also applies to other open table formats and standard Data Catalog tables, but the Apache Spark configuration for each open table format will vary.

Hybrid access mode in Lake Formation is an option you could use to adopt Lake Formation permissions gradually and scale those use cases that support Lake Formation permissions while using IAM based permissions for the use cases that don’t. We encourage you to try out this setup in your environment. Please share your feedback and any additional topics you would like to see in the comments section.


About the Authors

Aarthi Srinivasan is a Senior Big Data Architect with AWS Lake Formation. She collaborates with the service team to enhance product features, works with AWS customers and partners to architect lake house solutions, and establishes best practices.

Parul Saxena is a Senior Big Data Specialist Solutions Architect in AWS. She helps customers and partners build highly optimized, scalable, and secure solutions. She specializes in Amazon EMR, Amazon Athena, and AWS Lake Formation, providing architectural guidance for complex big data workloads and assisting organizations in modernizing their architectures and migrating analytics workloads to AWS.

Accelerate your analytics with Amazon S3 Tables and Amazon SageMaker Lakehouse

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/accelerate-your-analytics-with-amazon-s3-tables-and-amazon-sagemaker-lakehouse/

Amazon SageMaker Lakehouse is a unified, open, and secure data lakehouse that now seamlessly integrates with Amazon S3 Tables, the first cloud object store with built-in Apache Iceberg support. With this integration, SageMaker Lakehouse provides unified access to S3 Tables, general purpose Amazon S3 buckets, Amazon Redshift data warehouses, and data sources such as Amazon DynamoDB or PostgreSQL. You can then query, analyze, and join the data using Redshift, Amazon Athena, Amazon EMR, and AWS Glue. In addition to your familiar AWS services, you can access and query your data in-place with your choice of Iceberg-compatible tools and engines, giving you the flexibility to use SQL or Spark-based tools and collaborate on this data the way you like. You can secure and centrally manage your data in the lakehouse by defining fine-grained permissions with AWS Lake Formation that are consistently applied across all analytics and machine learning (ML) tools and engines.

Organizations are becoming increasingly data driven, and as data becomes a differentiator in business, organizations need faster access to all their data in all locations, using preferred engines to support rapidly expanding analytics and AI/ML use cases. Let’s take an example of a retail company that started by storing their customer sales and churn data in their data warehouse for business intelligence reports. With massive growth in business, they need to manage a variety of data sources as well as exponential growth in data volume. The company builds a data lake using Apache Iceberg to store new data such as customer reviews and social media interactions.

This enables them to cater to their end customers with new personalized marketing campaigns and understand the impact of those campaigns on sales and churn. However, data distributed across data lakes and warehouses limits their ability to move quickly, because it may require them to set up specialized connectors, manage multiple access policies, and often resort to copying data, which increases both the cost of managing separate datasets and the cost of storing redundant data. SageMaker Lakehouse addresses these challenges by providing secure and centralized management of data in data lakes, data warehouses, and data sources such as MySQL and SQL Server by defining fine-grained permissions that are consistently applied across data in all analytics engines.

In this post, we show you how to use various analytics services through the integration of SageMaker Lakehouse with S3 Tables. We begin by enabling integration of S3 Tables with AWS analytics services. We create S3 Tables and Redshift tables and populate them with data. We then set up SageMaker Unified Studio by creating a company-specific domain, a new project with users, and fine-grained permissions. This lets us unify data lakes and data warehouses and use them with analytics services such as Athena, Redshift, Glue, and EMR.

Solution overview

To illustrate the solution, we are going to consider a fictional company called Example Retail Corp. Example Retail’s leadership is interested in understanding customer and business insights across thousands of customer touchpoints for millions of their customers that will help them build sales, marketing, and investment plans. Leadership wants to conduct an analysis across all their data to identify at-risk customers, understand impact of personalized marketing campaigns on customer churn, and develop targeted retention and sales strategies.

Alice is a data administrator in Example Retail Corp who has embarked on an initiative to consolidate customer information from multiple touchpoints, including social media, sales, and support requests. She decides to use S3 Tables with Iceberg transactional capability to achieve scalability as updates are streamed across billions of customer interactions, while providing the same durability, availability, and performance characteristics that S3 is known for. Alice has already built a large warehouse with Redshift, which contains historical and current data about sales, customer prospects, and churn information.

Alice supports an extended team of developers, engineers, and data scientists who require access to the data environment to develop business insights, dashboards, ML models, and knowledge bases. This team includes:

Bob, a data analyst who needs access to S3 Tables and warehouse data to automate daily leadership reports on customer interaction growth and churn across various customer touchpoints.

Charlie, a business intelligence analyst who is tasked with building interactive dashboards for the funnel of customer prospects and their conversions across multiple touchpoints, and making those dashboards available to thousands of Sales team members.

Doug, a data engineer responsible for building ML forecasting models for sales growth using the pipeline and/or customer conversion across multiple touchpoints, and making those models available to finance and planning teams.

Alice decides to use SageMaker Lakehouse to unify data across S3 Tables and Redshift data warehouse. Bob is excited about this decision as he can now build daily reports using his expertise with Athena. Charlie now knows that he can quickly build Amazon QuickSight dashboards with queries that are optimized using Redshift’s cost-based optimizer. Doug, being an open source Apache Spark contributor, is excited that he can build Spark based processing with AWS Glue or Amazon EMR to build ML forecasting models.

The following diagram illustrates the solution architecture.

Implementing this solution consists of the following high-level steps. For Example Retail, Alice, as the data administrator, performs these steps:

  1. Create a table bucket. S3 Tables stores Apache Iceberg tables as S3 resources, and customer details are managed in S3 Tables. You can then enable integration with AWS analytics services, which automatically sets up the SageMaker Lakehouse integration so that the table bucket is shown as a child catalog under the federated s3tablescatalog in the AWS Glue Data Catalog and is registered with AWS Lake Formation for access control. Next, you create a table namespace or database, which is a logical construct that you group tables under, and create a table using the Athena SQL CREATE TABLE statement.
  2. Publish your data warehouse to Glue Data Catalog. Churn data is managed in a Redshift data warehouse, which is published to the Data Catalog as a federated catalog and is available in SageMaker Lakehouse.
  3. Create a SageMaker Unified Studio project. SageMaker Unified Studio integrates with SageMaker Lakehouse and simplifies analytics and AI with a unified experience. Start by creating a domain and adding all users (Bob, Charlie, Doug). Then create a project in the domain, choosing a project profile that provisions various resources and the project AWS Identity and Access Management (IAM) role that manages resource access. Alice adds Bob, Charlie, and Doug to the project as members.
  4. Onboard S3 Tables and Redshift tables to SageMaker Unified Studio. To onboard the S3 Tables to the project, in Lake Formation, you grant permission on the resource to the SageMaker Unified Studio project role. This enables the catalog to be discoverable within the lakehouse data explorer for users (Bob, Charlie, and Doug) to start querying tables. SageMaker Lakehouse resources can now be accessed from compute engines such as Athena and Redshift, and from Apache Spark-based engines such as Glue, to derive churn analysis insights, with Lake Formation managing the data permissions.

Prerequisites

To follow the steps in this post, you must complete the following prerequisites:

  1. AWS account with access to the following AWS services:
    • Amazon S3 including S3 Tables
    • Amazon Redshift
    • AWS Identity and Access Management (IAM)
    • Amazon SageMaker Unified Studio
    • AWS Lake Formation and AWS Glue Data Catalog
    • AWS Glue
  2. Create a user with administrative access.
  3. Have access to an IAM role that is a Lake Formation data lake administrator. For instructions, refer to Create a data lake administrator.
  4. Enable AWS IAM Identity Center in the same AWS Region where you want to create your SageMaker Unified Studio domain. Set up your identity provider (IdP) and synchronize identities and groups with AWS IAM Identity Center. For more information, refer to IAM Identity Center Identity source tutorials.
  5. Create a read-only administrator role to discover the Amazon Redshift federated catalogs in the Data Catalog. For instructions, refer to Prerequisites for managing Amazon Redshift namespaces in the AWS Glue Data Catalog.
  6. Create an IAM role named DataTransferRole. For instructions, refer to Prerequisites for managing Amazon Redshift namespaces in the AWS Glue Data Catalog.
  7. Create an Amazon Redshift Serverless namespace called churnwg. For more information, see Get started with Amazon Redshift Serverless data warehouses.

Create a table bucket and enable integration with analytics services

Alice completes the following steps to create the S3 table bucket for the new data she plans to add or import into S3 Tables.

Complete the following steps to create a table bucket and enable integration with SageMaker Lakehouse:

  1. Sign in to the S3 console as the user created in prerequisite step 2.
  2. Choose Table buckets in the navigation pane and choose Enable integration.
  3. Choose Table buckets in the navigation pane and choose Create table bucket.
  4. For Table bucket name, enter a name such as blog-customer-bucket.
  5. Choose Create table bucket.
  6. Choose Create table with Athena.
  7. Select Create a namespace and provide a namespace (for example, customernamespace).
  8. Choose Create namespace.
  9. Choose Create table with Athena.
  10. On the Athena console, run the following SQL script to create a table:
    CREATE TABLE customer (
      `c_salutation` string, 
      `c_preferred_cust_flag` string, 
      `c_first_sales_date_sk` int, 
      `c_customer_sk` int, 
      `c_login` string, 
      `c_current_cdemo_sk` int, 
      `c_first_name` string, 
      `c_current_hdemo_sk` int, 
      `c_current_addr_sk` int, 
      `c_last_name` string, 
      `c_customer_id` string, 
      `c_last_review_date_sk` int, 
      `c_birth_month` int, 
      `c_birth_country` string, 
      `c_birth_year` int, 
      `c_birth_day` int, 
      `c_first_shipto_date_sk` int, 
      `c_email_address` string)
      TBLPROPERTIES ('table_type' = 'iceberg');
    
    INSERT INTO customer VALUES
    ('Dr.','N',2452077,13251813,'Y',1381546,'Joyce',2645,2255449,'Deaton','AAAAAAAAFOEDKMAA',2452543,1,'GREECE',1987,29,2250667,'[email protected]'),
    ('Dr.','N',2450637,12755125,'Y',1581546,'Daniel',9745,4922716,'Dow','AAAAAAAAFLAKCMAA',2432545,1,'INDIA',1952,3,2450667,'[email protected]'),
    ('Dr.','N',2452342,26009249,'Y',1581536,'Marie',8734,1331639,'Lange','AAAAAAAABKONMIBA',2455549,1,'CANADA',1934,5,2472372,'[email protected]'),
    ('Dr.','N',2452342,3270685,'Y',1827661,'Wesley',1548,11108235,'Harris','AAAAAAAANBIOBDAA',2452548,1,'ROME',1986,13,2450667,'[email protected]'),
    ('Dr.','N',2452342,29033279,'Y',1581536,'Alexandar',8262,8059919,'Salyer','AAAAAAAAPDDALLBA',2952543,1,'SWISS',1980,6,2650667,'[email protected]'),
    ('Miss','N',2452342,6520539,'Y',3581536,'Jerry',1874,36370,'Tracy','AAAAAAAALNOHDGAA',2452385,1,'ITALY',1957,8,2450667,'[email protected]');

This is just an example of adding a few rows to the table, but generally for production use cases, customers use engines such as Spark to add data to the table.

The S3 table customer is now created, populated with data, and integrated with SageMaker Lakehouse.

Set up Redshift tables and publish to the Data Catalog

Alice completes the following steps to publish the data in Redshift to the Data Catalog. We also demonstrate how the Redshift table is created and populated, although in Alice’s case the Redshift table already exists with all the historical data on sales revenue.

  1. Sign in to the Redshift endpoint churnwg as an admin user.
  2. Run the following script to create a table under the dev database under the public schema:
    CREATE TABLE customer_churn (
    customer_id BIGINT,
    tenure INT,
    monthly_charges DECIMAL(5,1),
    total_charges DECIMAL(5,1),
    contract_type VARCHAR(100),
    payment_method VARCHAR(100),
    internet_service VARCHAR(100),
    has_phone_service BOOLEAN,
    is_churned BOOLEAN
    );
    
    INSERT INTO customer_churn VALUES
    (10251783, 12, 70.5, 850.0, 'Month-to-Month', 'Credit Card', 'Fiber Optic', true, true),
    (13251813, 36, 55.0, 1980.0, 'One Year', 'Bank Transfer', 'DSL', true, false),
    (12755125, 6, 90.0, 540.0, 'Month-to-Month', 'Mailed Check', 'Fiber Optic', false, true),
    (26009249, 12, 70.5, 850.0, 'One Year', 'Credit Card', 'DSL', true, false),
    (3270685, 36, 55.0, 1980.0, 'One Year', 'Bank Transfer', 'DSL', true, false),
    (29033279, 6, 90.0, 540.0, 'Month-to-Month', 'Mailed Check', 'Fiber Optic', false, true),
    (6520539, 24, 60.0, 1440.0, 'Two Year', 'Electronic Check', 'DSL', true, false);

    This is just an example of adding a few rows to the table, but generally for production use cases, customers use several ways to add data to the table as documented in Loading data in Amazon Redshift.

  3. On the Redshift Serverless console, navigate to the namespace.
  4. On the Action dropdown menu, choose Register with AWS Glue Data Catalog to integrate with SageMaker Lakehouse.
  5. Choose Register.
  6. Sign in to the Lake Formation console as the data lake administrator.
  7. Under Data Catalog in the navigation pane, choose Catalogs and Pending catalog invitations.
  8. Select the pending invitation and choose Approve and create catalog.
  9. Provide a name for the catalog (for example, churn_lakehouse).
  10. Under Access from engines, select Access this catalog from Iceberg-compatible engines and choose DataTransferRole for the IAM role.
  11. Choose Next.
  12. Choose Add permissions.
  13. Under Principals, choose the datalakeadmin role for IAM users and roles, Super user for Catalog permissions, and choose Add.
  14. Choose Create catalog.

The Redshift table customer_churn is now created, populated with data, and integrated with SageMaker Lakehouse.
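One detail worth checking before reusing the customer_churn DDL above with real data: DECIMAL(5,1) allows at most four digits before the decimal point, so the largest value it can hold is 9999.9. The sample rows fit, but lifetime totals in a real dataset may not. The following Python sketch is not part of the original walkthrough; fits_decimal is a hypothetical helper that checks whether a finite value fits a given precision and scale without rounding.

```python
from decimal import Decimal


def fits_decimal(value: str, precision: int, scale: int) -> bool:
    """Return True if a finite value fits DECIMAL(precision, scale) without rounding.

    Hypothetical helper for illustration; it is not a Redshift API.
    """
    _sign, digits, exponent = Decimal(value).as_tuple()
    # Digits to the right of the decimal point.
    fraction_digits = max(0, -exponent)
    # Digits to the left of the decimal point.
    integer_digits = max(0, len(digits) + exponent)
    return fraction_digits <= scale and integer_digits <= precision - scale


# total_charges values taken from the INSERT statement in this post.
sample_total_charges = ["850.0", "1980.0", "540.0", "1440.0"]
all_samples_fit = all(fits_decimal(v, 5, 1) for v in sample_total_charges)

# A five-digit total would not fit the column as declared.
large_total_fits = fits_decimal("10000.0", 5, 1)

print(all_samples_fit, large_total_fits)
```

If your own totals can exceed 9999.9, a wider declaration such as DECIMAL(10,2) is one option to consider.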

Create a SageMaker Unified Studio domain and project

Alice now sets up a SageMaker Unified Studio domain and project so that she can bring her users (Bob, Charlie, and Doug) together in the new project.

Complete the following steps to create a SageMaker domain and project using SageMaker Unified Studio:

  1. On the SageMaker Unified Studio console, create a SageMaker Unified Studio domain and project using the All Capabilities profile template. For more details, refer to Setting up Amazon SageMaker Unified Studio. For this post, we create a project named churn_analysis.
  2. Set up AWS IAM Identity Center with the users Bob, Charlie, and Doug, and add them to the domain and project.
  3. From SageMaker Unified Studio, navigate to the project overview and on the Project details tab, note the project role Amazon Resource Name (ARN).
  4. Sign in to the IAM console as an admin user.
  5. In the navigation pane, choose Roles.
  6. Search for the project role and add AmazonS3TablesReadOnlyAccess by choosing Add permissions.

SageMaker Unified Studio is now set up with a domain, project, and users.

Onboard S3 Tables and Redshift tables to the SageMaker Unified Studio project

Alice now configures the SageMaker Unified Studio project role for fine-grained access control to determine who on her team can access which datasets.

Grant the project role full table access on the customer dataset by completing the following steps:

  1. Sign in to the Lake Formation console as the data lake administrator.
  2. In the navigation pane, choose Data lake permissions, then choose Grant.
  3. In the Principals section, for IAM users and roles, choose the project role ARN noted earlier.
  4. In the LF-Tags or catalog resources section, select Named Data Catalog resources:
    • Choose <account_id>:s3tablescatalog/blog-customer-bucket for Catalogs.
    • Choose customernamespace for Databases.
    • Choose customer for Tables.
  5. In the Table permissions section, select Select and Describe for permissions.
  6. Choose Grant.

Now grant the project role access to a subset of columns from the customer_churn dataset:

  1. In the navigation pane, choose Data lake permissions, then choose Grant.
  2. In the Principals section, for IAM users and roles, choose the project role ARN noted earlier.
  3. In the LF-Tags or catalog resources section, select Named Data Catalog resources:
    • Choose <account_id>:churn_lakehouse/dev for Catalogs.
    • Choose public for Databases.
    • Choose customer_churn for Tables.
  4. In the Table Permissions section, select Select.
  5. In the Data Permissions section, select Column-based access.
  6. For Choose permission filter, select Include columns and choose customer_id, internet_service, and is_churned.
  7. Choose Grant.

All users in the churn_analysis project in SageMaker Unified Studio are now set up. They have access to all columns in the customer table in S3 Tables, and fine-grained access to the Redshift table customer_churn, where they can see only three columns.
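If you prefer to script the column-level grant instead of using the console, the request could look roughly like the following Python sketch. It only builds the request dictionary and does not call AWS. The account ID and role ARN are hypothetical placeholders, build_column_grant is a hypothetical helper, and the CatalogId format is an assumption that mirrors the catalog value shown in the console steps above; verify it against the Lake Formation documentation before use.

```python
def build_column_grant(account_id, catalog_path, database, table, columns, principal_arn):
    """Build a Lake Formation GrantPermissions request for a column subset.

    Hypothetical helper: it only assembles the request dictionary.
    """
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "TableWithColumns": {
                # Assumed format, mirroring <account_id>:churn_lakehouse/dev from the console.
                "CatalogId": f"{account_id}:{catalog_path}",
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
    }


request = build_column_grant(
    account_id="111122223333",  # placeholder account ID
    catalog_path="churn_lakehouse/dev",
    database="public",
    table="customer_churn",
    columns=["customer_id", "internet_service", "is_churned"],
    principal_arn="arn:aws:iam::111122223333:role/example-project-role",  # placeholder
)

# With boto3 installed and credentials configured, the request could be sent with:
#   boto3.client("lakeformation").grant_permissions(**request)
print(request["Resource"]["TableWithColumns"]["ColumnNames"])
```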

Verify data access in SageMaker Unified Studio

Alice can now do a final verification that the data is available and that each of her team members is set up to access the datasets.

Complete the following steps to verify data access for different users in SageMaker Unified Studio:

  1. Sign in to SageMaker Unified Studio as Bob and choose the churn_analysis project.
  2. Navigate to the Data explorer to view s3tablescatalog and churn_lakehouse under Lakehouse.

Data Analyst uses Athena for analyzing customer churn

Bob, the data analyst, can now log in to SageMaker Unified Studio, choose the churn_analysis project, navigate to the Build options, and choose Query Editor under Data Analysis & Integration.

Bob chooses the connection as Athena (Lakehouse), the catalog as s3tablescatalog/blog-customer-bucket, and the database as customernamespace. He then runs the following SQL to analyze the data for customer churn:

select * from "churn_lakehouse/dev"."public"."customer_churn" a, 
"s3tablescatalog/blog-customer-bucket"."customernamespace"."customer" b
where a.customer_id=b.c_customer_sk limit 10;

Bob can now join data across S3 Tables and Redshift in Athena, and can proceed to build full SQL analytics to automate the daily leadership reports on customer growth and churn.
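To see what the join is expected to match before running it against the lakehouse, you can reproduce it locally. The following Python sketch uses an in-memory SQLite database purely as a stand-in for Athena, with only a few of the columns and the key values from the sample rows in this post. It uses explicit JOIN syntax, which is equivalent to the comma join above. SQLite has no Boolean type, so is_churned is stored as 0 or 1.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # local stand-in, not Athena
conn.executescript(
    """
    CREATE TABLE customer (c_customer_sk INTEGER, c_first_name TEXT, c_birth_country TEXT);
    CREATE TABLE customer_churn (customer_id INTEGER, internet_service TEXT, is_churned INTEGER);

    INSERT INTO customer VALUES
      (13251813, 'Joyce', 'GREECE'), (12755125, 'Daniel', 'INDIA'),
      (26009249, 'Marie', 'CANADA'), (3270685, 'Wesley', 'ROME'),
      (29033279, 'Alexandar', 'SWISS'), (6520539, 'Jerry', 'ITALY');

    INSERT INTO customer_churn VALUES
      (10251783, 'Fiber Optic', 1), (13251813, 'DSL', 0),
      (12755125, 'Fiber Optic', 1), (26009249, 'DSL', 0),
      (3270685, 'DSL', 0), (29033279, 'Fiber Optic', 1),
      (6520539, 'DSL', 0);
    """
)

# Same join condition as the Athena query: customer_id = c_customer_sk.
joined_rows = conn.execute(
    """
    SELECT b.c_first_name, a.internet_service, a.is_churned
    FROM customer_churn AS a
    JOIN customer AS b ON a.customer_id = b.c_customer_sk
    ORDER BY b.c_first_name
    """
).fetchall()

churned_names = [name for name, _service, churned in joined_rows if churned]
print(len(joined_rows), churned_names)
```

Six of the seven churn rows have a matching customer; customer_id 10251783 has no counterpart in the customer table, so an inner join drops it.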

BI Analyst uses Redshift engine for analyzing customer data

Charlie, the BI analyst, can now log in to SageMaker Unified Studio and choose the churn_analysis project. He navigates to the Build options and chooses Query Editor under Data Analysis & Integration. He chooses the connection as Redshift (Lakehouse), Databases as dev, and Schemas as public.

He then runs the following SQL to perform his specific analysis:

select * from "dev@churn_lakehouse"."public"."customer_churn" a, 
"blog-customer-bucket@s3tablescatalog"."customernamespace"."customer" b
where a.customer_id=b.c_customer_sk limit 10;

Charlie can now further update the SQL query and use it to power QuickSight dashboards that can be shared with Sales team members.
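Note that the two engines address the same tables with different catalog identifiers: the Athena query uses catalog/child (for example, "churn_lakehouse/dev"), whereas the Redshift query uses child@catalog (for example, "dev@churn_lakehouse"). The following Python sketch captures that difference with two hypothetical helper functions. The patterns are inferred only from the two queries in this post, so treat them as an illustration, not as a general naming rule.

```python
def athena_name(catalog: str, child: str, database: str, table: str) -> str:
    """Three-part name in the style of the Athena query in this post."""
    return f'"{catalog}/{child}"."{database}"."{table}"'


def redshift_name(catalog: str, child: str, schema: str, table: str) -> str:
    """Three-part name in the style of the Redshift query in this post."""
    return f'"{child}@{catalog}"."{schema}"."{table}"'


churn_in_athena = athena_name("churn_lakehouse", "dev", "public", "customer_churn")
churn_in_redshift = redshift_name("churn_lakehouse", "dev", "public", "customer_churn")
customer_in_athena = athena_name(
    "s3tablescatalog", "blog-customer-bucket", "customernamespace", "customer"
)
customer_in_redshift = redshift_name(
    "s3tablescatalog", "blog-customer-bucket", "customernamespace", "customer"
)

print(churn_in_athena)
print(churn_in_redshift)
```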

Data engineer uses AWS Glue Spark engine to process customer data

Finally, Doug logs in to SageMaker Unified Studio and chooses the churn_analysis project to perform his analysis. He navigates to the Build options and chooses JupyterLab under IDE & Applications. He downloads the churn_analysis.ipynb notebook and uploads it into the explorer. He then runs the cells by selecting project.spark.compatibility as the compute.

He runs the following SQL to analyze the data for customer churn:

Doug can now use Spark SQL to process data from both S3 Tables and Redshift tables, and start building forecasting models for customer growth and churn.

Cleaning up

If you implemented the example and want to remove the resources, complete the following steps:

  1. Clean up S3 Tables resources:
    1. Delete the table.
    2. Delete the namespace in the table bucket.
    3. Delete the table bucket.
  2. Clean up the Redshift data resources:
    1. On the Lake Formation console, choose Catalogs in the navigation pane.
    2. Delete the churn_lakehouse catalog.
  3. Delete the SageMaker project, IAM roles, Glue resources, Athena workgroup, and S3 buckets created for the domain.
  4. Delete the SageMaker domain and the VPC created for the setup.

Conclusion

In this post, we showed how you can use SageMaker Lakehouse to unify data across S3 Tables and Redshift data warehouses, which can help you build powerful analytics and AI/ML applications on a single copy of data. SageMaker Lakehouse gives you the flexibility to access and query your data in-place with Iceberg-compatible tools and engines. You can secure your data in the lakehouse by defining fine-grained permissions that are enforced across analytics and ML tools and engines.

For more information, refer to Tutorial: Getting started with S3 Tables, S3 Tables integration, and Connecting to the Data Catalog using AWS Glue Iceberg REST endpoint. We encourage you to try out the S3 Tables integration with SageMaker Lakehouse and share your feedback with us.


About the authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She works with the product team and customers to build robust features and solutions for their analytical data platform. She enjoys building data mesh solutions and sharing them with the community.

Aditya Kalyanakrishnan is a Senior Product Manager on the Amazon S3 team at AWS. He enjoys learning from customers about how they use Amazon S3 and helping them scale performance. Adi’s based in Seattle, and in his spare time enjoys hiking and occasionally brewing beer.

Enhancing Adobe Marketo Engage Data Analysis with AWS Glue Integration

Post Syndicated from Kenny Rajan original https://aws.amazon.com/blogs/big-data/enhancing-adobe-marketo-engage-data-analysis-with-aws-glue-integration/

Today’s digital-first, B2B landscape presents marketers with complex challenges as they navigate sophisticated buyer journeys involving diverse decision-making groups. Adobe Marketo Engage offers a comprehensive marketing hub for orchestrating cross-channel campaigns. Using AI-driven personalization, automation, and real-time analytics, it helps businesses acquire and retain customers throughout their buying journeys. Marketo Engage empowers B2B marketers to navigate modern complexities and successfully drive measurable business growth through multi-channel engagement, automated customer journeys, and sales-marketing collaboration.

To further enhance their B2B marketing capabilities, organizations are now looking to fully use their marketing data for more informed decision-making and strategy optimization. Recognizing the need to simplify the analytics pipeline, AWS introduced software as a service (SaaS) connectivity for Marketo Engage through AWS Glue, delivering insights faster to enable data-driven decisions. The agile, serverless nature of AWS Glue meets a range of data analytics needs while reducing costs. This powerful integration links the robust marketing automation features of Marketo Engage with AWS’s advanced analytics ecosystem. By seamlessly connecting the platforms, businesses can extract greater value from marketing data, gaining deeper insights into customer behavior and campaign performance. Together, AWS Glue and Marketo Engage unlock new possibilities for data-driven marketing:

  • Marketing-sales alignment – Helps automate the transfer of lead and opportunity data between Marketo Engage and CRM systems, making sure that sales and marketing teams are aligned and responsive to customer needs
  • Enhanced analytics – Connects Marketo Engage with business intelligence (BI) tools for data-driven campaign optimization, allowing marketers to extract meaningful insights and make informed decisions
  • Data integrity – Maintains consistent, high-quality data across all systems, providing reliability and accuracy in marketing and sales operations
  • Improved lead quality – Refines lead scoring processes by using the advanced analytics capabilities of AWS, resulting in better-qualified leads and improved sales conversions
  • Unified customer view – Provides comprehensive customer insights using enriched AWS datasets for Marketo Engage, offering a holistic understanding of customer interactions and behaviors

In this post, we show you how to use AWS Glue to extract data from Marketo Engage for data processing and enrichment on AWS for use in marketing analytics workflows.

Solution Overview

We explore a use case in which a company wants to run analysis for campaign leads in multiple countries. The resulting leads will be shared with the respective regional marketing representatives. The solution uses AWS Glue to extract data from Marketo Engage and save it in an Amazon Simple Storage Service (Amazon S3) bucket. The following diagram illustrates the solution architecture.

In the following sections, we walk through the high-level steps to implement the solution:

  1. Create AWS resources to connect to Marketo Engage and store data.
  2. Create an AWS Glue connection.
  3. Create an extract, transform, and load (ETL) job using AWS Glue Studio.
  4. Analyze the data.

Prerequisites

To set up the integration between Marketo Engage and AWS, the following components are required:

  • A Marketo Engage account – If you don’t already have one, create a Marketo Engage application and record the Munchkin ID, client ID, and client secret for the application. Refer to the Marketo Engage developer portal to set up the connection.
  • An AWS Glue database – This will serve as the data interaction interface on AWS. The database will expose the data residing in Amazon S3 as queryable AWS Glue tables. For this post, our database is called marketodb.

Create AWS resources to connect to Marketo Engage and store data

We use an AWS CloudFormation template to create an S3 bucket to store data, an AWS Secrets Manager secret for Marketo Engage that the AWS Glue connection needs, and an AWS Identity and Access Management (IAM) role to access the secret. Complete the following steps:

  1. Click Launch Stack below.
  2. On the Specify stack details page, enter a name for the stack.
  3. Specify the Marketo client secret.
  4. Choose Next.
  5. On the Configure stack options page, choose Next.
  6. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  7. Choose Submit. Note: The stack takes about 2 minutes to deploy.
  8. After the stack is created, make a note of the S3AccessRoleARN. You will need this to create the Marketo Engage connection.

Create an AWS Glue connection

Complete the following steps to create an AWS Glue connection:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. For Data sources, select Marketo.

  4. Enter the Adobe MUNCHKIN_ID.
  5. Choose the IAM role created in the previous section as the AWS Glue connection IAM service role.
  6. Provide the Adobe ClientId as the user-managed client application client ID.
  7. Provide the Secrets Manager secret you created earlier.
  8. Choose Next.
  9. Specify your preferred connection name.
  10. Choose Next.
  11. Review the settings, then choose Create connection.

Create an ETL job using AWS Glue Studio

Complete the following steps to create an ETL job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Create job.
  3. Choose Visual ETL.
  4. Add Marketo as a source node.
  5. Add Amazon S3 as the target node.
  6. Choose the Marketo Engage data source node, and the editor will show a configuration pane on the right side of the diagram.
  7. In the Data source properties pane, provide the following information:
    1. For Name, enter a name (for example, Marketo).
    2. For Marketo connection, choose your Marketo Engage connection.
    3. For Entity Name, choose Leads as the entity to retrieve from Marketo Engage.
    4. For Fields, choose All Fields as the fields to retrieve from Marketo Engage.
    5. For Filter, enter gender='Male' to pull leads according to the campaign criteria. Note that this post uses a synchronous mode in which the Adobe Marketo API limits require the retrieved dataset to contain fewer than 1000 records. See the AWS documentation to apply the criteria and mechanisms that support your campaigns.

You can observe the data preview pane reflecting the modifications you have made.

  8. Choose the Amazon S3 target node to configure it.
  9. In the Data target properties pane, provide the following information:
    1. For Name, enter a name (for example, Amazon S3).
    2. For Node parents, choose Marketo.
    3. For Format, choose Parquet.
    4. For Compression Type, choose Snappy.
    5. For S3 Target Location, enter the path to the S3 bucket you created earlier, and optionally specify a prefix. This will inform the ETL job where to store the data retrieved from Marketo Engage.
    6. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
    7. For Database, choose your database in the AWS Glue Data Catalog.
    8. For Table name, enter a table name for the Data Catalog (for example, marketo_leads).

After you configure the source and target nodes, both nodes in the Visual ETL Editor should have a green check mark, indicating they are correctly configured.

  10. Specify the name for the job and save it.
  11. When the job is saved, choose Run to invoke the ETL job.
  12. After the job starts, go to the Runs tab and observe the run until completion.

Depending on the size of the data in your account object in Marketo Engage, the job will take a few minutes to complete. After a successful job run, a new table called marketo_leads is created and populated with data from Marketo Engage.

Analyze the data

After a successful run, you can use Amazon Athena to analyze the data from Marketo Engage with the data residing on AWS. If you’re using Athena for the first time, refer to Create a query output location for instructions to set up the query editor. Then run the following query:

SELECT country, COUNT(*) as count FROM marketo_leads GROUP BY country ORDER BY country;

The query will output the number of people within each country who can be contacted as targeting leads for campaigns, and you can enrich this output by adding other datasets in your data lake or data warehouse. You can expect to see an output like the following screenshot.

Executing Query to find campaign information using Marketo Engage data
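To illustrate the shape of the result, the following Python sketch runs the same aggregation against an in-memory SQLite table. SQLite is only a local stand-in for Athena, the lead rows are made up for illustration, and the count column is renamed to lead_count here; none of this is data from Marketo Engage.

```python
import sqlite3

leads_db = sqlite3.connect(":memory:")  # local stand-in, not Athena
leads_db.execute("CREATE TABLE marketo_leads (id INTEGER, country TEXT)")

# Hypothetical leads, invented for this example.
leads_db.executemany(
    "INSERT INTO marketo_leads VALUES (?, ?)",
    [(1, "Australia"), (2, "Poland"), (3, "Australia"), (4, "United States")],
)

# One row per country with the number of leads, sorted by country name.
leads_per_country = leads_db.execute(
    """
    SELECT country, COUNT(*) AS lead_count
    FROM marketo_leads
    GROUP BY country
    ORDER BY country
    """
).fetchall()

for country, lead_count in leads_per_country:
    print(country, lead_count)
```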

Clean up

To avoid incurring charges, clean up the resources in your AWS account by completing the following steps:

  1. Delete the table created from the Data Catalog:
    1. On the AWS Glue console, navigate to the Data Catalog.
    2. Select the table and choose Delete.
  2. Delete the ETL job:
    1. On the AWS Glue console, choose ETL jobs in the navigation pane.
    2. From the list of jobs, select the job you created, and on the Actions menu, choose Delete.
  3. Delete the data connection:
    1. On the AWS Glue console, choose Data connections in the navigation pane.
    2. Select the Marketo Engage connection from the list of connectors, and on the Actions menu, choose Delete.
  4. Delete the CloudFormation stack:
    1. On the CloudFormation console, choose Stacks in the navigation pane.
    2. Select the stack you created for the S3 bucket and related resources and delete it.

Conclusion

The AWS Glue connector for Marketo Engage streamlines data integration, synchronizing data between Marketo Engage and AWS services for a holistic view of customer information. This integration supports advanced analytics, so marketers can draw precise insights from their data and use them to inform and refine marketing strategies, improving campaign performance and driving better business outcomes.

For more information on the AWS Glue connector for Marketo Engage and AWS Glue, refer to the relevant user guides and visit the AWS Glue website.


About the Authors

Kenny Rajan is a Principal Enterprise Architect at AWS specializing in integrating generative AI with enterprise systems like SAP and Adobe. He helps organizations modernize their digital experience platforms and supply chain and back-end systems through data and AI-powered cloud solutions. Outside of work, he contributes to technology education and charitable initiatives.

Rafał Pawłaszek is a Senior Cloud Application Architect at AWS. Rafał supports customer transformation to the cloud and customer enablement in the cloud. Outside of work, he is interested in astronomy, astrophysics, and psychology, and loves spending time with family.

Basheer Sheriff is a Senior Solutions Architect at AWS. He loves to help customers solve interesting problems using new technology. He is based in Melbourne, Australia, and likes to play sports such as football and cricket.

Kamen Sharlandjiev is a Sr. Big Data Solutions Architect, Amazon MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest Amazon MWAA and AWS Glue features and news!

Introducing a new unified data connection experience with Amazon SageMaker Lakehouse unified data connectivity

Post Syndicated from Chiho Sugimoto original https://aws.amazon.com/blogs/big-data/introducing-a-new-unified-data-connection-experience-with-amazon-sagemaker-lakehouse-data-connectivity/

The need to integrate diverse data sources has grown exponentially, but there are several common challenges when integrating and analyzing data from multiple sources, services, and applications. First, you need to create and maintain independent connections to the same data source for different services. Second, the data connectivity experience is inconsistent across different services. For each service, you need to learn the supported authorization and authentication methods, data access APIs, and framework to onboard and test data sources. Third, some services require you to set up and manage compute resources used for federated connectivity, and capabilities like connection testing and data preview aren’t available in all services. This fragmented, repetitive, and error-prone experience for data connectivity is a significant obstacle to data integration, analysis, and machine learning (ML) initiatives.

To solve for these challenges, we launched Amazon SageMaker Lakehouse unified data connectivity. This feature offers the following capabilities and benefits:

  • With SageMaker Lakehouse unified data connectivity, you can set up a connection to a data source using a connection configuration template that is standardized for multiple services. Amazon SageMaker Unified Studio, AWS Glue, and Amazon Athena can share and reuse the same connection with proper permission configuration.
  • SageMaker Lakehouse unified data connectivity supports standard methods for data source connection authorization and authentication, such as basic authorization and OAuth2. This approach simplifies your data journey and helps you meet your security requirements.
  • The SageMaker Lakehouse data connection testing capability boosts your confidence in established connections. With the ability to browse metadata, you can understand the structure and schema of the data source, identify relevant tables and fields, and discover useful data assets you may not be aware of.
  • SageMaker Lakehouse unified data connectivity’s data preview capability helps you map source fields to target schemas, identify needed data transformation, and plan data standardization and normalization steps.
  • SageMaker Lakehouse unified data connectivity provides a set of APIs for you to use without the need to learn different APIs for various data sources, promoting coding efficiency and productivity.

With SageMaker Lakehouse unified data connectivity, you can confidently connect, explore, and unlock the full value of your data across AWS services and achieve your business objectives with agility.

This post demonstrates how SageMaker Lakehouse unified data connectivity helps your data integration workload by streamlining the establishment and management of connections for various data sources.

Solution overview

In this scenario, an e-commerce company sells products on their online platform. The product data is stored on Amazon Aurora PostgreSQL-Compatible Edition. Their existing business intelligence (BI) tool runs queries on Athena. Furthermore, they have a data pipeline to perform extract, transform, and load (ETL) jobs when moving data from the Aurora PostgreSQL database cluster to other data stores.

Now they have a new requirement to allow ad-hoc queries through SageMaker Unified Studio to enable data engineers, data analysts, sales representatives, and others to take advantage of its unified experience.

In the following sections, we demonstrate how to set up this connection and run queries using different AWS services.

Prerequisites

Before you begin, make sure you have the following:

  • An AWS account.
  • A SageMaker Unified Studio domain.
  • An Aurora PostgreSQL database cluster.
  • A virtual private cloud (VPC) and private subnets required for SageMaker Unified Studio.
  • An Amazon Simple Storage Service (Amazon S3) bucket to store output from the AWS Glue ETL jobs. In the following steps, replace amzn-s3-demo-destination-bucket with the name of the S3 bucket.
  • An AWS Glue Data Catalog database. In the following steps, replace <your_database> with the name of your database.

Create an IAM role for the AWS Glue job

You can either create a new AWS Identity and Access Management (IAM) role or use an existing role that has permission to access the AWS Glue output bucket and AWS Secrets Manager.

If you want to create a new one, complete the following steps:

  1. On the IAM console, in the navigation pane, choose Roles.
  2. Choose Create role.
  3. For Trusted entity type, choose AWS service.
  4. For Service or use case, choose Glue.
  5. Choose Next.
  6. For Add permissions, choose AWSGlueServiceRole, then choose Next.
  7. For Role name, enter a role name (for this post, GlueJobRole-demo).
  8. Choose Create role.
  9. Choose the created IAM role.
  10. Under Permissions policies, choose Add permission and Create inline policy.
  11. For Policy editor, choose JSON, and enter the following policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::amzn-s3-demo-destination-bucket/*",
                    "arn:aws:s3:::amzn-s3-demo-destination-bucket"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue"
                ],
                "Resource": [
                    "arn:aws:secretsmanager:<region>:<account-id>:secret:SageMakerUnifiedStudio-Glue-postgresql_source-*"
                ]
            }
        ]
    }

  12. Choose Next.
  13. For Policy name, enter a name for your policy.
  14. Choose Create policy.
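If you manage several environments, you may want to generate the inline policy shown above rather than edit the JSON by hand. The following sketch builds the same document from a bucket name, Region, and account ID; the function name glue_job_inline_policy and the example values are illustrative assumptions.

```python
import json


def glue_job_inline_policy(bucket, region, account_id,
                           connection_name="postgresql_source"):
    """Build the inline policy for the Glue job role as a Python dict."""
    secret_arn = (
        f"arn:aws:secretsmanager:{region}:{account_id}:secret:"
        f"SageMakerUnifiedStudio-Glue-{connection_name}-*"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Read and write access to the AWS Glue output bucket
                "Effect": "Allow",
                "Action": ["s3:List*", "s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/*", f"arn:aws:s3:::{bucket}"],
            },
            {
                # Read access to the secret that stores the database credentials
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue"],
                "Resource": [secret_arn],
            },
        ],
    }


# Example values are placeholders.
policy = glue_job_inline_policy(
    "amzn-s3-demo-destination-bucket", "us-east-1", "111122223333")
print(json.dumps(policy, indent=4))
```

The resulting JSON string can be pasted into the policy editor, or passed as the PolicyDocument argument of the IAM put_role_policy API if you attach the policy from code.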

Create a SageMaker Lakehouse data connection

Let’s get started with the unified data connection experience. The first step is to create a SageMaker Lakehouse data connection. Complete the following steps:

  1. Sign in to your SageMaker Unified Studio.
  2. Open your project.
  3. On your project, in the navigation pane, choose Data.
  4. Choose the plus sign.
  5. For Add data source, choose Add connection. Choose Next.
  6. Select PostgreSQL, and choose Next.
  7. For Name, enter postgresql_source.
  8. For Host, enter the host name of your Aurora PostgreSQL database cluster.
  9. For Port, enter the port number of your Aurora PostgreSQL database cluster (by default, it’s 5432).
  10. For Database, enter your database name.
  11. For Authentication, select Username and password.
  12. Enter your username and password.
  13. Choose Add data.

When the connection is created, SageMaker Unified Studio creates a new AWS Secrets Manager secret with a name like SageMakerUnifiedStudio-Glue-postgresql_source to securely store the specified username and password. It also creates an AWS Glue connection named postgresql_source.

Now you have a unified connection for Aurora PostgreSQL-Compatible.

Load data into the PostgreSQL database through the notebook

You will use a JupyterLab notebook on SageMaker Unified Studio to load sample data from an S3 bucket into a PostgreSQL database using Apache Spark.

  1. On the top left menu, choose Build, and under IDE & APPLICATIONS, choose JupyterLab.
  2. Choose Python 3 under Notebook.
  3. For the first cell, choose Local Python, python, enter the following code, and run the cell:
    %%configure -f -n project.spark
    {
        "glue_version": "4.0"
    }

  4. For the second cell, choose PySpark, spark, enter the following code, and run the cell:
    # Read sample data from S3 bucket
    df = spark.read.parquet("s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Apparel/")
    
    # Preview the data
    df.show()

The code snippet reads the sample data Parquet files from the specified S3 bucket location and stores the data in a Spark DataFrame named df. The df.show() command displays the first 20 rows of the DataFrame, allowing you to preview the sample data in a tabular format. Next, you will load this sample data into a PostgreSQL database.

  1. For the third cell, choose PySpark, spark, enter the following code, and run the cell (replace <account-id> with your AWS account ID):
    import json
    import boto3
    
    # Replace <account-id> below with your AWS account ID before running this cell
    
    # Get secret (the secret string is a JSON document holding the username and password)
    secretsmanager_client = boto3.client('secretsmanager')
    get_secret_value_response = secretsmanager_client.get_secret_value(
        SecretId='SageMakerUnifiedStudio-Glue-postgresql_source' # replace the secret name if needed
    )
    secret = json.loads(get_secret_value_response["SecretString"])
    
    # Get connection
    glue_client = boto3.client('glue')
    glue_client_response = glue_client.get_connection(
        CatalogId='<account-id>',
        Name='postgresql_source' # replace the connection name if needed
    )
    connection_properties = glue_client_response["Connection"]["ConnectionProperties"]

  2. For the fourth cell, choose PySpark, spark, enter the following code, and run the cell:
    # Load data into the DB
    jdbcurl = "jdbc:postgresql://{}:{}/{}".format(connection_properties["HOST"],connection_properties["PORT"],connection_properties["DATABASE"])
    df.write \
        .format("jdbc") \
        .option("url", jdbcurl) \
        .option("dbtable", "public.unified_connection_test") \
        .option("user", secret["username"]) \
        .option("password", secret["password"]) \
        .save()
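The JDBC URL in the fourth cell is assembled from three connection properties. As a small sketch of that step in isolation, the helper below builds the same URL and fails early with a clear message if a property is missing; the function name build_jdbc_url and the sample host are hypothetical.

```python
def build_jdbc_url(connection_properties):
    """Build a PostgreSQL JDBC URL from AWS Glue connection properties."""
    required = ("HOST", "PORT", "DATABASE")
    missing = [key for key in required if not connection_properties.get(key)]
    if missing:
        raise KeyError(f"Connection is missing properties: {', '.join(missing)}")
    return "jdbc:postgresql://{HOST}:{PORT}/{DATABASE}".format(**connection_properties)


# Sample properties for illustration only; real values come from get_connection.
sample_properties = {"HOST": "db.example.com", "PORT": "5432", "DATABASE": "sales"}
print(build_jdbc_url(sample_properties))
```

Checking the properties up front surfaces a misconfigured connection before Spark attempts the write.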

Now verify that the new table unified_connection_test was created. You can navigate to the project’s Data page to visually confirm that the table exists.

  1. On the top left menu, choose your project name, and under CURRENT PROJECT, choose Data.

Within the Lakehouse section, expand the postgresql_source, then the public schema, and you should find the newly created unified_connection_test table listed there. Next, you will query the data in this table using SageMaker Unified Studio’s SQL query book feature.

Run queries on the connection through the query book using Athena

Now you can run queries using the connection you created. In this section, we demonstrate how to use the query book using Athena. Complete the following steps:

  1. In your project on SageMaker Unified Studio, choose the Lakehouse section, expand the postgresql_source, then the public schema.
  2. On the options menu (three vertical dots) of the table unified_connection_test, choose Query with Athena.

This step will open a new SQL query book. The query statement select * from "postgresql_source"."public"."unified_connection_test" limit 10; is automatically filled.

  1. On the Actions menu, choose Save to Project.
  2. For Querybook title, enter the name of your SQL query book.
  3. Choose Save changes.

This will save the current SQL query book, and the status of the notebook will change from Draft to Saved. If you want to revert a draft notebook to its last published state, choose Revert to published version to roll back to the most recently published version. Now, let’s start running queries on your notebook.

  1. Choose Run all.

When a query finishes, results can be viewed in a few formats. The table view displays query results in a tabular format. You can download the results as JSON or CSV files using the download icon at the bottom of the output cell. Additionally, the notebook provides a chart view to visualize query results as graphs.

The sample data includes a column star_rating representing a 5-star rating for products. Let’s try a quick visualization to analyze the rating distribution.

  1. Choose Add SQL to add a new cell.
  2. Enter the following statement:
    SELECT count(*) as counts, star_rating FROM "postgresql_source"."public"."unified_connection_test"
    GROUP BY star_rating

  3. Choose the run icon of the cell, or you can press Ctrl+Enter or Cmd+Enter to run the query.

This will display the results in the output panel. Now you have learned how the connection works on SageMaker Unified Studio. Next, we show how you can use the connection on the AWS Glue console.

Run Glue ETL jobs on the connection on the AWS Glue console

Next, we create an AWS Glue ETL job that reads table data from the PostgreSQL connection, converts data types, transforms the data into Parquet files, and outputs them to Amazon S3. It also creates a table in the Glue Data Catalog and adds partitions so downstream data engineers can immediately use the table data. Complete the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Under Create job, choose Visual ETL.
  3. At the top of the job, replace “Untitled job” with a name of your choice.
  4. On the Job Details tab, under Basic properties, specify the IAM role that the job will use (GlueJobRole-demo).
  5. For Glue version, choose Glue version 4.0.
  6. Choose Save.
  7. On the Visual tab, choose the plus sign to open the Add nodes menu.
  8. Search for postgresql and add PostgreSQL as Source.
  9. For JDBC source, choose JDBC connection details.
  10. For PostgreSQL connection, choose postgresql_source.
  11. For Table name, enter unified_connection_test.
  12. As a child of this source, search in the Add nodes menu for timestamp and choose To Timestamp.
  13. For Column to convert, choose review_date.
  14. For Column type, choose iso.
  15. On the Visual tab, search in the Add nodes menu for s3 and add Amazon S3 as Target.
  16. For Format, choose Parquet.
  17. For Compression Type, choose Snappy.
  18. For S3 Target Location, enter your S3 output location (s3://amzn-s3-demo-destination-bucket).
  19. For Data Catalog update options, choose Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  20. For Database, enter your Data Catalog database (<your_database>).
  21. For Table name, enter connection_demo_tbl.
  22. Under Partition keys, choose Add a partition key, and choose review_year.
  23. Choose Save, then choose Run to run the job.

When the job is complete, it will output Parquet files to Amazon S3 and create a table named connection_demo_tbl in the Data Catalog. You have now learned that you can use the SageMaker Lakehouse data connection not only in SageMaker Unified Studio, but also directly in the AWS Glue console, without needing to create separate individual connections.

Clean up

Now to the final step, cleaning up the resources. Complete the following steps:

  1. Delete the connection.
  2. Delete the Glue job.
  3. Delete the AWS Glue output S3 buckets.
  4. Delete the IAM role GlueJobRole-demo.
  5. Delete the Aurora PostgreSQL cluster.

Conclusion

This post demonstrated how the SageMaker Lakehouse unified data connectivity works end to end, and how you can use the unified connection across different services such as AWS Glue and Athena. This new capability can simplify your data journey.

To learn more, refer to Amazon SageMaker Unified Studio.


About the Authors

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Shubham Agrawal is a Software Development Engineer on the AWS Glue team. He has expertise in designing scalable, high-performance systems for handling large-scale, real-time data processing. Driven by a passion for solving complex engineering problems, he focuses on building seamless integration solutions that enable organizations to maximize the value of their data.

Joju Eruppanal is a Software Development Manager on the AWS Glue team. He strives to delight customers by helping his team build software. He loves exploring different cultures and cuisines.

Julie Zhao is a Senior Product Manager at AWS Glue. She joined AWS in 2021 and brings three years of startup experience leading products in IoT data platforms. Prior to startups, she spent over 10 years in networking with Cisco and Juniper across engineering and product. She is passionate about building products to solve customer problems.

Building end-to-end data lineage for one-time and complex queries using Amazon Athena, Amazon Redshift, Amazon Neptune and dbt

Post Syndicated from Nancy Wu original https://aws.amazon.com/blogs/big-data/building-end-to-end-data-lineage-for-one-time-and-complex-queries-using-amazon-athena-amazon-redshift-amazon-neptune-and-dbt/

One-time and complex queries are two common scenarios in enterprise data analytics. One-time queries are flexible and suitable for instant analysis and exploratory research. Complex queries, on the other hand, refer to large-scale data processing and in-depth analysis based on petabyte-level data warehouses in massive data scenarios. These complex queries typically involve data sources from multiple business systems, requiring multilevel nested SQL or associations with numerous tables for highly sophisticated analytical tasks.

However, combining the data lineage of these two query types presents several challenges:

  1. Diversity of data sources
  2. Varying query complexity
  3. Inconsistent granularity in lineage tracking
  4. Different real-time requirements
  5. Difficulties in cross-system integration

Moreover, maintaining the accuracy and completeness of lineage information while preserving system performance and scalability is a crucial consideration. Addressing these challenges requires a carefully designed architecture and advanced technical solutions.

Amazon Athena offers serverless, flexible SQL analytics for one-time queries, enabling direct querying of Amazon Simple Storage Service (Amazon S3) data for rapid, cost-effective instant analysis. Amazon Redshift, optimized for complex queries, provides high-performance columnar storage and massively parallel processing (MPP) architecture, supporting large-scale data processing and advanced SQL capabilities. Amazon Neptune, as a graph database, is ideal for data lineage analysis, offering efficient relationship traversal and complex graph algorithms to handle large-scale, intricate data lineage relationships. The combination of these three services provides a powerful, comprehensive solution for end-to-end data lineage analysis.

In the context of comprehensive data governance, Amazon DataZone offers organization-wide data lineage visualization using Amazon Web Services (AWS) services, while dbt provides project-level lineage through model analysis and supports cross-project integration between data lakes and warehouses.

In this post, we use dbt for data modeling on both Amazon Athena and Amazon Redshift. dbt on Athena supports real-time queries, while dbt on Amazon Redshift handles complex queries, unifying the development language and significantly reducing the technical learning curve. Using a single dbt modeling language not only simplifies the development process but also automatically generates consistent data lineage information. This approach offers robust adaptability, easily accommodating changes in data structures.

By integrating Amazon Neptune graph database to store and analyze complex lineage relationships, combined with AWS Step Functions and AWS Lambda functions, we achieve a fully automated data lineage generation process. This combination promotes consistency and completeness of lineage data while enhancing the efficiency and scalability of the entire process. The result is a powerful and flexible solution for end-to-end data lineage analysis.

Architecture overview

The experiment’s context involves a customer already using Amazon Athena for one-time queries. To better accommodate massive data processing and complex query scenarios, they aim to adopt a unified data modeling language across different data platforms. This led to the implementation of both Athena on dbt and Amazon Redshift on dbt architectures.

AWS Glue crawler crawls data lake information from Amazon S3, generating a Data Catalog to support dbt on Amazon Athena data modeling. For complex query scenarios, AWS Glue performs extract, transform, and load (ETL) processing, loading data into the petabyte-scale data warehouse, Amazon Redshift. Here, data modeling uses dbt on Amazon Redshift.

Lineage data original files from both parts are loaded into an S3 bucket, providing data support for end-to-end data lineage analysis.

The following image is the architecture diagram for the solution.

Figure 1-Architecture diagram of DBT modeling based on Athena and Redshift

Some important considerations:

This experiment uses the following data dictionary:

Source table Tool Target table
imdb.name_basics DBT/Athena stg_imdb__name_basics
imdb.title_akas DBT/Athena stg_imdb__title_akas
imdb.title_basics DBT/Athena stg_imdb__title_basics
imdb.title_crew DBT/Athena stg_imdb__title_crews
imdb.title_episode DBT/Athena stg_imdb__title_episodes
imdb.title_principals DBT/Athena stg_imdb__title_principals
imdb.title_ratings DBT/Athena stg_imdb__title_ratings
stg_imdb__name_basics DBT/Redshift new_stg_imdb__name_basics
stg_imdb__title_akas DBT/Redshift new_stg_imdb__title_akas
stg_imdb__title_basics DBT/Redshift new_stg_imdb__title_basics
stg_imdb__title_crews DBT/Redshift new_stg_imdb__title_crews
stg_imdb__title_episodes DBT/Redshift new_stg_imdb__title_episodes
stg_imdb__title_principals DBT/Redshift new_stg_imdb__title_principals
stg_imdb__title_ratings DBT/Redshift new_stg_imdb__title_ratings
new_stg_imdb__name_basics DBT/Redshift int_primary_profession_flattened_from_name_basics
new_stg_imdb__name_basics DBT/Redshift int_known_for_titles_flattened_from_name_basics
new_stg_imdb__name_basics DBT/Redshift names
new_stg_imdb__title_akas DBT/Redshift titles
new_stg_imdb__title_basics DBT/Redshift int_genres_flattened_from_title_basics
new_stg_imdb__title_basics DBT/Redshift titles
new_stg_imdb__title_crews DBT/Redshift int_directors_flattened_from_title_crews
new_stg_imdb__title_crews DBT/Redshift int_writers_flattened_from_title_crews
new_stg_imdb__title_episodes DBT/Redshift titles
new_stg_imdb__title_principals DBT/Redshift titles
new_stg_imdb__title_ratings DBT/Redshift titles
int_known_for_titles_flattened_from_name_basics DBT/Redshift titles
int_primary_profession_flattened_from_name_basics DBT/Redshift
int_directors_flattened_from_title_crews DBT/Redshift names
int_genres_flattened_from_title_basics DBT/Redshift genre_titles
int_writers_flattened_from_title_crews DBT/Redshift names
genre_titles DBT/Redshift
names DBT/Redshift
titles DBT/Redshift

The lineage data generated by dbt on Athena includes partial lineage diagrams, as exemplified in the following images. The first image shows the lineage of name_basics in dbt on Athena. The second image shows the lineage of title_crew in dbt on Athena.

Figure 3-Lineage of name_basics in DBT on Athena

Figure 4-Lineage of title_crew in DBT on Athena

The lineage data generated by dbt on Amazon Redshift includes partial lineage diagrams, as illustrated in the following image.

Figure 5-Lineage of name_basics and title_crew in DBT on Redshift

Referring to the data dictionary and screenshots, it’s evident that the complete data lineage information is highly dispersed, spread across 29 lineage diagrams. Understanding the end-to-end comprehensive view requires significant time. In real-world environments, the situation is often more complex, with complete data lineage potentially distributed across hundreds of files. Consequently, integrating a complete end-to-end data lineage diagram becomes crucial and challenging.

This experiment will provide a detailed introduction to processing and merging data lineage files stored in Amazon S3, as illustrated in the following diagram.

Figure 6-Merging data lineage from Athena and Redshift into Neptune

Prerequisites

To perform the solution, you need to have the following prerequisites in place:

  • The Lambda function for preprocessing lineage files must have permissions to access Amazon S3 and Amazon Redshift.
  • The Lambda function for constructing the directed acyclic graph (DAG) must have permissions to access Amazon S3 and Amazon Neptune.

Solution walkthrough

To perform the solution, follow the steps in the next sections.

Preprocess raw lineage data for DAG generation using Lambda functions

Use Lambda to preprocess the raw lineage data generated by dbt, converting it into key-value pair JSON files that are easily understood by Neptune: athena_dbt_lineage_map.json and redshift_dbt_lineage_map.json.

  1. To create a new Lambda function in the Lambda console, enter a Function name, select the Runtime (Python in this example), configure the Architecture and Execution role, then click the “Create function” button.

Figure 7-Basic configuration of athena-data-lineage-process Lambda

  1. Open the created Lambda function and on the Configuration tab, in the navigation pane, select Environment variables and choose your configurations. Using Athena on dbt processing as an example, configure the environment variables as follows (the process for Amazon Redshift on dbt is similar):
    • INPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path storing the original Athena on dbt lineage files)
    • INPUT_KEY: athena_manifest.json (the original Athena on dbt lineage file)
    • OUTPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path for storing the preprocessed output of Athena on dbt lineage files)
    • OUTPUT_KEY: athena_dbt_lineage_map.json (the output file after preprocessing the original Athena on dbt lineage file)

Figure 8-Environment variable configuration for athena-data-lineage-process-Lambda

  1. On the Code tab, in the lambda_function.py file, enter the preprocessing code for the raw lineage data. Here’s a code reference using Athena on dbt processing as an example (the process for Amazon Redshift on dbt is similar). The preprocessing code for Athena on dbt’s original lineage file is as follows:

The athena_manifest.json, redshift_manifest.json, and other files used in this experiment can be obtained from the Data Lineage Graph Construction GitHub repository.

import json
import boto3
import os

def lambda_handler(event, context):
    # Set up S3 client
    s3 = boto3.client('s3')

    # Get input and output paths from environment variables
    input_bucket = os.environ['INPUT_BUCKET']
    input_key = os.environ['INPUT_KEY']
    output_bucket = os.environ['OUTPUT_BUCKET']
    output_key = os.environ['OUTPUT_KEY']

    # Define helper function
    def dbt_nodename_format(node_name):
        return node_name.split(".")[-1]

    # Read input JSON file from S3
    response = s3.get_object(Bucket=input_bucket, Key=input_key)
    file_content = response['Body'].read().decode('utf-8')
    data = json.loads(file_content)
    lineage_map = data["child_map"]
    node_dict = {}
    dbt_lineage_map = {}

    # Process data
    for item in lineage_map:
        lineage_map[item] = [dbt_nodename_format(child) for child in lineage_map[item]]
        node_dict[item] = dbt_nodename_format(item)

    # Update key names
    lineage_map = {node_dict[old]: value for old, value in lineage_map.items()}
    dbt_lineage_map["lineage_map"] = lineage_map

    # Convert result to JSON string
    result_json = json.dumps(dbt_lineage_map)

    # Write JSON string to S3
    s3.put_object(Body=result_json, Bucket=output_bucket, Key=output_key)
    print(f"Data written to s3://{output_bucket}/{output_key}")

    return {
        'statusCode': 200,
        'body': json.dumps('Athena data lineage processing completed successfully')
    }
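To make the transformation concrete, the following sketch applies the same key and value shortening to a small hand-written child_map, without any S3 access. The project name demo and the unique IDs in the sample are illustrative assumptions, not taken from the experiment's manifest files.

```python
import json


def short_name(unique_id):
    """Keep only the last dot-separated part of a dbt unique ID."""
    return unique_id.split(".")[-1]


def to_lineage_map(manifest):
    """Turn a manifest's child_map into {parent: [children]} keyed by short names."""
    return {
        "lineage_map": {
            short_name(parent): [short_name(child) for child in children]
            for parent, children in manifest["child_map"].items()
        }
    }


# Hand-written sample; the project name "demo" and the IDs are illustrative only.
sample_manifest = {
    "child_map": {
        "source.demo.imdb.name_basics": ["model.demo.stg_imdb__name_basics"],
        "model.demo.stg_imdb__name_basics": [],
    }
}

print(json.dumps(to_lineage_map(sample_manifest), indent=2))
```

Because only the last segment of each ID is kept, two nodes whose IDs end in the same name (for example, a source and a model with identical names) would collapse into one key, so it is worth checking your manifest for such cases.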

Merge preprocessed lineage data and write to Neptune using Lambda functions

  1. Before processing data with the Lambda function, create a Lambda layer by uploading the required Gremlin plugin. For detailed steps on creating and configuring Lambda Layers, see the AWS Lambda Layers documentation.

Because connecting Lambda to Neptune for constructing a DAG requires the Gremlin plugin, it needs to be uploaded before using Lambda. The Gremlin package can be obtained from the Data Lineage Graph Construction GitHub repository.

Figure 9-Lambda layers

  1. Create a new Lambda function and choose it to configure. To add the recently created layer, at the bottom of the page, choose Add a layer.

Figure 10-Add a layer

Create another Lambda layer for the requests library, similar to how you created the layer for the Gremlin plugin. This library will be used for HTTP client functionality in the Lambda function.

  1. Choose the recently created Lambda function to configure. Connect to Neptune through Lambda to merge the two datasets and construct a DAG. On the Code tab, the reference code to execute is as follows:
import json
import boto3
import os
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import get_credentials
from botocore.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed

def read_s3_file(s3_client, bucket, key):
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read().decode('utf-8'))
        return data.get("lineage_map", {})
    except Exception as e:
        print(f"Error reading S3 file {bucket}/{key}: {str(e)}")
        raise

def merge_data(athena_data, redshift_data):
    return {**athena_data, **redshift_data}

def sign_request(request):
    credentials = get_credentials(Session())
    auth = SigV4Auth(credentials, 'neptune-db', os.environ['AWS_REGION'])
    auth.add_auth(request)
    return dict(request.headers)

def send_request(url, headers, data):
    try:
        response = requests.post(url, headers=headers, data=data, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Request Error: {str(e)}")
        if hasattr(e.response, 'text'):
            print(f"Response content: {e.response.text}")
        raise

def write_to_neptune(data):
    endpoint = 'https://<your-neptune-endpoint>:8182/gremlin'
    # Replace <your-neptune-endpoint> with your Neptune endpoint name

    # Clear Neptune database
    clear_query = "g.V().drop()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': clear_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': clear_query}))
    print(f"Clear database response: {response}")

    # Verify if the database is empty
    verify_query = "g.V().count()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': verify_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': verify_query}))
    print(f"Vertex count after clearing: {response}")
    
    def process_node(node, children):
        # Add node
        query = f"g.V().has('lineage_node', 'node_name', '{node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{node}'))"
        request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
        signed_headers = sign_request(request)
        response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
        print(f"Add node response for {node}: {response}")

        for child_node in children:
            # Add child node
            query = f"g.V().has('lineage_node', 'node_name', '{child_node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{child_node}'))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add child node response for {child_node}: {response}")

            # Add edge
            query = f"g.V().has('lineage_node', 'node_name', '{node}').as('a').V().has('lineage_node', 'node_name', '{child_node}').coalesce(inE('lineage_edge').where(outV().as('a')), addE('lineage_edge').from('a').property('edge_name', ' '))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add edge response for {node} -> {child_node}: {response}")

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_node, node, children) for node, children in data.items()]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error in processing node: {str(e)}")

def lambda_handler(event, context):
    # Initialize S3 client
    s3_client = boto3.client('s3')

    # S3 bucket and file paths
    bucket_name = 'data-lineage-analysis' # Replace with your S3 bucket name
    athena_key = 'athena_dbt_lineage_map.json' # Replace with your athena lineage key value output json name
    redshift_key = 'redshift_dbt_lineage_map.json' # Replace with your redshift lineage key value output json name

    try:
        # Read Athena lineage data
        athena_data = read_s3_file(s3_client, bucket_name, athena_key)
        print(f"Athena data size: {len(athena_data)}")

        # Read Redshift lineage data
        redshift_data = read_s3_file(s3_client, bucket_name, redshift_key)
        print(f"Redshift data size: {len(redshift_data)}")

        # Merge data
        combined_data = merge_data(athena_data, redshift_data)
        print(f"Combined data size: {len(combined_data)}")

        # Write to Neptune (including clearing the database)
        write_to_neptune(combined_data)

        return {
            'statusCode': 200,
            'body': json.dumps('Data successfully written to Neptune')
        }
    except Exception as e:
        print(f"Error in lambda_handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }
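One behavior of merge_data is worth noting: dictionary unpacking keeps only the Redshift child list when the same node name appears in both maps, so any Athena children of that node are dropped. If your dbt projects share node names, a union of the child lists may be closer to what you want. The following is a minimal sketch of that alternative, not part of the original solution. The merge_lineage_maps name and the sample data are assumptions made for illustration.

```python
def merge_lineage_maps(*maps: dict) -> dict:
    """Union the child lists per node, keeping first-seen order and dropping duplicates."""
    merged: dict = {}
    for lineage_map in maps:
        for node, children in lineage_map.items():
            existing = merged.setdefault(node, [])
            for child in children:
                if child not in existing:
                    existing.append(child)
    return merged


# Invented sample maps in which "stg_titles" appears on both sides.
athena = {"titles": ["stg_titles"], "stg_titles": ["title_crew"]}
redshift = {"stg_titles": ["genre_titles"], "names": ["title_crew"]}

overwritten = {**athena, **redshift}
unioned = merge_lineage_maps(athena, redshift)

print(overwritten["stg_titles"])  # ['genre_titles']
print(unioned["stg_titles"])      # ['title_crew', 'genre_titles']
```

If node names never overlap between the two projects, both approaches return the same result and the simpler merge_data is sufficient.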

Create Step Functions workflow

  1. On the Step Functions console, choose State machines, and then choose Create state machine. On the Choose a template page, select Blank template.

Figure 11-Step Functions blank template

  2. In the Blank template, choose Code to define your state machine. Use the following example code, replacing the three FunctionName values with the names of your own Athena lineage processing, Redshift lineage processing, and Neptune loading Lambda functions:
{
  "Comment": "Daily Data Lineage Processing Workflow",
  "StartAt": "Parallel Processing",
  "States": {
    "Parallel Processing": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Process Athena Data",
          "States": {
            "Process Athena Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "athena-data-lineange-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Process Redshift Data",
          "States": {
            "Process Redshift Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "redshift-data-lineange-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Load Data to Neptune"
    },
    "Load Data to Neptune": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "data-lineage-analysis-lambda"
      },
      "End": true
    }
  }
}
  3. After completing the configuration, choose the Design tab to view the workflow shown in the following diagram.

Figure 12-Step Functions design view
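Because the Lambda function names differ from account to account, it can be convenient to generate the state machine definition instead of editing it by hand. The following sketch builds the same structure as the definition above from three function names and serializes it to JSON. The helper names and the sample function names are placeholders chosen for illustration.

```python
import json


def lambda_task(function_name: str, pass_input: bool = True) -> dict:
    """Build a terminal Task state that invokes a Lambda function."""
    parameters = {"FunctionName": function_name}
    if pass_input:
        # Forward the whole state input to the function, as in the definition above.
        parameters["Payload"] = {"input.$": "$"}
    return {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": parameters,
        "End": True,
    }


def build_definition(athena_fn: str, redshift_fn: str, loader_fn: str) -> dict:
    """Two parallel preprocessing branches followed by the Neptune loading task."""
    branches = [
        {"StartAt": state_name, "States": {state_name: lambda_task(function_name)}}
        for state_name, function_name in (
            ("Process Athena Data", athena_fn),
            ("Process Redshift Data", redshift_fn),
        )
    ]
    return {
        "Comment": "Daily Data Lineage Processing Workflow",
        "StartAt": "Parallel Processing",
        "States": {
            "Parallel Processing": {
                "Type": "Parallel",
                "Branches": branches,
                "Next": "Load Data to Neptune",
            },
            "Load Data to Neptune": lambda_task(loader_fn, pass_input=False),
        },
    }


# Placeholder function names; substitute your own.
definition = build_definition(
    "my-athena-lineage-fn", "my-redshift-lineage-fn", "my-neptune-loader-fn"
)
definition_json = json.dumps(definition, indent=2)
print(definition_json)
```

You can paste the printed JSON into the Code view of the Step Functions console.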

Create scheduling rules with Amazon EventBridge

Configure Amazon EventBridge to generate lineage data daily during off-peak business hours. To do this:

  1. Create a new rule in the EventBridge console with a descriptive name.
  2. Set the rule type to “Schedule” and configure it to run once daily (using either a fixed rate or the Cron expression “0 0 * * ? *”).
  3. Select the AWS Step Functions state machine as the target and specify the state machine you created earlier.
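The console steps above can also be scripted. The following is a minimal sketch that assumes you pass in a boto3 EventBridge client and already have an IAM role that EventBridge can assume to start the state machine. The function name, the target ID, and the ARNs in the usage comment are placeholders. EventBridge evaluates cron expressions in UTC, so adjust the hour to match your off-peak window.

```python
def schedule_state_machine(events_client, rule_name: str, state_machine_arn: str,
                           role_arn: str, schedule: str = "cron(0 0 * * ? *)") -> str:
    """Create (or update) a scheduled rule and point it at the state machine."""
    response = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule,  # default: once a day at 00:00 UTC
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[
            {
                "Id": "lineage-state-machine",  # placeholder target ID
                "Arn": state_machine_arn,
                "RoleArn": role_arn,  # role EventBridge assumes to start the execution
            }
        ],
    )
    return response["RuleArn"]


# Usage (requires boto3 and AWS credentials; ARNs are placeholders):
#   import boto3
#   schedule_state_machine(
#       boto3.client("events"),
#       "daily-data-lineage",
#       "arn:aws:states:<region>:<account-id>:stateMachine:<name>",
#       "arn:aws:iam::<account-id>:role/<eventbridge-invoke-role>",
#   )
```

Taking the client as a parameter keeps the function easy to exercise with a stub before you run it against a real account.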

Query results in Neptune

  1. On the Neptune console, select Notebooks. Open an existing notebook or create a new one.

Figure 13-Neptune notebook

  2. In the notebook, create a new code cell to perform a query. The following code example shows the query statement and its results:
%%gremlin -d node_name -de edge_name
g.V().hasLabel('lineage_node').outE('lineage_edge').inV().hasLabel('lineage_node').path().by(elementMap())

You can now see the end-to-end data lineage graph information for both dbt on Athena and dbt on Amazon Redshift. The following image shows the merged DAG data lineage graph in Neptune.

Figure 14-Merged DAG data lineage graph in Neptune

You can query the generated data lineage graph for data related to a specific table, such as title_crew.

The sample query statement and its results are shown in the following code example:

%%gremlin -d node_name -de edge_name
g.V().has('lineage_node', 'node_name', 'title_crew')
  .repeat(
    union(
      __.inE('lineage_edge').outV(),
      __.outE('lineage_edge').inV()
    )
  )
  .until(
    __.has('node_name', within('names', 'genre_titles', 'titles'))
    .or()
    .loops().is(gt(10))
  )
  .path()
  .by(elementMap())

The following image shows the filtered results based on title_crew table in Neptune.

Figure 15-Filtered results based on title_crew table in Neptune
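To reason about what such a traversal returns without a Neptune cluster, you can run a similar walk over the merged lineage_map in plain Python. The following sketch is a simplification of the Gremlin query above: it follows edges in both directions and stops after a maximum number of hops, but it returns hop counts rather than paths and does not stop early at specific tables. The related_nodes name and the sample map are assumptions made for illustration.

```python
from collections import deque


def related_nodes(lineage_map: dict, start: str, max_hops: int = 10) -> dict:
    """Return {node: hops} for nodes reachable from start, ignoring edge direction."""
    # Build an undirected adjacency view of the parent -> children map.
    neighbors: dict = {}
    for parent, children in lineage_map.items():
        neighbors.setdefault(parent, set())
        for child in children:
            neighbors[parent].add(child)
            neighbors.setdefault(child, set()).add(parent)

    hops = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if hops[node] == max_hops:
            continue  # do not expand beyond the hop limit
        for neighbor in sorted(neighbors.get(node, ())):
            if neighbor not in hops:
                hops[neighbor] = hops[node] + 1
                queue.append(neighbor)
    return hops


# Invented sample in the shape of the merged lineage_map.
sample = {
    "titles": ["genre_titles", "title_crew"],
    "names": ["title_crew"],
    "title_crew": ["crew_report"],
}
result = related_nodes(sample, "title_crew")
print(result)
```

In this sample, the direct parents and the direct child of title_crew are one hop away, and genre_titles is two hops away through titles.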

Clean up

To clean up your resources, complete the following steps:

  1. Delete EventBridge rules
# Stop new events from triggering while removing dependencies
aws events disable-rule --name <rule-name>
# Break the connection between the rule and its targets (here, the Step Functions state machine)
aws events remove-targets --rule <rule-name> --ids <target-id>
# Remove the rule completely from EventBridge
aws events delete-rule --name <rule-name>
  2. Delete Step Functions state machine
# Stop a running execution (repeat for each execution that is still running)
aws stepfunctions stop-execution --execution-arn <execution-arn>
# Delete the state machine
aws stepfunctions delete-state-machine --state-machine-arn <state-machine-arn>
  3. Delete Lambda functions
# Delete a Lambda function (repeat for each function)
aws lambda delete-function --function-name <function-name>
# Delete Lambda layers (if used)
aws lambda delete-layer-version --layer-name <layer-name> --version-number <version>
  4. Clean up the Neptune database
# Delete a snapshot (repeat for each snapshot)
aws neptune delete-db-cluster-snapshot --db-cluster-snapshot-identifier <snapshot-id>
# Delete database instance
aws neptune delete-db-instance --db-instance-identifier <instance-id> --skip-final-snapshot
# Delete database cluster
aws neptune delete-db-cluster --db-cluster-identifier <cluster-id> --skip-final-snapshot
  5. Follow the instructions at Deleting a single object to clean up the S3 buckets
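For the Step Functions step, the stop-execution command acts on one execution ARN at a time. If several executions are still running, a short script can list and stop them before you delete the state machine. The following is a minimal sketch that assumes you pass in a boto3 Step Functions client. The function name and the cause text are placeholders.

```python
def stop_running_executions(sfn_client, state_machine_arn: str) -> list:
    """Stop every RUNNING execution of the state machine and return the stopped ARNs."""
    stopped = []
    paginator = sfn_client.get_paginator("list_executions")
    pages = paginator.paginate(stateMachineArn=state_machine_arn, statusFilter="RUNNING")
    for page in pages:
        for execution in page["executions"]:
            sfn_client.stop_execution(
                executionArn=execution["executionArn"],
                cause="Cleaning up the data lineage workflow",
            )
            stopped.append(execution["executionArn"])
    return stopped


# Usage (requires boto3 and AWS credentials; the ARN is a placeholder):
#   import boto3
#   stop_running_executions(
#       boto3.client("stepfunctions"),
#       "arn:aws:states:<region>:<account-id>:stateMachine:<name>",
#   )
```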

Conclusion

In this post, we demonstrated how dbt enables unified data modeling across Amazon Athena and Amazon Redshift, integrating data lineage from both one-time and complex queries. By using Amazon Neptune, this solution provides comprehensive end-to-end lineage analysis. The architecture uses AWS serverless computing and managed services, including Step Functions, Lambda, and EventBridge, providing a highly flexible and scalable design.

This approach significantly lowers the learning curve through a unified data modeling method while enhancing development efficiency. The end-to-end data lineage graph visualization and analysis not only strengthen data governance capabilities but also offer deep insights for decision-making.

The solution’s flexible and scalable architecture effectively optimizes operational costs and improves business responsiveness. This comprehensive approach balances technical innovation, data governance, operational efficiency, and cost-effectiveness, thus supporting long-term business growth with the adaptability to meet evolving enterprise needs.

With OpenLineage-compatible data lineage now generally available in Amazon DataZone, we plan to explore integration possibilities to further enhance the system’s capability to handle complex data lineage analysis scenarios.

If you have any questions, please feel free to leave a comment in the comments section.


About the authors

Nancy Wu is a Solutions Architect at AWS, responsible for cloud computing architecture consulting and design for multinational enterprise customers. Nancy has many years of experience in big data, enterprise digital transformation research and development, consulting, and project management across the telecommunications, entertainment, and financial industries.

Xu Feng is a Senior Industry Solution Architect at AWS, responsible for designing, building, and promoting industry solutions for the Media & Entertainment and Advertising sectors, such as intelligent customer service and business intelligence. With 20 years of software industry experience, Xu Feng currently focuses on researching and implementing generative AI and AI-powered data solutions.

Xu Da is an Amazon Web Services (AWS) Partner Solutions Architect based in Shanghai, China. He has more than 25 years of experience in the IT industry, software development, and solution architecture. He is passionate about collaborative learning, knowledge sharing, and guiding the community in its cloud technology journey.

Catalog and govern Amazon Athena federated queries with Amazon SageMaker Lakehouse

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/catalog-and-govern-amazon-athena-federated-queries-with-amazon-sagemaker-lakehouse/

Yesterday, we announced Amazon SageMaker Unified Studio (Preview), an integrated experience for all your data and AI, and Amazon SageMaker Lakehouse, which unifies data from Amazon Simple Storage Service (Amazon S3) to third-party sources such as Snowflake. We’re excited by how SageMaker Lakehouse helps break down data silos, but we also know customers don’t want to compromise on data governance or introduce security and compliance risks as they expand data access.

With this new capability, data analysts can now securely access and query data stored outside S3 data lakes, including Amazon Redshift data warehouses and Amazon DynamoDB databases, all through a single, unified experience. Administrators can now apply access controls at different levels of granularity to ensure sensitive data remains protected while expanding data access. This allows organizations to accelerate data initiatives while maintaining security and compliance, leading to faster, data-driven decision-making.

In this post, we show how to connect to, govern, and run federated queries on data stored in Redshift, DynamoDB (Preview), and Snowflake (Preview). To query our data, we use Athena, which is seamlessly integrated with SageMaker Unified Studio. We use SageMaker Lakehouse to present data to end-users as federated catalogs, a new type of catalog object. Finally, we demonstrate how to use column-level security permissions in AWS Lake Formation to give analysts access to the data they need while restricting access to sensitive information.

Background

As data volumes grow, organizations often employ specialized storage systems to achieve optimal performance and cost-efficiency with different use cases. However, this approach can result in data silos, and makes it challenging to gain insights from data for several reasons. First, end-users often have to set up connections to data sources on their own. This is challenging because of configuration details that vary by source and technical connectivity properties they may not have access to. Second, data sources often have their own built-in access controls, which fragments data governance. Lastly, copying data from one storage system to another for the purposes of analysis adds cost and creates duplication risks.

SageMaker Lakehouse streamlines connecting to, cataloging, and managing permissions on data from multiple sources. It integrates with SageMaker Unified Studio, Athena, and other popular tools to give flexibility to end-users to work with data from their preferred tools.

As you create connections to data, SageMaker Lakehouse creates the underlying catalogs, databases, and tables, and integrates these resources with Lake Formation. Administrators can then define and centrally manage fine-grained access controls on these resources, without having to learn different access management concepts for each data source.

With the right access permissions in place, data discovery and analytics workflows are streamlined. Data analysts no longer need to connect to data sources on their own, saving time and frustration from setting up connectors with configurations that vary by source. Instead, analysts can simply run SQL queries on federated data catalogs, seamlessly accessing diverse data for various needs, which accelerates insights and enhances productivity.

Solution overview

This post presents a solution where a company is using multiple data sources containing customer data. Analysts want to query this data for analytics and AI and machine learning (ML) workloads. However, regulations require personally identifiable information (PII) data to be secured. The following diagram illustrates the solution architecture.

In our use case, an administrator is responsible for data governance and has administrator-level access to data sources, including Redshift, DynamoDB, and Snowflake. Existing regulations require administrators to safeguard sensitive PII data, such as customer mobile phone numbers, which is stored in multiple places. At the same time, business stakeholders in data analyst job functions need access to these databases because they contain valuable data for gaining insight into business health.

We will use an administrator account to create connections to Redshift, DynamoDB, and Snowflake, register these as catalogs in SageMaker Lakehouse, and then set up fine-grained access controls using Lake Formation. When complete, we use a data analyst account to query the data with Athena and confirm that the analyst role cannot access data it is not entitled to.

Prerequisites

Make sure you have the following prerequisites:

  • An AWS account with permission to create IAM roles and IAM policies
  • An AWS Identity and Access Management (IAM) user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI)
  • Administrator access to SageMaker Lakehouse and the following roles:
  • A SageMaker Unified Studio domain and two projects using the SQL Analytics profile. To learn more, refer to the Amazon SageMaker Unified Studio Administrator Guide.
    • An Admin project will be used to create connections
    • A Data Analyst project will be used to analyze data and will include both administrator and analysts as members. Take note of the IAM role in the Data Analyst project from the Project Overview page. This IAM role will be referenced when granting access later on.
  • Administrator access to one or more of the following data sources, with the data sources set up as shown in Appendix A and Appendix B:
    • Redshift
    • DynamoDB
    • Snowflake

Set up federated catalogs

The first step is to set up federated catalogs for our data sources using an administrator account. The section below walks you through the end-to-end process with DynamoDB and demonstrates how to query the data when setup is complete. When you are done setting up and exploring the DynamoDB data, repeat these steps for Redshift and Snowflake.

  1. On the SageMaker Unified Studio console, open your project.
  2. Choose Data in the navigation pane.
  3. In the data explorer, choose the plus icon to add a data source.
  4. Under Add a data source, choose Add connection, then choose Amazon DynamoDB.
  5. Enter your connection details, and choose Add data source.

Next, SageMaker Unified Studio connects to your data source, registers the data source as a federated catalog with SageMaker Lakehouse, and displays it in your data explorer.

To explore and query your data, click any SageMaker Lakehouse catalog to view its contents. Use the data explorer to drill down to a table and use the Actions menu to select Query with Athena.

This brings you to the query editor where your sample query is executed. Here, try different SQL statements to better understand your data and to gain familiarity with query development features in SageMaker Unified Studio. To learn more, see SQL analytics in the Amazon SageMaker Unified Studio User Guide.

Similarly, you can set up data source connections for Redshift and Snowflake and query the data. Refer to Appendix B, which contains screenshots capturing the details needed to create the connection and data catalog for the Redshift and Snowflake sources.

Set up fine-grained access permissions on federated catalogs

Our next step is to set up access permissions on our federated catalogs. As mentioned in the prerequisites, you have already set up an IAM role with data analyst permissions and a SageMaker Unified Studio data analyst project. We will grant permissions to the data analyst role and the SageMaker Unified Studio data analyst project role to ensure that the access controls you specify are enforced when the data is queried. The following steps show how to set up permissions on a Redshift federated catalog, but the steps are the same for each data source.

  1. Navigate to Lake Formation in the AWS Management Console as an administrator.
  2. In the Lake Formation console, under Data Catalog in the navigation pane, choose Catalogs. Here, you will see the federated catalogs that were set up previously in SageMaker Unified Studio.
  3. Choose the federated catalog that you wish to set up permissions for. Here, you can see details for the catalog and any associated databases and tables, and manage permissions.
  4. From the Actions menu, choose Grant to grant permissions to the data analyst role and the SageMaker Unified Studio data analyst project role.
  5. In Catalogs, choose the federated catalog name for the source you wish to grant permissions on.
  6. In Databases, choose your Redshift schema, Snowflake schema, or default for DynamoDB.
  7. In Database permissions, select Describe.
  8. Choose Grant.

The next step is to grant permissions on the tables to the data analyst role and the SageMaker Unified Studio data analyst project role. For this solution, assume you wish to restrict access to a sensitive column containing the mobile phone number for each customer.

  1. In the Actions menu, choose Grant.
  2. In Catalogs, choose your federated catalog.
  3. In Databases, choose your Redshift schema, Snowflake schema, or default for DynamoDB.
  4. In Tables, choose your tables.
  5. In Table permissions, choose Select.
  6. In Data permissions, choose Column-based access.
  7. In Choose permission filter, choose Include columns.
  8. In Select columns, choose one or more columns.
  9. Choose Grant.

You have successfully set up fine-grained access permissions on your Redshift federated catalog. Repeat these steps to add permissions on your DynamoDB and Snowflake federated catalogs.
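If you need to repeat these grants across several catalogs, the same column-level permission can be expressed as a request for the Lake Formation GrantPermissions API. The following sketch only builds the request. It assumes the sample table and column names from Appendix A, and the principal ARN, the catalog identifier, and its format for a federated catalog are placeholders that you should verify against your own environment.

```python
def column_grant_request(principal_arn: str, catalog_id: str, database: str,
                         table: str, columns: list) -> dict:
    """Build keyword arguments for a SELECT grant limited to the listed columns."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "TableWithColumns": {
                "CatalogId": catalog_id,
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),  # included columns only
            }
        },
        "Permissions": ["SELECT"],
    }


# Placeholder identifiers; the mobile column is deliberately left out.
request = column_grant_request(
    principal_arn="arn:aws:iam::123456789012:role/data-analyst-project-role",
    catalog_id="123456789012:example_redshift_catalog/dev",  # assumed format
    database="public",
    table="customer_rs",
    columns=["cust_id", "zipcode"],
)
print(request)

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("lakeformation").grant_permissions(**request)
```

Listing only the permitted columns mirrors the Include columns filter chosen in the console steps.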

Validate fine-grained access permissions on federated catalogs

Now that you have set up federated catalogs with fine-grained access permissions, it’s time to run queries to confirm access permissions are working as expected.

First, access SageMaker Unified Studio using the data analyst role and navigate to your project. Select Query Editor from the Build menu, and choose the DynamoDB catalog in the Data explorer. Next, drill down to a table and choose Query with Athena to run a sample query. The query result no longer includes the mobile phone number column that was visible before, which confirms that the permissions are working as expected.

Next, query the Redshift data source and note how the mobile phone number is not included in the query result.

Lastly, query the Snowflake data source and, like the previous examples, note how the result does not include the mobile phone number column.
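If you prefer to check this programmatically, you can inspect the column metadata that Athena returns with the query results. The following sketch assumes a response dictionary in the shape returned by the Athena GetQueryResults API and reports any restricted column that is still visible. The helper names and the sample response are invented for illustration.

```python
def visible_columns(query_results: dict) -> list:
    """Extract the column names from an Athena GetQueryResults response."""
    column_info = query_results["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
    return [column["Name"] for column in column_info]


def leaked_columns(query_results: dict, restricted: set) -> set:
    """Return the restricted columns that appear in the result set, if any."""
    return set(restricted) & set(visible_columns(query_results))


# Invented response fragment for a query run with the data analyst role.
sample_response = {
    "ResultSet": {
        "ResultSetMetadata": {
            "ColumnInfo": [
                {"Name": "cust_id", "Type": "integer"},
                {"Name": "zipcode", "Type": "integer"},
            ]
        },
        "Rows": [],
    }
}

print(visible_columns(sample_response))
print(leaked_columns(sample_response, {"mobile"}))
```

An empty set from leaked_columns indicates that the column filter is being applied for the role that ran the query.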

In this example, we demonstrated how to set up a basic column-level filter to restrict access to sensitive data. However, SageMaker Lakehouse supports a broad range of fine-grained access control scenarios beyond column filters that allow you to meet complex security and compliance requirements across diverse data sources. To learn more, see Managing Permissions.

Clean up

Make sure you remove the SageMaker Lakehouse resources to mitigate any unexpected costs. Start by deleting the connections, catalogs, underlying data sources, projects, and domain that you created for this blog. For additional details, refer to the Amazon SageMaker Unified Studio Administrator Guide.

Conclusion

In this blog post, we utilized fine-grained access controls with federated queries in Athena. We demonstrated how this feature allows flexibility in choosing the right data storage solutions for your needs while securely expanding access to data. We showed how to create federated catalogs and set up access policies with Lake Formation, and then queried data with Athena where we saw permissions enforced on different sources. This approach unified data access controls and streamlined data discovery, saving end-users valuable time. To learn more about federated queries in Athena and the data sources that support fine-grained access controls today, see Register your connection as a Glue Data Catalog in the Athena User Guide.

We encourage you to try fine-grained access controls on federated queries today in SageMaker Unified Studio, and to share your feedback with us. To learn more, see Getting started in the Amazon SageMaker Unified Studio User Guide.


Appendix A: Set up data sources

In this section, we provide the steps to set up your data sources.

Redshift

You can create a new table customer_rs in your current database with columns cust_id, mobile, and zipcode and populate with sample data using the following SQL command:

CREATE TABLE "customer_rs" AS
SELECT 6 AS "cust_id",  66666666 AS "mobile", 6000 as "zipcode"
UNION ALL SELECT 7, 77777777, 7000
UNION ALL SELECT 8,  88888888, 8000
UNION ALL SELECT 9,  99999999, 9000
UNION ALL SELECT 10, 11112222, 1100

DynamoDB

You can create a new table in DynamoDB with the partition key cust_id and the sort key zipcode through AWS CloudShell with the following command:

aws dynamodb create-table \
    --table-name customer_ddb \
    --attribute-definitions \
        AttributeName=cust_id,AttributeType=N \
        AttributeName=zipcode,AttributeType=N \
    --key-schema \
        AttributeName=cust_id,KeyType=HASH \
        AttributeName=zipcode,KeyType=RANGE \
    --provisioned-throughput \
        ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --table-class STANDARD

You can populate the DynamoDB table with the following commands:

aws dynamodb put-item \
    --table-name customer_ddb \
    --item \
        '{"cust_id": {"N": "11"}, "zipcode": {"N": "2000"}, "mobile": {"N": "11113333"}}'

aws dynamodb put-item \
    --table-name customer_ddb \
    --item \
        '{"cust_id": {"N": "12"}, "zipcode": {"N": "2000"}, "mobile": {"N": "22224444"}}'

aws dynamodb put-item \
    --table-name customer_ddb \
    --item \
        '{"cust_id": {"N": "13"}, "zipcode": {"N": "3000"}, "mobile": {"N": "33335555"}}'

aws dynamodb put-item \
    --table-name customer_ddb \
    --item \
        '{"cust_id": {"N": "14"}, "zipcode": {"N": "4000"}, "mobile": {"N": "55556666"}}'
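The same sample rows can be loaded from Python. The following sketch assumes you pass in a boto3 DynamoDB client and that the customer_ddb table already exists. The helper names are chosen for illustration. DynamoDB's low-level format carries numbers as strings under the N type, which is why each value is converted with str.

```python
SAMPLE_CUSTOMERS = [
    (11, 2000, 11113333),
    (12, 2000, 22224444),
    (13, 3000, 33335555),
    (14, 4000, 55556666),
]


def to_number_item(cust_id: int, zipcode: int, mobile: int) -> dict:
    """Encode one customer row in DynamoDB's typed attribute format."""
    return {
        "cust_id": {"N": str(cust_id)},
        "zipcode": {"N": str(zipcode)},
        "mobile": {"N": str(mobile)},
    }


def load_customers(dynamodb_client, table_name: str = "customer_ddb") -> int:
    """Write every sample row with put_item and return the number of rows written."""
    for cust_id, zipcode, mobile in SAMPLE_CUSTOMERS:
        dynamodb_client.put_item(
            TableName=table_name,
            Item=to_number_item(cust_id, zipcode, mobile),
        )
    return len(SAMPLE_CUSTOMERS)


print(to_number_item(*SAMPLE_CUSTOMERS[0]))

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   load_customers(boto3.client("dynamodb"))
```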

Snowflake

You can create your database, schema, and tables in Snowflake with the following SQL queries:

use database tasty_bytes_sample_data;
create schema "sf_schema";

CREATE TABLE "customer_sf" AS
SELECT 1 AS "cust_id", 11111111 AS "mobile", 1000 AS "zipcode"
UNION ALL SELECT 2, 22222222, 2000
UNION ALL SELECT 3, 33333333, 3000
UNION ALL SELECT 4, 44444444, 4000
UNION ALL SELECT 5, 55555555, 5000
UNION ALL SELECT 21, 12341234, 1234;

Appendix B: Connection Properties for Redshift and Snowflake

Redshift Connection Properties:

Snowflake Connection Properties:


About the Authors

Sandeep Adwankar is a Senior Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Praveen Kumar is a Principal Analytics Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-centered services. His areas of interests are serverless technology, modern cloud data warehouses, streaming, and generative AI applications.

Stuti Deshpande is a Big Data Specialist Solutions Architect at AWS. She works with customers around the globe, providing them strategic and architectural guidance on implementing analytics solutions using AWS. She has extensive experience in big data, ETL, and analytics. In her free time, Stuti likes to travel, learn new dance forms, and enjoy quality time with family and friends.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Scott Rigney is a Senior Technical Product Manager with AWS and has expertise in analytics, data science, and machine learning. He is passionate about building software products that enable enterprises to make data-driven decisions and drive innovation.