
How to retain system tables’ data spanning multiple Amazon Redshift clusters and run cross-cluster diagnostic queries

Post Syndicated from Karthik Sonti original https://aws.amazon.com/blogs/big-data/how-to-retain-system-tables-data-spanning-multiple-amazon-redshift-clusters-and-run-cross-cluster-diagnostic-queries/

Amazon Redshift is a data warehouse service that logs the history of the system in STL log tables. The STL log tables manage disk space by retaining only two to five days of log history, depending on log usage and available disk space.

To retain STL tables’ data for an extended period, you usually have to create a replica table for every system table. Then, for each replica, you load the data from the system table into it at regular intervals. By maintaining replica tables for STL tables, you can run diagnostic queries on historical data from the STL tables. You can then derive insights from query execution times, query plans, and disk-spill patterns, and make better cluster-sizing decisions. However, refreshing replica tables with live data from STL tables at regular intervals requires schedulers such as Cron or AWS Data Pipeline. Also, these tables are specific to one cluster and are not accessible after the cluster is terminated. This is especially true for transient Amazon Redshift clusters that last for only a finite period of ad hoc query execution.

In this blog post, I present a solution that exports system tables from multiple Amazon Redshift clusters into an Amazon S3 bucket. This solution is serverless, and you can schedule it as frequently as every five minutes. The AWS CloudFormation deployment template that I provide automates the solution setup in your environment. The system tables’ data in the Amazon S3 bucket is partitioned by cluster name and query execution date to enable efficient joins in cross-cluster diagnostic queries.

I also provide another CloudFormation template later in this post. This second template helps to automate the creation of tables in the AWS Glue Data Catalog for the system tables’ data stored in Amazon S3. After the system tables are exported to Amazon S3, you can run cross-cluster diagnostic queries on the system tables’ data and derive insights about query executions in each Amazon Redshift cluster. You can do this using Amazon QuickSight, Amazon Athena, Amazon EMR, or Amazon Redshift Spectrum.

You can find all the code examples in this post (the CloudFormation templates, the AWS Glue extract, transform, and load (ETL) scripts, and the resolution steps for common errors you might encounter) in this GitHub repository.

Solution overview

The solution in this post uses AWS Glue to export system tables’ log data from Amazon Redshift clusters into Amazon S3. The AWS Glue ETL jobs are invoked at a scheduled interval by AWS Lambda. AWS Systems Manager, which provides secure, hierarchical storage for configuration data management and secrets management, maintains the details of Amazon Redshift clusters for which the solution is enabled. The last-fetched time stamp values for the respective cluster-table combination are maintained in an Amazon DynamoDB table.

The following diagram covers the key steps involved in this solution.

The solution as illustrated in the preceding diagram flows like this:

  1. The Lambda function, invoke_rs_stl_export_etl, is triggered at regular intervals, as controlled by Amazon CloudWatch. It looks up the AWS Systems Manager parameter store to get the details of the Amazon Redshift clusters for which the system table export is enabled.
  2. The same Lambda function, based on the Amazon Redshift cluster details obtained in step 1, invokes the AWS Glue ETL job designated for the Amazon Redshift cluster. If an ETL job for the cluster is not found, the Lambda function creates one.
  3. The ETL job invoked for the Amazon Redshift cluster gets the cluster credentials from the parameter store. It also reads from the DynamoDB table the time stamp of when each system table was last exported from that cluster.
  4. The ETL job unloads the system tables’ data from the Amazon Redshift cluster into an Amazon S3 bucket.
  5. The ETL job updates the DynamoDB table with the last exported time stamp value for each system table exported from the Amazon Redshift cluster.
  6. The Amazon Redshift cluster system tables’ data is available in Amazon S3 and is partitioned by cluster name and date for running cross-cluster diagnostic queries.
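The orchestration in steps 1 and 2 can be sketched as a short Lambda handler. This is an illustrative sketch, not the actual repository code; the parameter name follows the table later in this post, and the per-cluster Glue job naming convention (<<cluster_name>>_extract_rs_query_logs) is taken from the extension section at the end.

```python
# Illustrative sketch of the invoke_rs_stl_export_etl Lambda function, not the
# actual repository code.

ENABLED_CLUSTERS_PARAM = "redshift_query_logs.global.enabled_cluster_list"

def glue_job_names(enabled_cluster_list):
    """Map the comma-separated cluster list to per-cluster Glue ETL job names."""
    clusters = [c.strip() for c in enabled_cluster_list.split(",") if c.strip()]
    return ["{}_extract_rs_query_logs".format(c) for c in clusters]

def handler(event, context):
    import boto3  # imported here so the sketch can be read and tested without AWS
    ssm = boto3.client("ssm")
    glue = boto3.client("glue")
    enabled = ssm.get_parameter(Name=ENABLED_CLUSTERS_PARAM)["Parameter"]["Value"]
    for job_name in glue_job_names(enabled):
        # The real function also creates the job first if it doesn't exist yet.
        glue.start_job_run(JobName=job_name)
```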

Understanding the configuration data

This solution uses AWS Systems Manager parameter store to store the Amazon Redshift cluster credentials securely. The parameter store also securely stores other configuration information that the AWS Glue ETL job needs for extracting and storing system tables’ data in Amazon S3. Systems Manager comes with a default AWS Key Management Service (AWS KMS) key that it uses to encrypt the password component of the Amazon Redshift cluster credentials.

The following table explains the global parameters and cluster-specific parameters required in this solution. The global parameters are defined once and applicable at the overall solution level. The cluster-specific parameters are specific to an Amazon Redshift cluster and repeat for each cluster for which you enable this post’s solution. The CloudFormation template explained later in this post creates these parameters as part of the deployment process.

Global parameters (defined once and applied to all jobs):

redshift_query_logs.global.s3_prefix (String): The Amazon S3 path where the query logs are exported. Under this path, each exported table is partitioned by cluster name and date.
redshift_query_logs.global.tempdir (String): The Amazon S3 path that AWS Glue ETL jobs use for temporarily staging the data.
redshift_query_logs.global.role (String): The name of the role that the AWS Glue ETL jobs assume. Just the role name is sufficient; the complete Amazon Resource Name (ARN) is not required.
redshift_query_logs.global.enabled_cluster_list (StringList): A comma-separated list of cluster names for which system tables’ data export is enabled. This gives a user the flexibility to exclude certain clusters.

Cluster-specific parameters (repeated for each cluster specified in the enabled_cluster_list parameter):

redshift_query_logs.<<cluster_name>>.connection (String): The name of the AWS Glue Data Catalog connection to the Amazon Redshift cluster. For example, if the cluster name is product_warehouse, the entry is redshift_query_logs.product_warehouse.connection.
redshift_query_logs.<<cluster_name>>.user (String): The user name that AWS Glue uses to connect to the Amazon Redshift cluster.
redshift_query_logs.<<cluster_name>>.password (SecureString): The password that AWS Glue uses to connect to the Amazon Redshift cluster, encrypted with a key managed in AWS KMS.

For example, suppose that you have two Amazon Redshift clusters, product-warehouse and category-management, for which the solution described in this post is enabled. In this case, the parameters shown in the following screenshot are created by the solution deployment CloudFormation template in the AWS Systems Manager parameter store.
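Following the naming convention above, the ETL job for a cluster can derive its three parameter names and fetch them, decrypting the SecureString password. A sketch (the function names here are illustrative, not from the repository):

```python
# Sketch of how an ETL job might resolve and read its cluster-specific
# parameters. Function names are illustrative, not the repository's.

def cluster_param_names(cluster_name):
    """Build the three cluster-specific parameter names for a cluster."""
    prefix = "redshift_query_logs.{}".format(cluster_name)
    return {
        "connection": prefix + ".connection",
        "user": prefix + ".user",
        "password": prefix + ".password",  # stored as a SecureString
    }

def fetch_cluster_config(cluster_name):
    import boto3  # lazy import keeps the naming helper testable without AWS
    ssm = boto3.client("ssm")
    return {
        key: ssm.get_parameter(Name=name, WithDecryption=True)["Parameter"]["Value"]
        for key, name in cluster_param_names(cluster_name).items()
    }
```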

Solution deployment

To make it easier for you to get started, I created a CloudFormation template that automatically configures and deploys the solution—only one step is required after deployment.

Prerequisites

To deploy the solution, you must have one or more Amazon Redshift clusters in a private subnet. This subnet must have a network address translation (NAT) gateway or a NAT instance configured, and also a security group with a self-referencing inbound rule for all TCP ports. For more information about why AWS Glue ETL needs the configuration it does, described previously, see Connecting to a JDBC Data Store in a VPC in the AWS Glue documentation.

To start the deployment, launch the CloudFormation template:

CloudFormation stack parameters

The following table lists and describes the parameters for deploying the solution to export query logs from multiple Amazon Redshift clusters.

S3Bucket (default: mybucket): The bucket this solution uses to store the exported query logs, stage code artifacts, and perform unloads from Amazon Redshift. For example, the mybucket/extract_rs_logs/data prefix stores all the exported query logs for each system table, partitioned by cluster. The mybucket/extract_rs_logs/temp/ prefix temporarily stages the unloaded data from Amazon Redshift. The mybucket/extract_rs_logs/code prefix stores all the code artifacts required for Lambda and the AWS Glue ETL jobs.
ExportEnabledRedshiftClusters (requires input): A comma-separated list of cluster names from which the system table logs need to be exported.
DataStoreSecurityGroups (requires input): A list of security groups with an inbound rule to the Amazon Redshift clusters provided in the ExportEnabledRedshiftClusters parameter. These security groups should also have a self-referencing inbound rule on all TCP ports, as explained in Connecting to a JDBC Data Store in a VPC.

After you launch the template and create the stack, you see that the following resources have been created:

  1. AWS Glue connections for each Amazon Redshift cluster you provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  2. All parameters required for this solution created in the parameter store.
  3. The Lambda function that invokes the AWS Glue ETL jobs for each configured Amazon Redshift cluster at a regular interval of five minutes.
  4. The DynamoDB table that captures the last exported time stamps for each exported cluster-table combination.
  5. The AWS Glue ETL jobs to export query logs from each Amazon Redshift cluster provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  6. The IAM roles and policies required for the Lambda function and AWS Glue ETL jobs.

After the deployment

For each Amazon Redshift cluster for which you enabled the solution through the CloudFormation stack parameter, ExportEnabledRedshiftClusters, the automated deployment creates placeholder credentials that you must update after the deployment:

  1. Go to the parameter store.
  2. Note the parameters redshift_query_logs.<<cluster_name>>.user and redshift_query_logs.<<cluster_name>>.password that correspond to each Amazon Redshift cluster for which you enabled this solution. Edit these parameters to replace the placeholder values with the right credentials.

For example, if product-warehouse is one of the clusters for which you enabled system table export, you edit these two parameters with the right user name and password and choose Save parameter.

Querying the exported system tables

Within a few minutes after the solution deployment, you should see Amazon Redshift query logs being exported to the Amazon S3 location, <<S3Bucket_you_provided>>/extract_redshift_query_logs/data/. In that location, you should see the eight system tables partitioned by cluster name and date: stl_alert_event_log, stl_ddltext, stl_explain, stl_query, stl_querytext, stl_scan, stl_utilitytext, and stl_wlm_query.

To run cross-cluster diagnostic queries on the exported system tables, create external tables in the AWS Glue Data Catalog. To make it easier for you to get started, I provide a CloudFormation template that creates an AWS Glue crawler, which crawls the exported system tables stored in Amazon S3 and builds the external tables in the AWS Glue Data Catalog.

Launch this CloudFormation template to create external tables that correspond to the Amazon Redshift system tables. S3Bucket is the only input parameter required for this stack deployment. Provide the same Amazon S3 bucket name where the system tables’ data is being exported. After you successfully create the stack, you can see the eight tables in the database, redshift_query_logs_db, as shown in the following screenshot.

Now, navigate to the Athena console to run cross-cluster diagnostic queries. The following screenshot shows a diagnostic query executed in Athena that retrieves query alerts logged across multiple Amazon Redshift clusters.
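Such a diagnostic query can also be submitted programmatically with the AWS SDK. In this sketch, the database and table names come from the crawler-built catalog described above, but the partition column name ("cluster") and the alert-table columns are assumptions you should adjust to whatever the crawler actually registered:

```python
# Hedged example: submit a cross-cluster alert query to Athena. The partition
# column name ("cluster") and the selected columns are assumptions; adjust
# them to the schema the crawler built in your catalog.

def alerts_sql(clusters):
    """Build a query for alert events across the given clusters."""
    in_list = ", ".join("'{}'".format(c) for c in clusters)
    return ("SELECT cluster, event, solution, event_time "
            "FROM redshift_query_logs_db.stl_alert_event_log "
            "WHERE cluster IN ({}) ORDER BY event_time DESC".format(in_list))

def run_in_athena(sql, output_location):
    import boto3  # lazy import: only needed when actually calling AWS
    athena = boto3.client("athena")
    resp = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "redshift_query_logs_db"},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return resp["QueryExecutionId"]
```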

You can build the following example Amazon QuickSight dashboard by running cross-cluster diagnostic queries on Athena to identify the hourly query count and the key query alert events across multiple Amazon Redshift clusters.

How to extend the solution

You can extend this post’s solution in two ways:

  • Add any new Amazon Redshift clusters that you spin up after you deploy the solution.
  • Add other system tables or custom query results to the list of exports from an Amazon Redshift cluster.

Extend the solution to other Amazon Redshift clusters

To extend the solution to more Amazon Redshift clusters, add the three cluster-specific parameters in the AWS Systems Manager parameter store following the guidelines earlier in this post. Modify the redshift_query_logs.global.enabled_cluster_list parameter to append the new cluster to the comma-separated string.
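Those two changes can also be scripted. In this sketch, the parameter names follow the convention from the table earlier in the post; the helper functions themselves are mine, not part of the solution:

```python
# Illustrative script to enable the solution for a new cluster. Parameter
# names follow this post's convention; the helpers are not repository code.

GLOBAL_LIST = "redshift_query_logs.global.enabled_cluster_list"

def append_cluster(enabled_list, new_cluster):
    """Append a cluster to the comma-separated list, avoiding duplicates."""
    clusters = [c.strip() for c in enabled_list.split(",") if c.strip()]
    if new_cluster not in clusters:
        clusters.append(new_cluster)
    return ",".join(clusters)

def enable_cluster(cluster, connection, user, password):
    import boto3
    ssm = boto3.client("ssm")
    prefix = "redshift_query_logs.{}".format(cluster)
    # Create the three cluster-specific parameters.
    ssm.put_parameter(Name=prefix + ".connection", Value=connection,
                      Type="String", Overwrite=True)
    ssm.put_parameter(Name=prefix + ".user", Value=user,
                      Type="String", Overwrite=True)
    ssm.put_parameter(Name=prefix + ".password", Value=password,
                      Type="SecureString", Overwrite=True)
    # Extend the global enabled-cluster list.
    current = ssm.get_parameter(Name=GLOBAL_LIST)["Parameter"]["Value"]
    ssm.put_parameter(Name=GLOBAL_LIST, Type="StringList", Overwrite=True,
                      Value=append_cluster(current, cluster))
```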

Extend the solution to add other tables or custom queries to an Amazon Redshift cluster

The current solution ships with the export functionality for the following Amazon Redshift system tables:

  • stl_alert_event_log
  • stl_ddltext
  • stl_explain
  • stl_query
  • stl_querytext
  • stl_scan
  • stl_utilitytext
  • stl_wlm_query

You can easily add another system table or custom query by adding a few lines of code to the AWS Glue ETL job, <<cluster-name>>_extract_rs_query_logs. For example, suppose that from the product-warehouse Amazon Redshift cluster you want to export orders greater than $2,000. To do so, add the following five lines of code to the AWS Glue ETL job product-warehouse_extract_rs_query_logs, where product-warehouse is your cluster name:

  1. Get the last-processed time-stamp value. The function creates a value if it doesn’t already exist.

salesLastProcessTSValue = functions.getLastProcessedTSValue(trackingEntry="mydb.sales_2000", job_configs=job_configs)

  2. Run the custom query with the time stamp.

returnDF = functions.runQuery(query="select * from sales s join orders o on s.order_id = o.order_id where o.order_amnt > 2000 and s.sale_timestamp > '{}'".format(salesLastProcessTSValue), tableName="mydb.sales_2000", job_configs=job_configs)  # join key shown here is assumed for illustration

  3. Save the results to Amazon S3.

functions.saveToS3(dataframe=returnDF, s3Prefix=s3Prefix, tableName="mydb.sales_2000", partitionColumns=["sale_date"], job_configs=job_configs)

  4. Get the latest time-stamp value from the data frame returned in step 2.

latestTimestampVal = functions.getMaxValue(returnDF, "sale_timestamp", job_configs)

  5. Update the last-processed time-stamp value in the DynamoDB table.

functions.updateLastProcessedTSValue("mydb.sales_2000", latestTimestampVal[0], job_configs)

Conclusion

In this post, I demonstrate a serverless solution to retain the system tables’ log data across multiple Amazon Redshift clusters. By using this solution, you can incrementally export the data from system tables into Amazon S3. By performing this export, you can build cross-cluster diagnostic queries, build audit dashboards, and derive insights into capacity planning by using services such as Athena. I also demonstrate how you can extend this solution to other ad hoc query use cases or tables other than system tables by adding a few lines of code.


Additional Reading

If you found this post useful, be sure to check out Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production and Amazon Redshift – 2017 Recap.


About the Author

Karthik Sonti is a senior big data architect at Amazon Web Services. He helps AWS customers build big data and analytical solutions and provides guidance on architecture and best practices.

Building Event-Driven Batch Analytics on AWS

Post Syndicated from Karthik Sonti original https://aws.amazon.com/blogs/big-data/building-event-driven-batch-analytics-on-aws/

Karthik Sonti is a Senior Big Data Architect with AWS Professional Services

Modern businesses typically collect data from internal and external sources at various frequencies throughout the day. These data sources could be franchise stores, subsidiaries, or new systems integrated as a result of mergers and acquisitions.

For example, a retail chain might collect point-of-sale (POS) data from all franchise stores three times a day to get insights into sales as well as to identify the right number of staff at a given time in any given store. As each franchise functions as an independent business, the format and structure of the data might not be consistent across the board. Depending on the geographical region, each franchise would provide data at a different frequency and the analysis of these datasets should wait until all the required data is provided (event-driven) from the individual franchises. In most cases, the individual data volumes received from each franchise are usually small but the velocity of the data being generated and the collective volume can be challenging to manage.

In this post, I walk you through an architectural approach as well as a sample implementation on how to collect, process, and analyze data for event-driven applications in AWS.

Architecture

The architecture diagram below depicts the components and the data flow needed for an event-driven batch analytics system. At a high level, this architecture approach leverages Amazon S3 for storing source, intermediate, and final output data; AWS Lambda for intermediate file-level ETL and state management; Amazon RDS as the state persistence store; Amazon EMR for aggregated ETL (heavy lifting, consolidated transformation, and loading engine); and Amazon Redshift as the data warehouse hosting the data needed for reporting.

In this architecture, each location on S3 stores data at a certain state of transformation. When new data is placed at a specific location, an S3 event is raised that triggers a Lambda function responsible for the next transformation in the chain. You can use this event-driven approach to create sophisticated ETL processes, and to syndicate data availability at a given point in the chain.
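The trigger pattern above can be sketched as a minimal S3-triggered Lambda. The event payload shape is the standard S3 event notification; the handoff function is a placeholder, since each layer in this post does different work:

```python
# Minimal sketch of an S3-triggered Lambda in the transformation chain.
# The event shape is the standard S3 event notification payload;
# next_transformation is a placeholder for layer-specific work.

def objects_in(event):
    """Extract (bucket, key) pairs from an S3 event notification."""
    return [(r["s3"]["bucket"]["name"], r["s3"]["object"]["key"])
            for r in event.get("Records", [])]

def handler(event, context):
    for bucket, key in objects_in(event):
        next_transformation(bucket, key)  # hand off to the next layer

def next_transformation(bucket, key):
    pass  # placeholder: validate, convert, or track the new object
```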


Data staging layer

S3, being an object store, inherently provides support for a wide variety of data formats (structured and unstructured) and also enables schema-on-read capability (you can defer assigning a structure or model to the data until you are ready to consume it). That makes S3 a great place to land the data from various source systems.

From an enterprise perspective, S3 could act as your data lake layer for storing data in the source-identical format for any future deep analytics purposes. S3 lets you organize your data with prefixes, which can be used to separate the validated/converted data from the source-identical data. S3 also allows you to configure events that invoke specific Lambda functions configured in the input validation/conversion layer and the input tracking layer. The idea of breaking the problem up into smaller components starts right here.

Input validation/conversion layer

The Lambda function in this layer is used for validating ingested files from sources. Validation, in some use cases, could mean checking for specific attributes; in others, it could mean converting data from the source format to a format that the subsequent EMR job expects. Common examples are removing unwanted characters or attributes from the input files, converting a legacy format such as .dbf to a modern format such as .json, or decompressing a zip archive into individual files.

Note that the Lambda function in this layer acts on a file level and writes the output to a different location in the same bucket. If your use case involves complex file conversions, consider breaking this layer into concise steps, each backed by their own Lambda function adhering to the enforced limits. This layer can be omitted if no file-level conversions or validations are needed.
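As a concrete (and hypothetical) example of a file-level conversion, the Yummy Foods use case later in this post drops junk lines and converts tab-delimited files to CSV. The conversion core could look like this:

```python
# Hypothetical file-level validation/conversion: drop malformed lines and
# convert tab-delimited text to CSV. Mirrors the Yummy Foods use case below,
# where bad lines start with characters such as "----".
import csv
import io

def tdf_to_csv(tdf_text, bad_prefix="----"):
    out = io.StringIO()
    writer = csv.writer(out)
    for line in tdf_text.splitlines():
        if not line.strip() or line.startswith(bad_prefix):
            continue  # skip blank lines and known-bad source noise
        writer.writerow(line.split("\t"))
    return out.getvalue()
```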

Input tracking layer

In many event-driven batch analytics use cases, the final step to consolidate and process data cannot be invoked unless certain dependencies are resolved. Those dependencies could be inter-file dependencies such as orders data depending on Item data or volume-based dependencies such as waiting until a sizeable amount of data is received before submitting an aggregation job.

To enable resolving such dependencies, the first step is to audit which file is received at what “state”: Validated, Converted, and so on. The number of “states” varies from use case to use case. As soon as the input validation/conversion layer Lambda function places its output into the S3 bucket at a designated location, the same S3 eventing framework triggers the Lambda function configured in this layer to record those states in a state management store. The subsequent layers can then read the store and verify that the configured dependencies are resolved before submitting an EMR job.

State management store

As the name indicates, the persistent storage in this layer stores the state information of files received from data sources and the aggregation jobs configured to run on those files. The state management store is central to this architecture approach, as it helps control the criteria for submitting an EMR aggregation job, its input size and input type, along with other parameters.

The state management store provides a single view of input files that have been received, the list of jobs currently running on those files, and the information on the states (Running/ Completed/ Failed) of those jobs.

While RDS and Amazon DynamoDB are equally good choices for building a state management store, RDS provides the flexibility of using SQL commands to edit the EMR aggregation job configurations and to easily track the status of a job and its impacts. A sample state management store is modeled in MySQL for the use case discussed later.

Aggregation job submission layer

The Lambda function in this layer iterates over the list of open jobs (jobs that are not in “Running” state) in the state management store and submits a job if the respective precondition criteria configured for that job are met.


In addition to a generic precondition that the job is not submitted unless an updated version of the input files has been received since the last time the job was launched, the Lambda function configured in this layer verifies job-specific preconditions, such as a minimum file count or the existence of reference data.

After an EMR job is submitted, the Lambda function in this layer updates the state management store with the respective clusterId:stepId combination so that the job monitoring layer can update the job status. The implementation I provide below for the use case example implements all the aforementioned preconditions.

The Lambda function in this layer is scheduled to run at a specific interval, which is a function of the batch frequency of the input sources and the desired frequency of the aggregation jobs. Choosing the right schedule interval prevents concurrent job submissions.
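The precondition check itself is plain bookkeeping against the state management store. A simplified sketch, with field names that are illustrative rather than the actual store schema:

```python
# Simplified precondition check for one configured aggregation job.
# Field names are illustrative, not the actual state-store schema.

def should_submit(job, files):
    """Return True when a non-running job's preconditions are all met."""
    if job["status"] == "Running":
        return False
    # Generic precondition: only input files updated since the last launch count.
    fresh = [f for f in files if f["validated_at"] > job["last_submitted_at"]]
    # Job-specific precondition: a minimum number of fresh input files.
    return len(fresh) >= job["min_file_count"]
```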

Aggregation job monitoring layer

The Lambda function in this layer iterates over the submitted jobs and updates the status of each job from “Running” to either “Completed” or “Failed”. The Lambda function in this layer is scheduled to run at a specific interval, which is a function of the average time that your aggregation jobs run.
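Using the stored clusterId:stepId combination, the monitoring function can poll EMR and map the step state onto the store's job status. A sketch (the state mapping is my simplification; describe_step is the real EMR API call):

```python
# Sketch of the monitoring check for one submitted job. The mapping of EMR
# step states onto the store's statuses is an illustrative simplification.

TERMINAL_STATES = {"COMPLETED": "Completed",
                   "FAILED": "Failed",
                   "CANCELLED": "Failed"}

def next_job_status(emr_step_state):
    """Map an EMR step state to the store's status, or None while still running."""
    return TERMINAL_STATES.get(emr_step_state)

def poll_step(cluster_id, step_id):
    import boto3  # lazy import: only needed when actually calling AWS
    emr = boto3.client("emr")
    state = emr.describe_step(ClusterId=cluster_id,
                              StepId=step_id)["Step"]["Status"]["State"]
    return next_job_status(state)
```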

Aggregation and load layer

This layer does the bulk of the heavy lifting of data transformation, aggregation, and loading into Amazon Redshift. Depending on the requirement for how frequently transformation and loading to Amazon Redshift should happen, you can either have a long-running cluster with jobs submitted as steps or have a new cluster instantiated every time a job has to be submitted.

Also depending on the volume of updates received from input sources, it might be efficient to just add the changed records rather than truncate and reload the whole dataset. The output from the aggregation job is stored in the Amazon Redshift data warehouse.

Exception handling and tracking

The Lambda functions across all the layers retry three times in any error scenario. Even after the retries, if a Lambda function fails, Amazon CloudWatch provides metrics and logs to help monitor and troubleshoot a Lambda function.

In an EMR step failure scenario, logs are maintained in S3 and can be accessed from the AWS Management Console. For more information, see Hadoop Resource Manager/Application Master UI.

Sample use case: Yummy Foods

Here’s a sample use case for implementing the event-driven batch analytics framework discussed so far. Yummy Foods, a hypothetical customer, has franchise stores all over the country. These franchise stores run on heterogeneous platforms and submit cumulative transaction files to Yummy Foods at various cadences throughout the day in tab-delimited (.tdf) format. Some of these franchise stores, due to their system limitations, occasionally send additional data starting with characters such as “----”.

Yummy Foods needs to be able to update insights on the sales made by each franchise for a given item throughout the day, as soon as the complete list of franchise files from a given province is available. The number of franchises per province is fixed and seldom changes.

The aggregation job for a given province should not be submitted until the configured number of franchise store files from that province are available and also until the product master data update is posted at the beginning of the day. A master data update is identified by the presence of at least one “Item.csv” file for that day.

The aggregation job should consider only transaction codes 4 (sale amount), 5 (tax amount), and 6 (discount amount). The rest of the codes can be ignored. After the aggregation job is completed, only one record should exist for a given combination of franchise store, item, and transaction date.
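The aggregation rule reduces to: filter to codes 4, 5, and 6, then total by (store, item, transaction date). In plain Python (the repository's actual job is an EMR Spark step; this sketch only shows the logic, with record fields assumed):

```python
# The Yummy Foods aggregation rule in plain Python. The repository implements
# this as an EMR Spark job; record field names here are assumed.
from collections import defaultdict

VALID_CODES = {4, 5, 6}  # sale amount, tax amount, discount amount

def aggregate(transactions):
    """Produce one total per (store, item, transaction_date) combination."""
    totals = defaultdict(float)
    for t in transactions:
        if t["code"] in VALID_CODES:
            totals[(t["store"], t["item"], t["date"])] += t["amount"]
    return dict(totals)
```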

Implementation

A complete end-to-end Java implementation for the above use case is available from the aws-blog-event-driven-batch-analytics GitHub repo. Follow the instructions in the Setup.md file to set up and execute the code. The high-level cost estimate for the infrastructure to test this code is around $1.10 per hour.

The “Input Validation/Conversion” layer eliminates any bad data in the input files and converts the tab-delimited .tdf files to .csv files.

The “State Management Store” is modeled to store the ingested file status (INGESTEDFILESTATUS) and also the job configurations (AGGRJOBCONFIGURATION), with preconditions such as waiting until the fixed number of vendor files for a province is received and verifying that the item master data is posted.

The “Input Tracking” layer records the last validated timestamp of the input file in the file status table (INGESTEDFILESTATUS) within the “State Management Store”.

The “Aggregation Job Submission” layer submits a job when the preconditions configured for a job in the “State Management Store” are satisfied.

The “Aggregation and Load layer” EMR Spark job, based on the input parameter, processes and aggregates the vendor transaction data and updates the Amazon Redshift data warehouse.

The “Aggregation Job Monitoring” layer at a scheduled interval updates the active “Running” job status to either “Completed” or “Failed” for tracking purposes.

To avoid future charges, terminate the RDS instance, Amazon Redshift cluster, and EMR cluster provisioned as part of testing this code.

Conclusion

I’ve walked through an elastic and modular architecture approach that you can follow to build event-driven batch analytics on AWS. The next time you encounter an ETL use case that involves a series of steps to cleanse, standardize, and consolidate data received from a wide variety of internal and external sources at fixed intervals, consider the approach described in this post.

This approach provides the following benefits:

  1. Divides the problem into small, decoupled components, with each component having the ability to fire off independently (useful in “replay” scenarios).
  2. Scales out or in dynamically with incoming loads.
  3. Lets you control when and how your EMR processing job should be triggered.
  4. Lets you track your event-driven batch analytic jobs and the respective input files.
  5. Incurs costs for only the processing used rather than for fixed software and hardware infrastructure.

If you have questions or suggestions, please comment below.

—————————–

Related

Using AWS Lambda for Event-driven Data Processing Pipelines


Building Event-Driven Batch Analytics on AWS

Post Syndicated from Karthik Sonti original https://blogs.aws.amazon.com/bigdata/post/TxYWG7G831O94M/Building-Event-Driven-Batch-Analytics-on-AWS

Karthik Sonti is a Senior Big Data Architect with AWS Professional Services

Modern businesses typically collect data from internal and external sources at various frequencies throughout the day. These data sources could be franchise stores, subsidiaries, or new systems integrated as a result of merger and acquisitions.

For example, a retail chain might collect point-of-sale (POS) data from all franchise stores three times a day to get insights into sales as well as to identify the right number of staff at a given time in any given store. As each franchise functions as an independent business, the format and structure of the data might not be consistent across the board. Depending on the geographical region, each franchise would provide data at a different frequency and the analysis of these datasets should wait until all the required data is provided (event-driven) from the individual franchises. In most cases, the individual data volumes received from each franchise are usually small but the velocity of the data being generated and the collective volume can be challenging to manage.

In this post, I walk you through an architectural approach as well as a sample implementation on how to collect, process, and analyze data for event-driven applications in AWS.

Architecture

The architecture diagram below depicts the components and the data flow needed for a event-driven batch analytics system. At a high-level, this architecture approach leverages Amazon S3 for storing source, intermediate, and final output data; AWS Lambda for intermediate file level ETL and state management; Amazon RDS as the state persistent store; Amazon EMR for aggregated ETL (heavy lifting, consolidated transformation, and loading engine); and Amazon Redshift as the data warehouse hosting data needed for reporting.

In this architecture, each location on S3 stores data at a certain state of transformation. When new data is placed at a specific location, an S3 event is raised that triggers a Lambda function responsible for the next transformation in the chain. You can use this event-driven approach to create sophisticated ETL processes, and to syndicate data availability at a given point in the chain.

Data staging layer

S3, being an object store, inherently provides support for a  wide variety of data formats (structured, unstructured) and also enables schema on read capability (you can defer assigning a structure or model to the data until you are ready to consume it). That makes S3 a great place to land the data from various source systems.

From an enterprise perspective, S3 could act as your data lake layer for storing data in the source-identical format for any future deep analytics purposes. S3 lets you organize your data with prefixes, which can be used to separate the validated/converted data from source-identical data. S3 also allows you to configure events that invoke specific Lambda functions configured in  the input validation/conversion layer and input tracking layer. The idea of breaking the problem up into smaller components starts right here.

Input validation/conversion layer

The Lambda function in this layer validates files ingested from sources. Validation, in some use cases, means checking for specific attributes; in others, it means converting data from the source format to the format that the subsequent EMR job expects. Common examples are removing unwanted characters or attributes from the input files, converting a legacy format such as .dbf to a modern format such as .json, or decompressing a .zip archive into individual files.

Note that the Lambda function in this layer acts at the file level and writes its output to a different location in the same bucket. If your use case involves complex file conversions, consider breaking this layer into concise steps, each backed by its own Lambda function operating within the Lambda service limits. This layer can be omitted if no file-level conversions or validations are needed.

Input tracking layer

In many event-driven batch analytics use cases, the final step to consolidate and process data cannot be invoked unless certain dependencies are resolved. Those dependencies could be inter-file dependencies such as orders data depending on Item data or volume-based dependencies such as waiting until a sizeable amount of data is received before submitting an aggregation job.

To resolve such dependencies, the first step is to audit which file is received in what “state”: Validated, Converted, and so on. The number of “states” varies from use case to use case. As soon as the input validation/conversion layer Lambda function places its output into the S3 bucket at a designated location, the same S3 eventing framework triggers the Lambda function configured in this layer to record those states in a state management store. The subsequent layers can then read the store and verify that the configured dependencies are resolved before submitting an EMR job.

State management store

As the name indicates, the persistent storage in this layer stores the state information of files received from data sources and the aggregation jobs configured to run on those files. The state management store is central to this architecture approach, as it helps control the criteria for submitting an EMR aggregation job, its input size and input type, along with other parameters.

The state management store provides a single view of input files that have been received, the list of jobs currently running on those files, and the information on the states (Running/ Completed/ Failed) of those jobs.

While RDS and Amazon DynamoDB are equally good choices for building a state management store, RDS provides the flexibility of using SQL commands to edit the EMR aggregation job configurations and easily tracking the status of a job and its impacts. A sample state management store is modelled in MySQL for the use case discussed later.
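For orientation, a MySQL-style schema for such a store might look like the following. The table names match those used in the sample implementation, but the columns and types here are illustrative assumptions, not the repo's exact DDL.

```sql
-- Illustrative schema only; column names are assumptions for this sketch.
CREATE TABLE INGESTEDFILESTATUS (
  file_name        VARCHAR(256) NOT NULL,
  source_province  VARCHAR(64),
  file_state       VARCHAR(32)  NOT NULL,   -- e.g. VALIDATED, CONVERTED
  last_validated   TIMESTAMP,
  PRIMARY KEY (file_name)
);

CREATE TABLE AGGRJOBCONFIGURATION (
  job_id           VARCHAR(64)  NOT NULL,
  precondition_sql VARCHAR(1024),           -- criteria checked before submission
  job_state        VARCHAR(32),             -- RUNNING / COMPLETED / FAILED
  cluster_step_id  VARCHAR(128),            -- clusterId:stepId after submission
  last_run_ts      TIMESTAMP,
  PRIMARY KEY (job_id)
);
```

Because the store is relational, a plain UPDATE statement is enough to tune a job's preconditions or inspect its run history, which is the flexibility argument for RDS over DynamoDB here.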

Aggregation job submission layer

The Lambda function in this layer iterates over the list of open jobs (jobs that are not in “Running” state) in the state management store and submits a job if the respective precondition criteria configured for that job are met. 

In addition to a generic precondition of not submitting a job unless updated versions of its input files have been received since the job was last launched, the Lambda function in this layer verifies job-specific preconditions, such as a minimum file count or the existence of reference data.

After an EMR job is submitted, the Lambda function in this layer updates the state management store with the respective clusterId:stepId combination so that the job monitoring layer can update the job status. The implementation I provide below for the use case example implements all the aforementioned preconditions.
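The precondition check described above can be sketched in a few lines of Java. This is a simplified model, not the repo's implementation: the parameter names are assumptions, and in practice the values would come from the state management store rather than being passed in directly.

```java
import java.time.Instant;
import java.util.List;

// Sketch of the submission check: a job is submitted only if enough files
// have arrived, the master data is present, and at least one input file is
// newer than the job's last run. Field names are illustrative.
public class SubmissionCheck {
    public static boolean shouldSubmit(List<Instant> fileTimestamps,
                                       int minFileCount,
                                       boolean masterDataPosted,
                                       Instant lastRun) {
        if (!masterDataPosted || fileTimestamps.size() < minFileCount) {
            return false; // job-specific preconditions not yet met
        }
        // Generic precondition: skip unless updated input arrived since last run.
        return fileTimestamps.stream().anyMatch(t -> t.isAfter(lastRun));
    }
}
```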

The Lambda function in this layer is scheduled to run at a specific interval, which is a function of the batch frequency of the input sources and the desired frequency of the aggregation jobs. Choosing the right schedule interval prevents concurrent job submissions.

Aggregation job monitoring layer

The Lambda function in this layer iterates over the submitted jobs and updates the status of that job from “Running” to either “Completed” or “Failed”. The Lambda function in this layer is scheduled to run at a specific interval, which is a function of the average time that your aggregation jobs run.
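The translation from an EMR step state to the job status kept in the state management store might look like the following sketch. The EMR step states named here are real, but the mapping itself is an illustrative assumption, not the repo's code.

```java
// Sketch: map an EMR step state to the job status recorded in the
// state management store. Unfinished states remain "Running".
public class JobStatusMapper {
    public static String toJobStatus(String emrStepState) {
        switch (emrStepState) {
            case "COMPLETED":
                return "Completed";
            case "FAILED":
            case "CANCELLED":
            case "INTERRUPTED":
                return "Failed";
            default: // PENDING, RUNNING, CANCEL_PENDING, ...
                return "Running";
        }
    }
}
```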

Aggregation and load layer

This layer does the bulk of the heavy lifting of data transformation, aggregation, and loading into Amazon Redshift. Depending on the requirement for how frequently transformation and loading to Amazon Redshift should happen, you can either have a long-running cluster with jobs submitted as steps or have a new cluster instantiated every time a job has to be submitted.

Also depending on the volume of updates received from input sources, it might be efficient to just add the changed records rather than truncate and reload the whole dataset. The output from the aggregation job is stored in the Amazon Redshift data warehouse. 

Exception handling and tracking

The Lambda functions across all the layers retry three times in any error scenario. Even after the retries, if a Lambda function fails, Amazon CloudWatch provides metrics and logs to help monitor and troubleshoot a Lambda function.

In an EMR step failure scenario, logs are maintained in S3 and can be accessed from the AWS Management Console. For more information, see Hadoop Resource Manager/Application Master UI.

Sample use case: Yummy Foods

Here’s a sample use case for implementing the event-driven batch analytics framework discussed so far. Yummy Foods, a hypothetical customer, has franchise stores all over the country. These franchise stores run on heterogeneous platforms and submit cumulative transaction files to Yummy Foods at various cadences throughout the day in tab-delimited (.tdf) format. Some of these franchise stores, due to their system limitations, occasionally send additional data starting with characters such as “—-”.

Yummy Foods needs to be able to update insights on the sales made by each franchise for a given item throughout the day, as soon as the complete list of franchise files from a given province is available. The number of franchises per province is fixed and seldom changes.

The aggregation job for a given province should not be submitted until the configured number of franchise store files from that province are available and also until the product master data update is posted at the beginning of the day. A master data update is identified by the presence of at least one “Item.csv” file for that day.

The aggregation job should consider only transaction codes 4 (sale amount), 5 (tax amount), and 6 (discount amount). The rest of the codes can be ignored. After the aggregation job is completed, only one record should exist for a given combination of franchise store, item, and transaction date.
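The aggregation rule above can be sketched as plain Java, independent of Spark. The record layout (store, item, date, code, amount) is an assumption for illustration; the sample repo's Spark job operates on the actual file format.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: keep only transaction codes 4, 5, and 6, and roll amounts up
// to one total per (store, item, date) combination.
public class TxnAggregator {
    private static final Set<Integer> KEPT_CODES =
            new HashSet<>(Arrays.asList(4, 5, 6));

    // Each record: { store, item, date, code, amount }
    public static Map<String, Double> aggregate(List<Object[]> records) {
        Map<String, Double> totals = new HashMap<>();
        for (Object[] r : records) {
            int code = (Integer) r[3];
            if (!KEPT_CODES.contains(code)) continue; // ignore other codes
            String key = r[0] + "|" + r[1] + "|" + r[2];
            totals.merge(key, (Double) r[4], Double::sum);
        }
        return totals;
    }
}
```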

Implementation

Complete end-to-end code Java implementation for the above use case is available from the aws-blog-event-driven-batch-analytics GitHub repo. Follow the instructions in the Setup.md file to set up and execute the code. The high-level cost estimate for the infrastructure to test this code is around $1.10 per hour.

The “Input Validation/Conversion” layer eliminates any bad data in the input files and converts the tab-delimited .tdf files to .csv files.
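A minimal sketch of that file-level conversion is shown below. It assumes, for illustration, that malformed lines start with “----” and that no field itself contains a tab or comma; the repo's actual converter handles the real file format.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the validation/conversion step: drop malformed or empty lines
// and rewrite tab-delimited records as CSV. Quoting is omitted for brevity.
public class TdfToCsv {
    public static List<String> convert(List<String> tdfLines) {
        List<String> csv = new ArrayList<>();
        for (String line : tdfLines) {
            if (line.startsWith("----") || line.trim().isEmpty()) {
                continue; // bad record from a legacy franchise system
            }
            csv.add(line.replace("\t", ","));
        }
        return csv;
    }
}
```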

The “State Management Store” is modelled to store ingested file status (INGESTEDFILESTATUS) as well as the job configurations (AGGRJOBCONFIGURATION), with preconditions such as waiting until the fixed number of franchise files for a province have been received and verifying that the item master data has been posted.

The “Input Tracking” layer records the last validated timestamp of the input file in the file status table (INGESTEDFILESTATUS) within the “State Management Store”.

The “Aggregation Job Submission” layer submits a job when the preconditions configured for that job in the “State Management Store” are satisfied.

The “Aggregation and Load” layer’s EMR Spark job, based on its input parameters, processes and aggregates the franchise transaction data and updates the Amazon Redshift data warehouse.

The “Aggregation Job Monitoring” layer, at a scheduled interval, updates each active “Running” job’s status to either “Completed” or “Failed” for tracking purposes.

To avoid future charges, terminate the RDS instance, Amazon Redshift cluster, and EMR cluster provisioned as part of testing this code.

Conclusion

I’ve walked through an elastic and modular architecture approach that you can follow to build event-driven batch analytics on AWS. The next time you encounter ETL use cases that involve a series of steps for cleansing, standardizing, and consolidating data received from a wide variety of internal and external sources at fixed intervals, consider leveraging the approach described in this post.

This approach provides the following benefits:

  1. Divides the problem into small, decoupled components, with each component having the ability to fire off independently (useful in “replay” scenarios).
  2. Scales out or in dynamically with incoming loads.
  3. Lets you control when and how your EMR processing job should be triggered.
  4. Lets you track your event-driven batch analytic jobs and their respective input files.
  5. Incurs costs for only the processing used rather than for fixed software and hardware infrastructure. 

If you have questions or suggestions, please comment below.


Related

Using AWS Lambda for Event-driven Data Processing Pipelines