All posts by George Komninos

Create a most-recent view of your data lake using Amazon Redshift Serverless

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/create-a-most-recent-view-of-your-data-lake-using-amazon-redshift-serverless/

Building a robust data lake is very beneficial because it enables organizations to have a holistic view of their business and empowers data-driven decisions. The curated layer of a data lake can hydrate multiple homogeneous data products, unlocking capabilities to address current and future requirements. However, some concepts of how data lakes work feel counter-intuitive for professionals with a traditional database background.

Data lakes are append-only by design, meaning that new records with an existing primary key don't update the existing values out of the box; instead, the new values are appended, resulting in multiple occurrences of the same primary key in the data lake. Furthermore, special care needs to be taken to handle row deletions, and even if there is a way to identify deleted primary keys, it's not straightforward to incorporate them into the data lake.

Traditionally, processing between the different layers of data lakes is performed using distributed processing engines such as Apache Spark that can be deployed in a managed or serverless way using services such as Amazon EMR or AWS Glue. Spark has recently introduced frameworks that give data lakes a flavor of ACID properties, such as Apache Hudi, Apache Iceberg, and Delta Lake. However, if you’re coming from a database background, there is a significant learning curve to adopt these technologies that involves understanding the concepts, moving away from SQL to a more general-purpose language, and adopting a specific ACID framework and its complexities.

In this post, we discuss how to implement a data lake that supports updates and deletions of individual rows using the Amazon Redshift ANSI SQL compliant syntax as our main processing language. We also take advantage of its serverless offering to address scenarios where consumption patterns don’t justify creating a managed Amazon Redshift cluster, which makes our solution cost-efficient. We use Python to interact with the AWS API, but you can also use any other AWS SDK. Finally, we use AWS Glue auto-generated code to ingest data to Amazon Redshift Serverless and AWS Glue crawlers to create metadata for our datasets.

Solution overview

Most of the services we use for this post can be treated as placeholders. For instance, we use Amazon DocumentDB (with MongoDB compatibility) as our data source system, because one of the key features of a data lake is that it can support structured and unstructured data. We use AWS Database Migration Service (AWS DMS) to ingest data from Amazon DocumentDB to the data lake on Amazon Simple Storage Service (Amazon S3). The change data capture (CDC) capabilities of AWS DMS enable us to identify both updated and deleted rows that we want to propagate to the data lake. We use an AWS Glue job to load raw data to Redshift Serverless so that we can take advantage of job bookmarks, which allow each run of the load job to ingest only newly arrived data. You could replace the AWS Glue job with Amazon Redshift Spectrum or the Amazon Redshift COPY command if you have a different way to identify newly arrived data.

After new data is ingested to Amazon Redshift, we use it as our extract, transform, and load (ETL) engine. We trigger a stored procedure to curate the new data, upsert it to our existing table, and unload it to the data lake. To handle data deletions, we have created a scalar UDF in AWS Lambda that we can call from Amazon Redshift to delete the S3 partitions that have been affected by the newly ingested dataset before rewriting them with the updated values. The following diagram showcases the architecture of our solution.

Data ingestion

The dataset we use for this post is available on GitHub. After we create our Amazon DocumentDB instance (for this post, we used engine version 4.0.0 and the db.t3.medium instance class), we ingest the sample dataset. Ingested records look like the following:

{
	"_id": ObjectId("61f998d9b7599a1e904ae84d"),
	"Case_Type": "Confirmed",
	"Cases": 1661,
	"Difference": 135,
	"Date": "5/12/2020",
	"Country_Region": "Sudan",
	"Province_State": "N/A",
	"Admin2": "",
	"Combined_Key": "Sudan",
	"Lat": 12.8628,
	"Long": 30.2176,
	"Prep_Flow_Runtime": "6/5/2022 11:15:39 PM"
}
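
If you prefer to script the ingestion rather than use the mongo shell, the following is a minimal sketch using pymongo. The connection string, the database and collection names, and the local file name are assumptions for illustration; Amazon DocumentDB also enforces TLS by default, so the Amazon RDS CA bundle needs to be available locally:

import json
import pymongo

# Assumed connection details; replace with your Amazon DocumentDB cluster endpoint and credentials
client = pymongo.MongoClient(
    "mongodb://<user>:<password>@<docdb-cluster-endpoint>:27017/"
    "?tls=true&tlsCAFile=rds-combined-ca-bundle.pem&replicaSet=rs0&retryWrites=false"
)
collection = client["covid"]["cases"]

# Assuming the sample file downloaded from GitHub is a JSON array of documents
with open("cases.json") as f:
    records = json.load(f)

collection.insert_many(records)
print(f"Inserted {len(records)} records")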

We then create an AWS DMS replication instance (dms.t3.medium) using engine version 3.4.6, a source endpoint for Amazon DocumentDB, and a target endpoint for Amazon S3, adding dataFormat=parquet; as an extra configuration, so that records are stored in Parquet format in the landing zone of the data lake. After we confirm the connectivity of the endpoints using the Test connection feature, we create a database migration task for full load and ongoing replication. We can confirm data is migrated successfully to Amazon S3 by browsing to the S3 location and choosing Query with S3 Select on the Parquet file that has been generated. The result looks like the following screenshot.
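
If you script this step instead of using the console, a minimal Boto3 sketch for creating the S3 target endpoint with Parquet output could look like the following; the endpoint identifier, bucket, folder, and DMS service role ARN are placeholders:

import boto3

dms = boto3.client('dms')

# Placeholder identifiers; replace the bucket, folder, and DMS service role with your own
response = dms.create_endpoint(
    EndpointIdentifier='docdb-datalake-s3-target',
    EndpointType='target',
    EngineName='s3',
    S3Settings={
        'ServiceAccessRoleArn': 'arn:aws:iam::<AWS account-id>:role/<dms-s3-access-role>',
        'BucketName': '<DATA-LAKE-BUCKET>',
        'BucketFolder': 'landing/cases',
        'DataFormat': 'parquet'
    }
)
print(response['Endpoint']['EndpointArn'])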

We then catalog the ingested data using an AWS Glue crawler on the landing zone S3 bucket, so that we can query the data with Amazon Athena and process it with AWS Glue.
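
The crawler can also be created and started programmatically. The following is a minimal Boto3 sketch; the crawler name, IAM role, and S3 path are placeholders, while the database name matches the Data Catalog database referenced by the AWS Glue job later in this post:

import boto3

glue = boto3.client('glue')

# Placeholder crawler name, role, and path; the database name matches the catalog database used later
glue.create_crawler(
    Name='document-landing-crawler',
    Role='arn:aws:iam::<AWS account-id>:role/<glue-crawler-role>',
    DatabaseName='document-landing',
    Targets={'S3Targets': [{'Path': 's3://<DATA-LAKE-BUCKET>/landing/cases/'}]}
)
glue.start_crawler(Name='document-landing-crawler')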

Set up Redshift Serverless

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. This enables you to use your data to acquire new insights for your business and customers. It’s not uncommon to want to take advantage of the rich features Amazon Redshift provides without having a workload demanding enough to justify purchasing an Amazon Redshift provisioned cluster. To address such scenarios, we have launched Redshift Serverless, which includes all the features provisioned clusters have but enables you to only pay for the workloads you run rather than paying for the entire time your Amazon Redshift cluster is up.

Our next step is to set up our Redshift Serverless instance. On the Amazon Redshift console, we choose Redshift Serverless, select the required settings (similar to creating a provisioned Amazon Redshift cluster), and choose Save configuration. If you intend to access your Redshift Serverless instance via a JDBC client, you might need to enable public access. After the setup is complete, we can start using the Redshift Serverless endpoint.
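
If you prefer infrastructure as code over the console, the following is a minimal Boto3 sketch of the same setup using the Redshift Serverless API; the namespace and workgroup names, the admin credentials, the base capacity, and the IAM role (reused later for UNLOAD) are assumptions you would adjust to your environment:

import boto3

serverless = boto3.client('redshift-serverless')

# Assumed namespace and workgroup names; adjust credentials, capacity, and networking to your environment
serverless.create_namespace(
    namespaceName='datalake-namespace',
    dbName='dev',
    adminUsername='awsuser',
    adminUserPassword='<admin-password>',
    iamRoles=['arn:aws:iam::<AWS account-id>:role/<role-name>']
)
serverless.create_workgroup(
    workgroupName='datalake-workgroup',
    namespaceName='datalake-namespace',
    baseCapacity=32,
    publiclyAccessible=False
)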

Load data to Redshift Serverless

To confirm our serverless endpoint is accessible, we can create an AWS Glue connection and test its connectivity. The type of the connection is JDBC, and we can find our serverless JDBC URL in the Workgroup configuration section of the Redshift Serverless console. For the connection to be successful, we need to configure connectivity between the Amazon Redshift and AWS Glue security groups. For more details, refer to Connecting to a JDBC Data Store in a VPC. After connectivity is configured correctly, we can test the connection, and if the test is successful, we should see the following message.
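
If you prefer to create the connection with code, a minimal Boto3 sketch could look like the following; the JDBC URL, credentials, subnet, security group, and Availability Zone are placeholders you would copy from your own environment, and the connection name matches the one referenced by the AWS Glue job:

import boto3

glue = boto3.client('glue')

# Placeholder values; copy the JDBC URL from the Workgroup configuration section of the console
glue.create_connection(
    ConnectionInput={
        'Name': 'redshift-serverless',
        'ConnectionType': 'JDBC',
        'ConnectionProperties': {
            'JDBC_CONNECTION_URL': 'jdbc:redshift://<workgroup-endpoint>:5439/dev',
            'USERNAME': 'awsuser',
            'PASSWORD': '<admin-password>'
        },
        'PhysicalConnectionRequirements': {
            'SubnetId': '<subnet-id>',
            'SecurityGroupIdList': ['<security-group-id>'],
            'AvailabilityZone': '<availability-zone>'
        }
    }
)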

We use an AWS Glue job to load the historical dataset to Amazon Redshift; however, we don’t apply any business logic at this step because we want to implement our transformations using ANSI SQL in Amazon Redshift. Our AWS Glue job is mostly auto-generated boilerplate code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

job.commit()

The job reads the table cases from the database document-landing in our AWS Glue Data Catalog, which was created by the crawler after data ingestion. It copies the underlying data to the table cases_stage in the database dev of the Redshift Serverless endpoint defined in the AWS Glue connection redshift-serverless. We can run this job and use the Amazon Redshift query editor v2 on the Amazon Redshift console to confirm its success by inspecting the newly created table, as shown in the following screenshot.

We can now query and transform the historical data using Redshift Serverless.

Transform stored procedure

We have created a stored procedure that performs some common transformations, such as converting text fields to dates, extracting the year, month, and day from them, and creating some Boolean flag fields. After the transformations are performed and the results are stored in the cases table, we want to unload the data to our data lake S3 bucket and finally empty the cases_stage table, preparing it to receive the next CDC load of our pipeline. See the following code:

CREATE OR REPLACE PROCEDURE public.historicalload() LANGUAGE plpgsql AS $$
DECLARE
  sql text;
  s3folder varchar(65535);
  iamrole varchar(1000);
  unload varchar(65535);
begin
  create table cases as (
  SELECT oid__id
  , case_type
  , cases
  , difference
  , TO_DATE(date, 'mm/dd/yyyy') date
  , country_region
  , province_state
  , admin2
  , combined_key
  , lat
  , long
  , TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') prep_flow_runtime
  , fips
  , DATE_PART(year, TO_DATE(date, 'mm/dd/yyyy'))::smallint as year
  , DATE_PART(month, TO_DATE(date, 'mm/dd/yyyy'))::smallint as month
  , DATE_PART(day, TO_DATE(date, 'mm/dd/yyyy'))::smallint as day
  , CASE WHEN case_type = 'Deaths' then 1 else 0 end is_death
  , CASE WHEN case_type = 'Confirmed' then 1 else 0 end is_confirmed
  FROM "dev"."public"."cases_stage");
  sql:='select * from "dev"."public"."cases"';
  s3folder:='s3://object-path/name-prefix';
  iamrole:='arn:aws:iam::<AWS account-id>:role/<role-name>';
  unload := 'unload ('''||sql||''') to '''||s3folder||''' iam_role '''||iamrole||''' ALLOWOVERWRITE MAXFILESIZE 100 MB PARALLEL PARQUET PARTITION BY (year,month,day)';
  execute unload;
  truncate "dev"."public"."cases_stage";
END;
$$

Note that to unload data to Amazon S3, we need to create the AWS Identity and Access Management (IAM) role arn:aws:iam::<AWS account-id>:role/<role-name>, give it the required Amazon S3 permissions, and associate it with our Redshift Serverless instance. Calling the stored procedure after ingesting the historical dataset into the cases_stage table loads the transformed and partitioned dataset, in Parquet format, to the specified S3 folder. After we crawl the folder with an AWS Glue crawler, we can verify the results using Athena.
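
If you want to call the stored procedure programmatically rather than from the query editor, a minimal sketch using the Amazon Redshift Data API could look like the following; the workgroup name is an assumption, while the database and procedure names match the ones created earlier:

import boto3

redshift_data = boto3.client('redshift-data')

# The workgroup name is an assumption; the database and procedure names are the ones created earlier
response = redshift_data.execute_statement(
    WorkgroupName='datalake-workgroup',
    Database='dev',
    Sql='CALL public.historicalload();'
)

# The Data API is asynchronous, so poll describe_statement to confirm the call finished
status = redshift_data.describe_statement(Id=response['Id'])['Status']
print(status)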

We can revisit the loading AWS Glue job we described before to automatically invoke the steps we triggered manually. We can use an Amazon Redshift postactions configuration to trigger the stored procedure as soon as the loading of the staging table is complete. We can also use Boto3 to trigger the AWS Glue crawler when data is unloaded to Amazon S3. Therefore, the revisited AWS Glue job looks like the following code:

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

glueClient = boto3.client('glue')

# Post-action SQL that Amazon Redshift runs after the staging table is loaded
post_query = "call historicalload();"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev", "postactions":post_query}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
response = glueClient.start_crawler(Name='document-lake')
job.commit()

CDC load

Because the AWS DMS migration task we configured is for full load and ongoing replication, it can capture updates and deletions on a record level. Updates come with an OP column with value U, such as in the following example:

U,61f998d9b7599a1e843038cc,Confirmed,25,1,3/24/2020,Sudan,N/A,,Sudan,12.8628,30.2176,6/4/2020 11:15:39 PM,

Deletions have an OP column with value D, the primary key to be deleted, and all other fields empty:

D,61f998d9b7599a1e904ae941,,,,,,,,,,,,

Enabling job bookmarks for our AWS Glue job guarantees that subsequent runs only process data that arrived after the checkpoint set by the previous run, without any changes to our code. After the CDC data is loaded into the staging table, our next step is to delete rows with OP column value D, perform a merge operation to replace existing rows, and unload the result to our S3 bucket. However, because S3 objects are immutable, we need to delete the partitions that are affected by the latest ingested dataset and rewrite them from Amazon Redshift. Amazon Redshift doesn't have an out-of-the-box way to delete S3 objects; however, we can use a scalar Lambda UDF that takes the partition to be deleted as an argument and removes all the objects under it. The Python code of our Lambda function uses Boto3 and looks like the following example:

import json
import boto3

datalakeBucket = '<DATA-LAKE-BUCKET>'
folder = 'prod/cases/'
    
def lambda_handler(event, context):
    ret = {}
    res = []
    s3Resource = boto3.resource('s3')
    bucket = s3Resource.Bucket(datalakeBucket)
    for argument in event['arguments']:
        partition = argument[0]
        bucket.objects.filter(Prefix=folder + partition).delete()
        res.append(True)
    ret['success'] = True
    ret['results'] = res
    return json.dumps(ret)

We then need to register the Lambda UDF in Amazon Redshift by running the following code:

create or replace external function deletePartition(path VARCHAR)
returns boolean stable 
LAMBDA 'deletePartition'
IAM_ROLE 'arn:aws:iam::<AWS account-id>:role/<role-name>';

The last edge case we might need to take care of is the scenario of a CDC dataset containing multiple records for the same primary key. The approach we take here is to use another date field that is available in the dataset, prep_flow_runtime, to keep only the latest record. To implement this logic in SQL, we use a nested query with the row_number window function.

On a high level, the upsert stored procedure includes the following steps:

  1. Delete partitions that are being updated from the S3 data lake, using the scalar Lambda UDF.
  2. Delete rows that are being updated from Amazon Redshift.
  3. Implement the transformations and compute the latest values of the updated rows.
  4. Insert them into the target table in Amazon Redshift.
  5. Unload the affected partitions to the S3 data lake.
  6. Truncate the staging table.

See the following code:

CREATE OR REPLACE PROCEDURE upsert()
AS $$
DECLARE
  sql text;
  deletePartition text;
  s3folder varchar(65535);
  iamrole varchar(1000);
  unload varchar(65535);
begin
  
  drop table if exists affected_dates;
  create temp table affected_dates as select date from cases where oid__id in (select distinct oid__id from cases_stage cs);
  deletePartition:='select deletePartition(partitionPath) from 
  (select distinct ''year='' || year::VARCHAR || ''/month='' || month::VARCHAR || ''/day='' || day::VARCHAR partitionPath
  from cases
  where date in (select date from affected_dates));';
  execute deletePartition;
	
  delete from cases using cases_stage 
  where cases.oid__id = cases_stage.oid__id;
  insert into cases 
  select oid__id
  , case_type
  , cases
  , difference
  , date
  , country_region
  , province_state
  , admin2
  , combined_key
  , lat
  , long
  , prep_flow_runtime
  , fips
  , year
  , month
  , day
  , is_death
  , is_confirmed
from
(SELECT row_number() over (partition by oid__id order by TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') desc) seq
  , oid__id
  , case_type
  , cases
  , difference
  , TO_DATE(date, 'mm/dd/yyyy') date
  , country_region
  , province_state
  , admin2
  , combined_key
  , lat
  , long
  , TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') prep_flow_runtime
  , fips
  , DATE_PART(year, TO_DATE(date, 'mm/dd/yyyy'))::smallint as year
  , DATE_PART(month, TO_DATE(date, 'mm/dd/yyyy'))::smallint as month
  , DATE_PART(day, TO_DATE(date, 'mm/dd/yyyy'))::smallint as day
  , CASE WHEN case_type = 'Deaths' then 1 else 0 end is_death
  , CASE WHEN case_type = 'Confirmed' then 1 else 0 end is_confirmed
  from cases_stage where op = 'U') where seq = 1;
  
  sql:='select *
  from cases
  where date in (select date from affected_dates);';
  s3folder:='s3://<DATA-LAKE-BUCKET>/prod/cases/';
  iamrole:='arn:aws:iam::<AWS account-id>:role/<role-name>';
  unload := 'unload ('''||sql||''') to ''' ||s3folder||''' iam_role '''||iamrole||''' ALLOWOVERWRITE MAXFILESIZE 100 MB PARALLEL PARQUET PARTITION BY (year,month,day)';    
  execute unload;
  truncate cases_stage;
END;
$$ LANGUAGE plpgsql;

Finally, we can modify the aforementioned AWS Glue job to decide whether this is a historical load or a CDC load by using Boto3 to check if the table exists in the AWS Glue Data Catalog. The final version of the AWS Glue job is as follows:

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
glueClient = boto3.client('glue')
tableExists = True
post_query="call upsert();"
try:
    response = glueClient.get_table(DatabaseName='document-data-lake', Name='cases')
except glueClient.exceptions.EntityNotFoundException:
    tableExists = False
    post_query="call historicalLoad();"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev", "postactions":post_query}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
if not tableExists:
    response = glueClient.start_crawler(Name='document-lake')

job.commit()

Running this job for the first time with job bookmarks enabled loads the historical data to the Redshift Serverless staging table and triggers the historical load stored procedure, which in turn performs the transformations we implemented using SQL and unloads the result to the S3 data lake. Subsequent runs ingest newly arrived data from the landing zone, load it into the staging table, and perform the upsert process, deleting and repopulating the affected partitions in the data lake.
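
If you want to trigger the pipeline on a schedule or from another process, a minimal Boto3 sketch for starting the job with bookmarks enabled could look like the following; the job name is a placeholder for whatever you named the AWS Glue job:

import boto3

glue = boto3.client('glue')

# Placeholder job name; bookmarks ensure each run only processes newly arrived landing zone data
response = glue.start_job_run(
    JobName='load-cases-to-redshift-serverless',
    Arguments={'--job-bookmark-option': 'job-bookmark-enable'}
)
print(response['JobRunId'])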

Conclusion

Building a modern data lake that maintains a most-recent view of your data involves some edge cases that aren't intuitive if you come from an RDBMS background. In this post, we described an AWS-native approach that takes advantage of the rich features and SQL syntax of Amazon Redshift, paying only for the resources used thanks to Redshift Serverless and without using any external framework.

The introduction of Amazon Redshift Serverless unlocks the hundreds of Amazon Redshift features released every year to users who don't require a cluster that's always up and running. You can start experimenting with this approach to managing your data lake with Amazon Redshift, as well as addressing other use cases that are now easier to solve with Redshift Serverless.


About the author

George Komninos is a solutions architect for the AWS Data Lab. He helps customers convert their ideas to production-ready data products. Before AWS, he spent three years in the Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Design and build a Data Vault model in Amazon Redshift from a transactional database

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/design-and-build-a-data-vault-model-in-amazon-redshift-from-a-transactional-database/

Building a highly performant data model for an enterprise data warehouse (EDW) has historically involved significant design, development, administration, and operational effort. Furthermore, the data model must be agile and adaptable to change while handling the largest volumes of data efficiently.

Data Vault is a methodology for project design and implementation that accelerates the build of data warehouse projects. Within the overall methodology, the Data Vault 2.0 data modeling standards are popular and widely used within the industry because they emphasize the business keys and their associations within the delivery of business processes. Data Vault facilitates the rapid build of data models via the following:

  • Pattern-based entities, each with a well-defined purpose
  • Source system-independent structures that remove data silos
  • Parallel data loading with minimum dependencies
  • Historized data stored at its lowest level of granularity
  • Flexible business rules that can be applied independently of data loading
  • New data sources that can be added with no impact on the existing model

We always recommend working backward from the business requirements to choose the most suitable data modeling pattern; there are times when Data Vault will not be the best choice for your enterprise data warehouse and another modeling pattern will be more suitable.

In this post, we demonstrate how to implement a Data Vault model in Amazon Redshift and query it efficiently by using the latest Amazon Redshift features, such as separation of compute from storage, seamless data sharing, automatic table optimizations, and materialized views.

Data Vault data modeling overview

A data warehouse platform built using Data Vault typically has the following architecture:

The architecture consists of four layers:

  • Staging – Contains a copy of the latest changes to data from the source systems. This layer doesn’t hold history and, during its population, you can apply several transformations to the staged data, including data type changes or resizing, character set conversion, and the addition of meta-data columns to support later processing.
  • Raw Data Vault – Holds the historized copy of all of the data from multiple source systems. No filters or business transformations have occurred at this point except for storing the data in source-system independent targets.
  • Business Data Vault – An optional layer, but one that is very often built. It contains business calculations and de-normalizations with the sole purpose of improving the speed and simplicity of access within the consumption layer, which is called the Information Mart layer.
  • Information Mart Layer – Where data is most commonly accessed by consumers, for example reporting dashboards or extracts. You can build multiple marts from a single Data Vault integration layer, and the most common data modeling choice for these marts is Star/Kimball schemas.

Convert a Third Normal Form transactional schema to a Data Vault schema

The following entity relationship diagram is a standard implementation of a transactional model that a sports ticket selling service could use:

The main entities within the schema are sporting events, customers, and tickets. A customer is a person, and a person can purchase one or multiple tickets for a sporting event. This business event is captured by the Ticket Purchase History intersection entity above. Finally, a sporting event has many tickets available to purchase and is staged within a single city.

To convert this source model to a Data Vault model, we start by identifying the business keys, their descriptive attributes, and the business transactions. The three main entity types in the Raw Data Vault model are as follows:

  • Hubs – A collection of Business Keys discovered for each business entity.
  • Links – Business transactions within the process being modeled. A link is always between two or more business keys (hubs) and is recorded at a point in time.
  • Satellites – Historized reference data about either the business key (Hub) or business transaction (link).

The following example solution represents some of the sporting event entities when converted into the preceding Raw Data Vault objects.

Hub entities

The hub is the definitive list of business keys loaded into the Raw Data Vault layer from all of the source systems. A business key is used to uniquely identify a business entity and is never duplicated. In our example, the source system has assigned a surrogate key field called Id to represent the business key, so this is stored in a column on the hub called sport_event_id. Some common additional columns on hubs include the Load DateTimeStamp, which records the date and time the business key was first discovered, and the Record Source, which records the name of the source system where this business key was first loaded. Although you don't have to create a surrogate key (hash or sequence) for the primary key column, it is very common in Data Vault to hash the business key, so our example does this. Amazon Redshift supports multiple cryptographic hash functions, such as MD5, FNV, SHA1, and SHA2, which you can choose from to generate your primary key column. See the following code:

create table raw_data_vault.hub_sport_event 
(
  sport_event_pk  varchar(32) not null     
 ,sport_event_id  integer     not null
 ,load_dts        timestamp   not null       
 ,record_source   varchar(10) not null      
);

Note the following:

  • The preceding code assumes the MD5 hashing algorithm is used. If you use FNV_HASH, the data type will be BIGINT.
  • The Id column is the business key from the source feed. It’s passed into the hashing function for the _PK column.
  • In our example, there is only a single value for the business key. If a compound key is required, then more than one column can be added.
  • Load_DTS is populated via the staging schema or extract, transform, and load (ETL) code.
  • Record_Source is populated via the staging schema or ETL code.

Link entities

The link object is the occurrence of two or more business keys undertaking a business transaction, for example purchasing a ticket for a sporting event. Each of the business keys is mastered in its respective hub, and a primary key is generated for the link by concatenating all of the business keys (typically separated by a delimiter like '^'). As with hubs, some common additional columns are added to links, including the Load DateTimeStamp, which records the date and time the transaction was first discovered, and the Record Source, which records the name of the source system where this transaction was first loaded. See the following code:

create table raw_data_vault.lnk_ticket_sport_event 
(
  ticket_sport_event_pk varchar(32)  not null    
 ,ticket_fk             varchar(32)  not null   
 ,sport_event_fk        varchar(32)  not null   
 ,load_dts              timestamp    not null   
 ,record_source         varchar(10)  not null   
);

Note the following:

  • The code assumes that the MD5 hashing algorithm is used. The _PK column contains the hashed value of the concatenated ticket and sporting event business keys from the source data feed, for example MD5(ticket_id||'^'||sporting_event_id).
  • The two _FK columns are foreign keys linked to the primary key of the respective hubs.
  • Load_DTS is populated via the staging schema or ETL code.
  • Record_Source is populated via the staging schema or ETL code.
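
To make the hashing pattern concrete, the following short Python sketch shows how a link primary key could be derived by concatenating the business keys with the '^' delimiter and hashing the result with MD5, mirroring the MD5(ticket_id||'^'||sporting_event_id) expression above; the sample key values are made up for illustration:

import hashlib

def hash_key(*business_keys, delimiter='^'):
    # Concatenate the business keys with the delimiter and hash the result,
    # mirroring the SQL expression MD5(ticket_id||'^'||sporting_event_id)
    concatenated = delimiter.join(str(key) for key in business_keys)
    return hashlib.md5(concatenated.encode('utf-8')).hexdigest()

# Hypothetical business key values, for illustration only
ticket_sport_event_pk = hash_key(493827, 1204)
print(ticket_sport_event_pk)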

Satellite entities

The history of data about the hub or link is stored in the satellite object. The Load DateTimeStamp is part of the satellite's compound key, along with the primary key of either the hub or link, because data can change over time. There are choices within the Data Vault standards for how to store satellite data from multiple sources. A common approach is to append the name of the feed to the satellite name. This lets a single hub contain reference data from more than one source system and allows new sources to be added without impacting the existing design. See the following code:

create table raw_data_vault.sat_sport_event 
(
  sport_event_pk    varchar(32) not null     
 ,load_dts          timestamp   not null  
 ,sport_type_name   varchar(50)
 ,start_datetime    timestamp
 ,sold_out          boolean     
 ,record_source     varchar(10) not null 
 ,hash_diff         varchar(32) not null 
);

Note the following:

  • The sport_event_pk value is inherited from the hub.
  • The compound key is the sport_event_pk and load_dts columns. This allows history to be maintained.
  • The business attributes are typically optional.
  • Load_DTS is populated via the staging schema or ETL code.
  • Record_Source is populated via the staging schema or ETL code.
  • Hash_Diff is a Data Vault technique to simplify the identification of data changes within satellites. The business attribute values are concatenated and hashed with your algorithm of choice. Then, during the ETL processing, only the two hash values (one on the source record and one on the latest dated satellite record) should be compared.

Converted Data Vault Model

If we take the preceding three Data Vault entity types, we can convert the source data model into a Data Vault data model as follows:

The Business Data Vault contains business-centric calculations and performance de-normalizations that are read by the Information Marts. Some of the object types that are created in the Business Vault layer include the following:

  • PIT (point in time) tables – You can store data in more than one satellite for a single hub, each with a different Load DateTimeStamp depending on when the data was loaded. A PIT table simplifies access to all of this data by creating a table or materialized view to present a single row with all of the relevant data to hand. The compound key of a PIT table is the primary key from the hub, plus a snapshot date or snapshot date and time for the frequency of the population. Once a day is the most popular, but equally the frequency could be every 15 minutes or once a month.
  • Bridge tables – Similar to PIT tables, bridge tables take the data from more than one link or link satellite and again de-normalize into a single row. This single row view makes accessing complex datasets over multiple tables from the Raw Data Vault much more straightforward and performant. Like a PIT table, the bridge table can be either a table or materialized view.
  • KPI tables – The pre-computed business rules calculate KPIs and store them in dedicated tables.
  • Type 2 tables – You can apply additional processing in the Business Data Vault to calculate Type 2-like time periods, because the data in the Raw Data Vault follows an insert-only pattern.

The architecture of Amazon Redshift allows flexibility in the design of the Data Vault platform by using the capabilities of the Amazon Redshift RA3 instance type to separate compute resources from the data storage layer, and by sharing data seamlessly between different Amazon Redshift clusters. This flexibility allows highly performant and cost-effective Data Vault platforms to be built. For example, the Staging and Raw Data Vault layers can be populated 24 hours a day in micro-batches by one Amazon Redshift cluster, the Business Data Vault layer can be built once a day and its cluster paused to save costs when the build completes, and any number of consumer Amazon Redshift clusters can access the results. Depending on the processing complexity of each layer, Amazon Redshift supports independently scaling the compute capacity required at each stage.
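
As a sketch of the cost-saving part of this pattern, the provisioned cluster that builds the Business Data Vault layer could be paused when its daily build finishes and resumed before the next one; the cluster identifier below is hypothetical:

import boto3

redshift = boto3.client('redshift')

# Hypothetical identifier of the provisioned cluster that builds the Business Data Vault layer
cluster_id = 'business-data-vault-cluster'

# Pause the cluster when the daily build completes to save costs
redshift.pause_cluster(ClusterIdentifier=cluster_id)

# Resume it before the next scheduled build (shown together here for brevity; in practice these run at different times)
redshift.resume_cluster(ClusterIdentifier=cluster_id)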

All of the underlying tables in Raw Data Vault can be loaded simultaneously. This makes great use of the massively parallel processing architecture in Amazon Redshift. For our business model, it makes sense to create a Business Data Vault layer, which can be read by an Information Mart to perform dimensional analysis on ticket sales. It can give us insights on the top home teams in fan attendance and how that correlates with specific sport locations or cities. Running these queries involves joining multiple tables. It’s important to design an optimal Business Data Vault layer to avoid excessive joins for deriving these insights.

For example, to get the number of tickets per city for June 2021, the SQL looks like the following code:

SELECT name,count(lpt.ticket_fk) as tickets_sold
FROM lnk_person_ticket lpt
  JOIN lnk_ticket_sport_event ltse on lpt.ticket_fk = ltse.ticket_fk
  JOIN lnk_sport_event_location lsel on ltse.sport_event_fk = lsel.sport_event_fk
  JOIN lnk_location_city llc on lsel.location_fk = llc.location_fk
  JOIN sat_city sc on llc.city_fk = sc.city_pk
  JOIN sat_sport_event sse on lsel.sport_event_fk = sse.sport_event_pk
Where start_date between '2021-06-05' and '2021-06-15' group by 1;

We can use the EXPLAIN command for the preceding query to get the Amazon Redshift query plan. The following plan shows that the specified joins require broadcasting data across nodes, since the join conditions are on different keys. This makes the query computationally expensive:

dev=# explain SELECT
  name, count(lpt.ticket_fk) as tickets_sold FROM lnk_person_ticket lpt
  JOIN lnk_ticket_sport_event ltse on lpt.ticket_fk = ltse.ticket_fk
  JOIN lnk_sport_event_location lsel on ltse.sport_event_fk = lsel.sport_event_fk
  JOIN lnk_location_city llc on lsel.location_fk = llc.location_fk
  JOIN sat_city sc on llc.city_fk = sc.city_pk
  JOIN sat_sport_event sse on lsel.sport_event_fk = sse.sport_event_pk
where
  start_date between '2021-06-05'
  and '2021-06-15'
GROUP BY
  1;
                            QUERY PLAN
----------------------------------------------------------------------
 XN HashAggregate  (cost=96331086541.29..96331086564.36 rows=9226 width=49)
   ->  XN Hash Join DS_BCAST_INNER  (cost=166693605.84..96331086495.16 rows=9226 width=49)
         Hash Cond: ((“outer”.ticket_fk)::text = (“inner”.ticket_fk)::text)
         ->  XN Hash Join DS_BCAST_INNER  (cost=166690878.95..344629685.90 rows=621783 width=49)
               Hash Cond: ((“outer”.sport_event_fk)::text = (“inner”.sport_event_fk)::text)
               ->  XN Seq Scan on lnk_ticket_sport_event ltse  (cost=0.00..147804.35 rows=14780435 width=72)
               ->  XN Hash  (cost=166690878.49..166690878.49 rows=185 width=85)
                     ->  XN Hash Join DS_BCAST_INNER  (cost=49690773.08..166690878.49 rows=185 width=85)
                           Hash Cond: ((“outer”.location_fk)::text = (“inner”.location_fk)::text)
                           ->  XN Hash Join DS_BCAST_INNER  (cost=65.61..79200165.14 rows=179 width=108)
                                 Hash Cond: ((“outer”.sport_event_fk)::text = (“inner”.sport_event_pk)::text)
                                 ->  XN Seq Scan on lnk_sport_event_location lsel  (cost=0.00..43.44 rows=4344 width=72)
                                 ->  XN Hash  (cost=65.16..65.16 rows=180 width=36)
                                       ->  XN Seq Scan on sat_sport_event sse  (cost=0.00..65.16 rows=180 width=36)
                                             Filter: ((start_date <= ‘2021-06-15’::date) AND (start_date >= ‘2021-06-05’::date))
                           ->  XN Hash  (cost=49690707.31..49690707.31 rows=63 width=49)
                                 ->  XN Hash Join DS_BCAST_INNER  (cost=0.78..49690707.31 rows=63 width=49)
                                       Hash Cond: ((“outer”.city_pk)::text = (“inner”.city_fk)::text)
                                       ->  XN Seq Scan on sat_city sc  (cost=0.00..27909.51 rows=2790951 width=49)
                                       ->  XN Hash  (cost=0.62..0.62 rows=62 width=72)
                                             ->  XN Seq Scan on lnk_location_city llc  (cost=0.00..0.62 rows=62 width=72)
         ->  XN Hash  (cost=2181.51..2181.51 rows=218151 width=36)
               ->  XN Seq Scan on lnk_person_ticket lpt  (cost=0.00..2181.51 rows=218151 width=36)
(23 rows)

Let’s discuss the latest Amazon Redshift features that help optimize the performance of these queries on top of a Business Data Vault model.

Use Amazon Redshift features to query the Data Vault

Automatic table optimization

Traditionally, to optimize joins in Amazon Redshift, it's recommended to use distribution keys and styles to co-locate data on the same nodes based on common join predicates. The Raw Data Vault layer has a very well-defined pattern, which is ideal for determining the distribution keys. However, the broad range of SQL queries applicable to the Business Data Vault makes it hard to predict the consumption patterns that should drive your distribution strategy.

Automatic table optimization lets you get the fastest performance quickly without needing to invest time to manually tune and implement table optimizations. Automatic table optimization continuously observes how queries interact with tables, and it uses machine learning (ML) to select the best sort and distribution keys to optimize performance for the cluster’s workload. If Amazon Redshift determines that applying a key will improve cluster performance, then tables are automatically altered within hours without requiring administrator intervention.

Automatic table optimization provided the following recommendations for the preceding query to get the number of tickets per city for June 2021. The recommendations suggest modifying the distribution style and sort keys for the tables involved in the query.

dev=# select * from svv_alter_table_recommendations;
   type    | database | table_id | group_id |                                           ddl                                           | auto_eligible
-----------+----------+----------+----------+-----------------------------------------------------------------------------------------+---------------
 diststyle | dev      |   127372 |        0 | ALTER TABLE "public"."lnk_person_ticket" ALTER DISTSTYLE KEY DISTKEY "ticket_fk"        | f
 sortkey   | dev      |   127421 |       -1 | ALTER TABLE "public"."lnk_ticket_sport_event" ALTER COMPOUND SORTKEY ("sport_event_fk") | f
 diststyle | dev      |   127421 |        0 | ALTER TABLE "public"."lnk_ticket_sport_event" ALTER DISTSTYLE KEY DISTKEY "ticket_fk"   | f
 sortkey   | dev      |   145032 |       -1 | ALTER TABLE "public"."sat_city" ALTER COMPOUND SORTKEY ("city_pk")                      | f

After the recommended distribution keys and sort keys were applied by automatic table optimization, the explain plan shows DS_DIST_NONE for the top-level join, meaning no data redistribution was required for it anymore: the data needed for that join was already co-located across Amazon Redshift nodes.

QUERY PLAN
----------------------------------------------------------------------
 XN HashAggregate  (cost=344646541.29..344646564.36 rows=9226 width=49)
   ->  XN Hash Join DS_DIST_NONE  (cost=166693605.84..344646495.16 rows=9226 width=49)
         Hash Cond: ((“outer”.ticket_fk)::text = (“inner”.ticket_fk)::text)
         ->  XN Hash Join DS_BCAST_INNER  (cost=166690878.95..344629685.90 rows=621783 width=49)
               Hash Cond: ((“outer”.sport_event_fk)::text = (“inner”.sport_event_fk)::text)
               ->  XN Seq Scan on lnk_ticket_sport_event ltse  (cost=0.00..147804.35 rows=14780435 width=72)
               ->  XN Hash  (cost=166690878.49..166690878.49 rows=185 width=85)
                     ->  XN Hash Join DS_BCAST_INNER  (cost=49690773.08..166690878.49 rows=185 width=85)
                           Hash Cond: ((“outer”.location_fk)::text = (“inner”.location_fk)::text)
                           ->  XN Hash Join DS_BCAST_INNER  (cost=65.61..79200165.14 rows=179 width=108)
                                 Hash Cond: ((“outer”.sport_event_fk)::text = (“inner”.sport_event_pk)::text)
                                 ->  XN Seq Scan on lnk_sport_event_location lsel  (cost=0.00..43.44 rows=4344 width=72)
                                 ->  XN Hash  (cost=65.16..65.16 rows=180 width=36)
                                       ->  XN Seq Scan on sat_sport_event sse  (cost=0.00..65.16 rows=180 width=36)
                                             Filter: ((start_date <= ‘2021-06-15’::date) AND (start_date >= ‘2021-06-05’::date))
                           ->  XN Hash  (cost=49690707.31..49690707.31 rows=63 width=49)
                                 ->  XN Hash Join DS_BCAST_INNER  (cost=0.78..49690707.31 rows=63 width=49)
                                       Hash Cond: ((“outer”.city_pk)::text = (“inner”.city_fk)::text)
                                       ->  XN Seq Scan on sat_city sc  (cost=0.00..27909.51 rows=2790951 width=49)
                                       ->  XN Hash  (cost=0.62..0.62 rows=62 width=72)
                                             ->  XN Seq Scan on lnk_location_city llc  (cost=0.00..0.62 rows=62 width=72)
         ->  XN Hash  (cost=2181.51..2181.51 rows=218151 width=36)
               ->  XN Seq Scan on lnk_person_ticket lpt  (cost=0.00..2181.51 rows=218151 width=36)
(23 rows)

Materialized views in Amazon Redshift

The data analyst responsible for running this analysis benefits significantly by creating a materialized view in the Business Data Vault schema that pre-computes the results of the queries by running the following SQL:

CREATE MATERIALIZED VIEW bridge_city_ticket_aggregation_mv
AUTO REFRESH YES
AS SELECT name, count(lpt.ticket_fk) as tickets_sold
FROM lnk_person_ticket lpt
JOIN lnk_ticket_sport_event ltse on lpt.ticket_fk = ltse.ticket_fk 
JOIN lnk_sport_event_location lsel on ltse.sport_event_fk = lsel.sport_event_fk 
JOIN lnk_location_city llc on lsel.location_fk = llc.location_fk 
JOIN sat_city sc on llc.city_fk = sc.city_pk 
GROUP BY 1;

To get the latest satellite values, we must include load_dts in our join. For simplicity, we don’t do that for this post.

With the materialized view in place, the query is reduced, in terms of both code length and complexity, to something as simple as the following:

SELECT * FROM bridge_city_ticket_aggregation_mv;

The run plan in this case is as follows:

XN Seq Scan on mv_tbl__bridge_city_ticket_aggregation_mv__0 derived_table1  (cost=0.00..0.36 rows=36 width=524)

More importantly, Amazon Redshift can automatically use the materialized view even when it isn't explicitly referenced in the query.

The preceding scenario addresses the needs of a specific analysis because the resulting materialized view is an aggregate. In a more generic scenario, after reviewing our Data Vault ER diagram, you can observe that any query involving ticket sales analysis per location requires a substantial number of joins, all of which use different join keys. Therefore, any such analysis comes at a significant performance cost. For example, to get the count of tickets sold per city and stadium name, you must run a query like the following:

SELECT sc.name city_name, ssl.name stadium_name, count(lpt.ticket_fk) tickets_sold
FROM lnk_person_ticket lpt
JOIN lnk_ticket_sport_event ltse ON lpt.ticket_fk = ltse.ticket_fk 
JOIN lnk_sport_event_location lsel ON ltse.sport_event_fk = lsel.sport_event_fk 
JOIN sat_location ssl ON lsel.location_fk = ssl.location_pk 
JOIN lnk_location_city llc ON lsel.location_fk = llc.location_fk 
JOIN sat_city sc ON llc.city_fk = sc.city_pk 
GROUP BY 1, 2;

You can use the EXPLAIN command for the preceding query to get the explain plan and see how expensive such an operation is:

XN HashAggregate  (cost=99574385259.46..99574385829.64 rows=228071 width=68)
  ->  XN Hash Join DS_BCAST_INNER  (cost=127173776.83..99574383548.93 rows=228071 width=68)
        Hash Cond: (("outer".sport_event_fk)::text = ("inner".sport_event_fk)::text)
        ->  XN Hash Join DS_BCAST_INNER  (cost=2726.89..95986925283.91 rows=219289 width=72)
              Hash Cond: (("outer".ticket_fk)::text = ("inner".ticket_fk)::text)
              ->  XN Seq Scan on lnk_ticket_sport_event ltse  (cost=0.00..147804.35 rows=14780435 width=72)
              ->  XN Hash  (cost=2181.51..2181.51 rows=218151 width=36)
                    ->  XN Seq Scan on lnk_person_ticket lpt  (cost=0.00..2181.51 rows=218151 width=36)
        ->  XN Hash  (cost=127171038.56..127171038.56 rows=4553 width=68)
              ->  XN Hash Join DS_BCAST_INNER  (cost=49690708.24..127171038.56 rows=4553 width=68)
                    Hash Cond: (("outer".location_fk)::text = ("inner".location_fk)::text)
                    ->  XN Hash Join DS_BCAST_INNER  (cost=0.78..39680186.12 rows=4416 width=127)
                          Hash Cond: (("outer".location_fk)::text = ("inner".location_pk)::text)
                          ->  XN Seq Scan on lnk_sport_event_location lsel  (cost=0.00..43.44 rows=4344 width=72)
                          ->  XN Hash  (cost=0.62..0.62 rows=62 width=55)
                                ->  XN Seq Scan on sat_location ssl  (cost=0.00..0.62 rows=62 width=55)
                    ->  XN Hash  (cost=49690707.31..49690707.31 rows=63 width=49)
                          ->  XN Hash Join DS_BCAST_INNER  (cost=0.78..49690707.31 rows=63 width=49)
                                Hash Cond: (("outer".city_pk)::text = ("inner".city_fk)::text)
                                ->  XN Seq Scan on sat_city sc  (cost=0.00..27909.51 rows=2790951 width=49)
                                ->  XN Hash  (cost=0.62..0.62 rows=62 width=72)
                                      ->  XN Seq Scan on lnk_location_city llc  (cost=0.00..0.62 rows=62 width=72)

We can identify commonly joined tables, like hub_sport_event, hub_ticket, and hub_location, and boost the performance of queries by creating materialized views that implement these joins ahead of time. For example, we can create a materialized view to join tickets to sport locations:

CREATE MATERIALIZED VIEW bridge_tickets_per_stadium_mv
AUTO REFRESH YES
AS select hsl.hub_sport_location_key location_id, hub_ticket_seq tickets_id , start_date date, "name" stadium_name
from hub_ticket
join lnk_ticket_sport_event ltse on hub_ticket_seq = hub_ticket_key
join hub_sport_event hse on hse.hub_sport_event_key = ltse.hub_sport_event_seq
join sat_sport_event sse on sse.hub_sport_event_key = hse.hub_sport_event_key
join lnk_sport_event_location lsel on hse.hub_sport_event_key = lsel.hub_sport_event_seq
join hub_sport_location hsl on hub_location_seq = hub_sport_location_key
join sat_sport_location ssl on ssl.hub_sport_location_key = hsl.hub_sport_location_key;

If we don’t make any edits to the expensive query that we ran before, then the run plan is as follows:

XN HashAggregate (cost=88052548.77..88064188.37 rows=4655838 width=457)
-> XN Hash Join DS_BCAST_INNER (cost=49690707.47..88017629.99 rows=4655838 width=457)
Hash Cond: (("outer".location_id)::text = ("inner".hub_location_seq)::text)
-> XN Seq Scan on mv_tbl__bridge_tickets_per_stadium_mv__0 derived_table1 (cost=0.00..147804.35 rows=14780435 width=510)
-> XN Hash (cost=49690707.31..49690707.31 rows=63 width=49)
-> XN Hash Join DS_BCAST_INNER (cost=0.78..49690707.31 rows=63 width=49)
Hash Cond: (("outer".hub_city_key)::text = ("inner".hub_city_seq)::text)
-> XN Seq Scan on hub_city hc (cost=0.00..27909.51 rows=2790951 width=49)
-> XN Hash (cost=0.62..0.62 rows=62 width=72)
-> XN Seq Scan on lnk_location_city llc (cost=0.00..0.62 rows=62 width=72)

Amazon Redshift now uses the materialized view for any future queries that involve joining tickets with sports locations. For example, a separate business intelligence (BI) team looking into the dates with the highest ticket sales can run a query like the following:

select start_date date, count(hub_ticket_seq) tickets
from hub_ticket
join lnk_ticket_sport_event ltse on hub_ticket_seq = hub_ticket_key
join hub_sport_event hse on hse.hub_sport_event_key  = ltse.hub_sport_event_seq 
join sat_sport_event sse on sse.hub_sport_event_key = hse.hub_sport_event_key 
join lnk_sport_event_location lsel on hse.hub_sport_event_key = lsel.hub_sport_event_seq
join hub_sport_location hsl on hub_location_seq = hub_sport_location_key
join sat_sport_location ssl on ssl.hub_sport_location_key = hsl.hub_sport_location_key 
group by 1
order by 2 desc
limit 10;

Amazon Redshift can implicitly understand that the query can be optimized by using the materialized view we already created, thereby avoiding joins that involve broadcasting data across nodes. This can be seen from the run plan:

XN Limit (cost=1000000221707.65..1000000221707.68 rows=10 width=40)
-> XN Merge (cost=1000000221707.65..1000000221707.75 rows=39 width=40)
Merge Key: count(derived_table1.tickets_id)
-> XN Network (cost=1000000221707.65..1000000221707.75 rows=39 width=40)
Send to leader
-> XN Sort (cost=1000000221707.65..1000000221707.75 rows=39 width=40)
Sort Key: count(derived_table1.tickets_id)
-> XN HashAggregate (cost=221706.52..221706.62 rows=39 width=40)
-> XN Seq Scan on mv_tbl__bridge_tickets_per_stadium_mv__0 derived_table1 (cost=0.00..147804.35 rows=14780435 width=40)

If we drop the materialized view, then the preceding query results in the following plan:

XN Limit (cost=7509421514303.64..7509421514303.66 rows=10 width=40)
-> XN Merge (cost=7509421514303.64..7509421514303.73 rows=39 width=40)
Merge Key: count(ltse.hub_ticket_seq)
-> XN Network (cost=7509421514303.64..7509421514303.73 rows=39 width=40)
Send to leader
-> XN Sort (cost=7509421514303.64..7509421514303.73 rows=39 width=40)
Sort Key: count(ltse.hub_ticket_seq)
-> XN HashAggregate (cost=6509421514302.51..6509421514302.61 rows=39 width=40)
-> XN Hash Join DS_BCAST_INNER (cost=54745206.40..6509421439263.58 rows=15007786 width=40)
Hash Cond: (("outer".hub_sport_event_seq)::text = ("inner".hub_sport_event_seq)::text)
-> XN Hash Join DS_BCAST_INNER (cost=184864.04..6507391239560.52 rows=14634339 width=148)
Hash Cond: (("outer".hub_ticket_seq)::text = ("inner".hub_ticket_key)::text)
-> XN Hash Join DS_BCAST_INNER (cost=108.60..3997288304.94 rows=14558405 width=148)
Hash Cond: (("outer".hub_sport_event_seq)::text = ("inner".hub_sport_event_key)::text)
-> XN Hash Join DS_BCAST_INNER (cost=54.30..2085599304.09 rows=14669000 width=112)
Hash Cond: (("outer".hub_sport_event_seq)::text = ("inner".hub_sport_event_key)::text)
-> XN Seq Scan on lnk_ticket_sport_event ltse (cost=0.00..147804.35 rows=14780435 width=72)
-> XN Hash (cost=43.44..43.44 rows=4344 width=40)
-> XN Seq Scan on sat_sport_event sse (cost=0.00..43.44 rows=4344 width=40)
-> XN Hash (cost=43.44..43.44 rows=4344 width=36)
-> XN Seq Scan on hub_sport_event hse (cost=0.00..43.44 rows=4344 width=36)
-> XN Hash (cost=147804.35..147804.35 rows=14780435 width=36)
-> XN Seq Scan on hub_ticket (cost=0.00..147804.35 rows=14780435 width=36)
-> XN Hash (cost=54560331.14..54560331.14 rows=4489 width=36)
-> XN Hash Join DS_BCAST_INNER (cost=1.55..54560331.14 rows=4489 width=36)
Hash Cond: (("outer".hub_location_seq)::text = ("inner".hub_sport_location_key)::text)
-> XN Hash Join DS_BCAST_INNER (cost=0.78..27280186.11 rows=4416 width=108)
Hash Cond: (("outer".hub_location_seq)::text = ("inner".hub_sport_location_key)::text)
-> XN Seq Scan on lnk_sport_event_location lsel (cost=0.00..43.44 rows=4344 width=72)
-> XN Hash (cost=0.62..0.62 rows=62 width=36)
-> XN Seq Scan on sat_sport_location ssl (cost=0.00..0.62 rows=62 width=36)
-> XN Hash (cost=0.62..0.62 rows=62 width=36)
-> XN Seq Scan on hub_sport_location hsl (cost=0.00..0.62 rows=62 width=36)

End users of the data warehouse don't need to worry about refreshing the data in the materialized views, because we enabled automatic materialized view refresh. Future use cases involving new dimensions also benefit from the existence of the materialized views.

Prepared statements in the data vault with materialized views in Amazon Redshift

Another type of query that we can run on top of the Business Data Vault schema is prepared statements with bind variables. It's quite common to see user interfaces integrated with data warehouses that let users dynamically change the value of a variable through a selection in a choice list or a link in a cross-tab. When the variable changes, so do the query condition and the report or dashboard contents. The following query is a prepared statement to get the count of tickets sold per city and stadium name. It takes the stadium name as a variable and provides the number of tickets sold in that stadium.

PREPARE prep_statement (varchar(100))
AS select hc.name city_name, ssl."name" stadium_name, count(hub_ticket_seq) tickets
 from hub_ticket
 join lnk_ticket_sport_event ltse on hub_ticket_seq = hub_ticket_key
 join hub_sport_event hse on hse.hub_sport_event_key = ltse.hub_sport_event_seq
 join sat_sport_event sse on sse.hub_sport_event_key = hse.hub_sport_event_key
 join lnk_sport_event_location lsel on hse.hub_sport_event_key = lsel.hub_sport_event_seq
 join hub_sport_location hsl on hub_location_seq = hub_sport_location_key
 join sat_sport_location ssl on ssl.hub_sport_location_key = hsl.hub_sport_location_key
 join lnk_location_city llc on llc.hub_location_seq = hsl.hub_sport_location_key
 join hub_city hc on llc.hub_city_seq = hc.hub_city_key
 where ssl."name"  = $1
 group by 1, 2;
PREPARE

Let’s run the query to see the city and tickets sold for different stadiums passed as a variable in this prepared statement:

dev=# EXECUTE prep_statement('Lucas Oil Stadium');
  city_name   |   stadium_name    | tickets
--------------+-------------------+---------
 Indianapolis | Lucas Oil Stadium |    8892
(1 row)

dev=# EXECUTE prep_statement('Ford Field');
 city_name | stadium_name | tickets
-----------+--------------+---------
 Detroit   | Ford Field   |   42720
(1 row)

Let's dive into the explain plan of this prepared statement to confirm that Amazon Redshift can implicitly use the materialized view bridge_tickets_per_stadium_mv that we created earlier:

XN HashAggregate  (cost=87685290.31..87685914.69 rows=249748 width=66)
->  XN Hash Join DS_BCAST_INNER  (cost=49690707.47..87683417.20 rows=249748 width=66)
Hash Cond: (("outer".location_id)::text = ("inner".hub_location_seq)::text)
->  XN Seq Scan on mv_tbl__bridge_tickets_per_stadium_mv__0 derived_table1  (cost=0.00..184755.44 rows=242303 width=89)
Filter: ((stadium_name)::text = ($1)::text)
->  XN Hash  (cost=49690707.31..49690707.31 rows=63 width=49)
->  XN Hash Join DS_BCAST_INNER  (cost=0.78..49690707.31 rows=63 width=49)
Hash Cond: (("outer".hub_city_key)::text = ("inner".hub_city_seq)::text)
->  XN Seq Scan on hub_city hc  (cost=0.00..27909.51 rows=2790951 width=49)
->  XN Hash  (cost=0.62..0.62 rows=62 width=72)
->  XN Seq Scan on lnk_location_city llc  (cost=0.00..0.62 rows=62 width=72)

As the explain plan shows, Amazon Redshift optimized the query to implicitly use the materialized view created earlier, even for prepared statements.

Conclusion

In this post, we demonstrated how to implement a Data Vault model in Amazon Redshift while leveraging its out-of-the-box features. We also discussed how Amazon Redshift features such as seamless data sharing, automatic table optimization, materialized views, and automatic materialized view refresh can help you build data models that meet high performance requirements.


About the Authors

George Komninos is a solutions architect for the AWS Data Lab. He helps customers convert their ideas to production-ready data products. Before AWS, he spent three years in the Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Devika Singh is a Senior Solutions Architect at Amazon Web Services. Devika helps customers architect and build database and data analytics solutions to accelerate their path to production as part of the AWS Data Lab. She has expertise in database and data warehouse migrations to AWS, helping customers improve the value of their solutions with AWS.

Simon Dimaline has specialized in data warehousing and data modeling for more than 20 years. He currently works for the Data & Analytics practice within AWS Professional Services accelerating customers’ adoption of AWS analytics services.

How to delete user data in an AWS data lake

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/how-to-delete-user-data-in-an-aws-data-lake/

General Data Protection Regulation (GDPR) is an important aspect of today’s technology world, and processing data in compliance with GDPR is a necessity for those who implement solutions within the AWS public cloud. One article of GDPR is the “right to erasure,” or “right to be forgotten,” which may require you to implement a solution to delete specific users’ personal data.

In the context of the AWS big data and analytics ecosystem, every architecture, regardless of the problem it targets, uses Amazon Simple Storage Service (Amazon S3) as the core storage service. Despite its versatility and feature completeness, Amazon S3 doesn’t come with an out-of-the-box way to map a user identifier to the S3 keys of objects that contain that user’s data.

This post walks you through a framework that helps you purge individual user data from your organization’s AWS-hosted data lake and analytics solution, which may span different AWS storage layers, along with sample code targeting Amazon S3.

Reference architecture

To address the challenge of implementing a data purge framework, we reduced the problem to the straightforward use case of deleting a user’s data from a platform that uses AWS for its data pipeline. The following diagram illustrates this use case.

We’re introducing the idea of building and maintaining an index metastore that keeps track of the location of each user’s records and allows us to locate them efficiently, reducing the search space.

You can use the following architecture diagram to delete a specific user’s data within your organization’s AWS data lake.

For this initial version, we created three user flows that map each task to a fitting AWS service:

Flow 1: Real-time metastore update

The S3 ObjectCreated or ObjectRemoved events trigger an AWS Lambda function that parses the object and performs an add/update/delete operation to keep the metadata index up to date. You can implement a simple workflow for any other storage layer, such as Amazon Relational Database Service (RDS), Amazon Aurora, or Amazon Elasticsearch Service (ES). We use Amazon DynamoDB and Amazon RDS for PostgreSQL as the index metadata storage options, but our approach is flexible to any other technology.
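
As a rough illustration, the Lambda handler for this flow might dispatch on the S3 event name as in the following Python sketch; index_add and index_remove are hypothetical placeholders for whichever metadata store you choose (Postgres and DynamoDB variants are shown later in this post):

import urllib.parse

def index_add(bucket, key):
    # Hypothetical placeholder: parse the new object and upsert index entries
    raise NotImplementedError

def index_remove(bucket, key):
    # Hypothetical placeholder: drop index entries for a deleted object
    raise NotImplementedError

def lambda_handler(event, context):
    # S3 notifications deliver one or more records per invocation
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        event_name = record["eventName"]
        if event_name.startswith("ObjectCreated"):
            index_add(bucket, key)
        elif event_name.startswith("ObjectRemoved"):
            index_remove(bucket, key)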

Flow 2: Purge data

When a user asks for their data to be deleted, we trigger an AWS Step Functions state machine through Amazon CloudWatch to orchestrate the workflow. Its first step triggers a Lambda function that queries the metadata index to identify the storage layers that contain user records and generates a report that’s saved to an S3 report bucket. A Step Functions activity is created and picked up by a Node.js-based Lambda worker that sends an email with approve and reject links to the approver through Amazon Simple Email Service (SES).

The following diagram shows a graphical representation of the Step Functions state machine as seen on the AWS Management Console.

The approver selects one of the two links, which then calls an Amazon API Gateway endpoint that invokes Step Functions to resume the workflow. If they choose the approve link, Step Functions triggers a Lambda function that takes the report stored in the bucket as input, deletes the objects or records from the storage layer, and updates the index metastore. When the purging job is complete, Amazon Simple Notification Service (SNS) sends a success or failure email to the user.
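
One way to implement this callback is a small Lambda function behind the API Gateway endpoint that returns the activity’s task token to Step Functions. The following Python sketch assumes the approval links carry taskToken and action as query string parameters; it illustrates the idea rather than the repo’s Node.js implementation:

import json
import boto3

sfn = boto3.client("stepfunctions")

def lambda_handler(event, context):
    # Assumes an API Gateway proxy integration; the approve/reject links carry the token
    params = event.get("queryStringParameters") or {}
    task_token = params["taskToken"]
    action = params.get("action", "reject")

    if action == "approve":
        sfn.send_task_success(taskToken=task_token, output=json.dumps({"approved": True}))
        message = "Purge request approved; deletion will proceed."
    else:
        sfn.send_task_failure(taskToken=task_token, error="PurgeRejected",
                              cause="The approver rejected the purge request.")
        message = "Purge request rejected."

    return {"statusCode": 200, "body": message}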

The following diagram represents the Step Functions flow on the console if the purge flow completed successfully.

For the complete code base, see step-function-definition.json in the GitHub repo.

Flow 3: Batch metastore update

This flow refers to the use case of an existing data lake for which an index metastore needs to be created. You can orchestrate the flow through AWS Step Functions, which takes historical data as input and updates the metastore through a batch job. Our current implementation doesn’t include a sample script for this user flow, but a minimal sketch of what it could look like follows.
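
The sketch simply pages through an existing S3 prefix and applies the same per-object indexing logic that flow 1 uses; index_add is a hypothetical placeholder for that logic:

import boto3

s3 = boto3.client("s3")

def index_add(bucket, key):
    # Hypothetical placeholder: reuse the per-object indexing logic from flow 1
    raise NotImplementedError

def backfill_index(bucket, prefix=""):
    # Page through every existing object under the prefix and index it
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            index_add(bucket, obj["Key"])

if __name__ == "__main__":
    backfill_index("gdpr-demo", prefix="year=2018/")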

Our framework

We now walk you through the two use cases we followed for our implementation:

  • You have multiple user records stored in each Amazon S3 file
  • A user has records stored in heterogeneous AWS storage layers

Within these two approaches, we demonstrate alternatives that you can use to store your index metastore.

Indexing by S3 URI and row number

For this use case, we use a free tier RDS Postgres instance to store our index. We created a simple table with the following code:

CREATE UNLOGGED TABLE IF NOT EXISTS user_objects (
    userid TEXT,
    s3path TEXT,
    recordline INTEGER
);

You can create an index on userid to optimize query performance. On object upload, for each row, you need to insert into the user_objects table a row that contains the user ID, the URI of the target Amazon S3 object, and the row number that corresponds to the record. For instance, consider uploading the following JSON input:

{"user_id":"V34qejxNsCbcgD8C0HVk-Q","body":"…"}
{"user_id":"ofKDkJKXSKZXu5xJNGiiBQ","body":"…"}
{"user_id":"UgMW8bLE0QMJDCkQ1Ax5Mg","body":"…"}

Assuming the object is uploaded to the Amazon S3 location s3://gdpr-demo/year=2018/month=2/day=26/input.json, we insert the following tuples into user_objects:

("V34qejxNsCbcgD8C0HVk-Q", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 0)
("ofKDkJKXSKZXu5xJNGiiBQ", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 1)
("UgMW8bLE0QMJDCkQ1Ax5Mg", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 2)

You can implement the index update operation by using a Lambda function triggered on any Amazon S3 ObjectCreated event.
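
The exact code depends on your file format and connectivity setup; the following is a simplified sketch for the JSON Lines example above. It uses psycopg2 (which you’d package with the function or provide as a Lambda layer), and the DB_* environment variables are illustrative names for your connection details:

import json
import os
import urllib.parse

import boto3
import psycopg2  # not in the default Lambda runtime; package it or use a layer

s3 = boto3.client("s3")

def lambda_handler(event, context):
    conn = psycopg2.connect(
        host=os.environ["DB_HOST"],
        dbname=os.environ["DB_NAME"],
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
    )
    try:
        with conn, conn.cursor() as cur:
            for record in event["Records"]:
                bucket = record["s3"]["bucket"]["name"]
                key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
                s3path = f"s3://{bucket}/{key}"
                body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
                # One index row per JSON line: (userid, s3path, recordline)
                for line_number, line in enumerate(body.splitlines()):
                    user_id = json.loads(line)["user_id"]
                    cur.execute(
                        "INSERT INTO user_objects (userid, s3path, recordline) VALUES (%s, %s, %s)",
                        (user_id, s3path, line_number),
                    )
    finally:
        conn.close()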

When we get a delete request from a user, we need to query our index to get some information about where we have stored the data to delete. See the following code:

SELECT s3path,
       ARRAY_AGG(recordline)
FROM user_objects
WHERE userid = 'V34qejxNsCbcgD8C0HVk-Q'
GROUP BY s3path;

The preceding example SQL query returns rows like the following:

("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", {2102,529})

The output indicates that lines 529 and 2102 of S3 object s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json contain the requested user’s data and need to be purged. We then need to download the object, remove those rows, and overwrite the object. For a Python implementation of the Lambda function that implements this functionality, see deleteUserRecords.py in the GitHub repo.

Having the record line available allows you to perform the deletion efficiently in byte format. For implementation simplicity, we purge the rows by replacing the deleted rows with an empty JSON object. You pay a slight storage overhead, but you don’t need to update subsequent row metadata in your index, which would be costly. To eliminate empty JSON objects, we can implement an offline vacuum and index update process.
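
For reference, the core of that purge step can be sketched as follows; this is a simplified illustration of the approach rather than the exact deleteUserRecords.py implementation:

import boto3

s3 = boto3.resource("s3")

def purge_rows(s3path, row_numbers):
    # s3path looks like s3://bucket/key; row_numbers are 0-based line indexes from the index table
    bucket, key = s3path.replace("s3://", "", 1).split("/", 1)
    obj = s3.Object(bucket, key)

    lines = obj.get()["Body"].read().decode("utf-8").splitlines()
    for row in row_numbers:
        lines[row] = "{}"  # blank out the record but keep line numbering stable

    obj.put(Body="\n".join(lines).encode("utf-8"))

if __name__ == "__main__":
    # Based on the example query result above
    purge_rows("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", [2102, 529])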

Indexing by file name and grouping by index key

For this use case, we created a DynamoDB table to store our index. We chose DynamoDB because of its ease of use and scalability; you can use its on-demand pricing model so you don’t need to guess how many capacity units you might need. When files are uploaded to the data lake, a Lambda function parses the file name (for example, 1001-.csv) to identify the user identifier and populates the DynamoDB metadata table. Userid is the partition key, and each different storage layer has its own attribute. For example, if user 1001 had data in Amazon S3 and Amazon RDS, their records look like the following code:

{"userid": 1001, "s3":{"s3://path1", "s3://path2"}, "RDS":{"db1.table1.column1"}}

For a sample Python implementation of this functionality, see update-dynamo-metadata.py in the GitHub repo.
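
The core of that update can be sketched as a single UpdateItem call that appends the new path to the user’s string set. In the sketch below, the table name user_metadata is an assumption for illustration rather than the repo’s actual configuration:

import os
import urllib.parse

import boto3

table = boto3.resource("dynamodb").Table(os.environ.get("METADATA_TABLE", "user_metadata"))

def lambda_handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        # The user identifier is encoded at the start of the file name, for example 1001-<suffix>.csv
        userid = int(key.split("/")[-1].split("-")[0])

        # ADD on a string set attribute creates the set if it doesn't exist yet
        table.update_item(
            Key={"userid": userid},
            UpdateExpression="ADD #layer :paths",
            ExpressionAttributeNames={"#layer": "s3"},
            ExpressionAttributeValues={":paths": {f"s3://{bucket}/{key}"}},
        )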

On delete request, we query the metastore table in DynamoDB and generate a purge report that details which storage layers contain user records, along with storage layer specifics that can speed up locating those records. We store the purge report in Amazon S3. For a sample Lambda function that implements this logic, see generate-purge-report.py in the GitHub repo.

After the purging is approved, we use the report as input to delete the required resources. For a sample Lambda function implementation, see gdpr-purge-data.py in the GitHub repo.
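
As a simplified sketch of the report-generation step, the function below reads the user’s entry from the metadata table and writes a JSON report to S3; the user_metadata table and gdpr-purge-reports bucket names are assumptions for illustration:

import json
import boto3

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")

def generate_purge_report(userid, table_name="user_metadata", report_bucket="gdpr-purge-reports"):
    # Look up every storage layer that holds data for this user
    item = dynamodb.Table(table_name).get_item(Key={"userid": userid}).get("Item", {})

    report = {
        "userid": userid,
        # Convert DynamoDB string sets to plain lists so the report is valid JSON
        "locations": {k: sorted(v) for k, v in item.items() if k != "userid"},
    }

    key = f"reports/user-{userid}.json"
    s3.put_object(Bucket=report_bucket, Key=key, Body=json.dumps(report).encode("utf-8"))
    return f"s3://{report_bucket}/{key}"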

Implementation and technology alternatives

We explored and evaluated multiple implementation options, all of which present tradeoffs, such as implementation simplicity, efficiency, critical data compliance, and feature completeness:

  • Scan every record of the data file to create an index – Whenever a file is uploaded, we iterate through its records and generate tuples (userid, s3Uri, row_number) that are then inserted into our metadata storage layer. On delete request, we fetch the metadata records for the requested user IDs, download the corresponding S3 objects, perform the delete in place, and re-upload the updated objects, overwriting the existing ones. This is the most flexible approach because it allows a single object to store multiple users’ data, which is a very common practice. The flexibility comes at a cost because it requires downloading and re-uploading the object, which introduces a network bottleneck in delete operations. User activity datasets such as customer product reviews are a good fit for this approach, because you don’t expect multiple records for the same user within each partition (such as a date partition), and it’s preferable to combine multiple users’ activity in a single file. It’s similar to what was described in the section “Indexing by S3 URI and row number” and sample code is available in the GitHub repo.
  • Store metadata as file name prefix – Adding the user ID as the prefix of the uploaded object under the different partitions that are defined based on query pattern enables you to reduce the required search operations on delete request. The metadata handling utility finds the user ID from the file name and maintains the index accordingly. This approach is efficient in locating the resources to purge but assumes a single user per object, and requires you to store user IDs within the filename, which might require InfoSec considerations. Clickstream data, where you would expect to have multiple click events for a single customer on a single date partition during a session, is a good fit. We covered this approach in the section “Indexing by file name and grouping by index key” and you can download the codebase from the GitHub repo.
  • Use a metadata file – Along with uploading a new object, we also upload a metadata file that’s picked up by an indexing utility to create and maintain the index up to date. On delete request, we query the index, which points us to the records to purge. A good fit for this approach is a use case that already involves uploading a metadata file whenever a new object is uploaded, such as uploading multimedia data, along with their metadata. Otherwise, uploading a metadata file on every object upload might introduce too much of an overhead.
  • Use the tagging feature of AWS services – Whenever a new file is uploaded to Amazon S3, we use the Amazon S3 PutObjectTagging operation to add a key-value pair for the user identifier. Whenever there is a user data delete request, the purge process fetches objects with that tag and deletes them. This option is straightforward to implement using the existing Amazon S3 API and can therefore serve as an initial version of your implementation (a minimal sketch follows this list). However, it involves significant limitations: it assumes a 1:1 cardinality between Amazon S3 objects and users (each object only contains data for a single user), searching objects based on a tag is limited and inefficient, and storing user identifiers as tags might not be compliant with your organization’s InfoSec policy.
  • Use Apache Hudi – Apache Hudi is becoming a very popular option to perform record-level data deletion on Amazon S3. Its current version is restricted to Amazon EMR, and you can use it if you start to build your data lake from scratch, because you need to store your data as Hudi datasets. Hudi is a very active project and additional features and integrations with more AWS services are expected.
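
Coming back to the tagging option, a minimal sketch follows. It uses only standard Amazon S3 API calls (put_object_tagging, get_object_tagging, delete_object); note that there is no server-side query by tag, so the purge has to list and inspect candidate objects one by one, which is the inefficiency called out above:

import boto3

s3 = boto3.client("s3")

def tag_object_with_user(bucket, key, user_id):
    # Attach the owning user as an object tag at upload time
    s3.put_object_tagging(
        Bucket=bucket,
        Key=key,
        Tagging={"TagSet": [{"Key": "user_id", "Value": str(user_id)}]},
    )

def delete_objects_for_user(bucket, user_id, prefix=""):
    # List candidate objects and inspect the tags of each one
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            tags = s3.get_object_tagging(Bucket=bucket, Key=obj["Key"])["TagSet"]
            if any(t["Key"] == "user_id" and t["Value"] == str(user_id) for t in tags):
                s3.delete_object(Bucket=bucket, Key=obj["Key"])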

The key implementation decision of our approach is separating the storage layer we use for our data from the one we use for our metadata. As a result, our design is versatile and can be plugged into any existing data pipeline. Similar to deciding what storage layer to use for your data, there are many factors to consider when deciding how to store your index:

  • Concurrency of requests – If you don’t expect too many simultaneous inserts, even something as simple as Amazon S3 could be a starting point for your index. However, if you get multiple concurrent writes for multiple users, you need to look into a service that copes better with transactions.
  • Existing team knowledge and infrastructure – In this post, we demonstrated using DynamoDB and RDS Postgres for storing and querying the metadata index. If your team has no experience with either of those but is comfortable with Amazon ES, Amazon DocumentDB (with MongoDB compatibility), or any other storage layer, use those. Furthermore, if you’re already running (and paying for) a MySQL database that’s not used to capacity, you could use it for your index at no additional cost.
  • Size of index – The volume of your metadata is orders of magnitude lower than your actual data. However, if your dataset grows significantly, you might need to consider going for a scalable, distributed storage solution rather than, for instance, a relational database management system.

Conclusion

GDPR has transformed best practices and introduced several extra technical challenges in designing and implementing a data lake. The reference architecture and scripts in this post may help you delete data in a manner that’s compliant with GDPR.

Let us know your feedback in the comments and how you implemented this solution in your organization, so that others can learn from it.

About the Authors

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas into production-ready data products. Before AWS, he spent three years in the Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and traveling.