Build efficient ETL pipelines with AWS Step Functions distributed map and redrive feature

Post Syndicated from Sriharsh Adari original https://aws.amazon.com/blogs/big-data/build-efficient-etl-pipelines-with-aws-step-functions-distributed-map-and-redrive-feature/

AWS Step Functions is a fully managed visual workflow service that enables you to build complex data processing pipelines involving a diverse set of extract, transform, and load (ETL) technologies such as AWS Glue, Amazon EMR, and Amazon Redshift. You can visually build the workflow by wiring individual data pipeline tasks and configuring payloads, retries, and error handling with minimal code.

While Step Functions supports automatic retries and error handling when data pipeline tasks fail due to momentary or transient errors, there can be permanent failures such as incorrect permissions, invalid data, and business logic failure during the pipeline run. This requires you to identify the issue in the step, fix the issue and restart the workflow. Previously, to rerun the failed step, you needed to restart the entire workflow from the very beginning. This leads to delays in completing the workflow, especially if it’s a complex, long-running ETL pipeline. If the pipeline has many steps using map and parallel states, this also leads to increased cost due to increases in the state transition for running the pipeline from the beginning.

Step Functions now supports the ability for you to redrive your workflow from a failed, aborted, or timed-out state so you can complete workflows faster and at a lower cost, and spend more time delivering business value. Now you can recover from unhandled failures faster by redriving failed workflow runs, after downstream issues are resolved, using the same input provided to the failed state.

In this post, we show you an ETL pipeline job that exports data from Amazon Relational Database Service (Amazon RDS) tables using the Step Functions distributed map state. Then we simulate a failure and demonstrate how to use the new redrive feature to restart the failed task from the point of failure.

Solution overview

One of the common functionalities involved in data pipelines is extracting data from multiple data sources and exporting it to a data lake or synchronizing the data to another database. You can use the Step Functions distributed map state to run hundreds of such export or synchronization jobs in parallel. Distributed map can read millions of objects from Amazon Simple Storage Service (Amazon S3) or millions of records from a single S3 object, and distribute the records to downstream steps. Step Functions runs the steps within the distributed map as child workflows at a maximum parallelism of 10,000. A concurrency of 10,000 is well above the concurrency supported by many other AWS services such as AWS Glue, which has a soft limit of 1,000 job runs per job.

The sample data pipeline sources product catalog data from Amazon DynamoDB and customer order data from Amazon RDS for PostgreSQL database. The data is then cleansed, transformed, and uploaded to Amazon S3 for further processing. The data pipeline starts with an AWS Glue crawler to create the Data Catalog for the RDS database. Because starting an AWS Glue crawler is asynchronous, the pipeline has a wait loop to check if the crawler is complete. After the AWS Glue crawler is complete, the pipeline extracts data from the DynamoDB table and RDS tables. Because these two steps are independent, they are run as parallel steps: one using an AWS Lambda function to export, transform, and load the data from DynamoDB to an S3 bucket, and the other using a distributed map with AWS Glue job sync integration to do the same from the RDS tables to an S3 bucket. Note that AWS Identity and Access Management (IAM) permissions are required for invoking an AWS Glue job from Step Functions. For more information, refer to IAM Policies for invoking AWS Glue job from Step Functions.

The following diagram illustrates the Step Functions workflow.

There are multiple tables related to customers and order data in the RDS database. Amazon S3 hosts the metadata of all the tables as a .csv file. The pipeline uses the Step Functions distributed map to read the table metadata from Amazon S3, iterate on every single item, and call the downstream AWS Glue job in parallel to export the data. See the following code:

"States": {
            "Map": {
              "Type": "Map",
              "ItemProcessor": {
                "ProcessorConfig": {
                  "Mode": "DISTRIBUTED",
                  "ExecutionType": "STANDARD"
                },
                "StartAt": "Export data for a table",
                "States": {
                  "Export data for a table": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::glue:startJobRun.sync",
                    "Parameters": {
                      "JobName": "ExportTableData",
                      "Arguments": {
                        "--dbtable.$": "$.tables"
                      }
                    },
                    "End": true
                  }
                }
              },
              "Label": "Map",
              "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {
                  "InputType": "CSV",
                  "CSVHeaderLocation": "FIRST_ROW"
                },
                "Parameters": {
                  "Bucket": "123456789012-stepfunction-redrive",
                  "Key": "tables.csv"
                }
              },
              "ResultPath": null,
              "End": true
            }
          }

Prerequisites

To deploy the solution, you need the following prerequisites:

Launch the CloudFormation template

Complete the following steps to deploy the solution resources using AWS CloudFormation:

  1. Choose Launch Stack to launch the CloudFormation stack:
  2. Enter a stack name.
  3. Select all the check boxes under Capabilities and transforms.
  4. Choose Create stack.

The CloudFormation template creates many resources, including the following:

  • The data pipeline described earlier as a Step Functions workflow
  • An S3 bucket to store the exported data and the metadata of the tables in Amazon RDS
  • A product catalog table in DynamoDB
  • An RDS for PostgreSQL database instance with pre-loaded tables
  • An AWS Glue crawler that crawls the RDS table and creates an AWS Glue Data Catalog
  • A parameterized AWS Glue job to export data from the RDS table to an S3 bucket
  • A Lambda function to export data from DynamoDB to an S3 bucket

Simulate the failure

Complete the following steps to test the solution:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose the workflow named ETL_Process.
  3. Run the workflow with default input.

Within a few seconds, the workflow fails at the distributed map state.

You can inspect the map run errors by accessing the Step Functions workflow execution events for map runs and child workflows. In this example, you can identity the exception is due to Glue.ConcurrentRunsExceededException from AWS Glue. The error indicates there are more concurrent requests to run an AWS Glue job than are configured. Distributed map reads the table metadata from Amazon S3 and invokes as many AWS Glue jobs as the number of rows in the .csv file, but AWS Glue job is set with the concurrency of 3 when it is created. This resulted in the child workflow failure, cascading the failure to the distributed map state and then the parallel state. The other step in the parallel state to fetch the DynamoDB table ran successfully. If any step in the parallel state fails, the whole state fails, as seen with the cascading failure.

Handle failures with distributed map

By default, when a state reports an error, Step Functions causes the workflow to fail. There are multiple ways you can handle this failure with distributed map state:

  • Step Functions enables you to catch errors, retry errors, and fail back to another state to handle errors gracefully. See the following code:
    Retry": [
                          {
                            "ErrorEquals": [
                              "Glue.ConcurrentRunsExceededException "
                            ],
                            "BackoffRate": 20,
                            "IntervalSeconds": 10,
                            "MaxAttempts": 3,
                            "Comment": "Exception",
                            "JitterStrategy": "FULL"
                          }
                        ]
    

  • Sometimes, businesses can tolerate failures. This is especially true when you are processing millions of items and you expect data quality issues in the dataset. By default, when an iteration of map state fails, all other iterations are aborted. With distributed map, you can specify the maximum number of, or percentage of, failed items as a failure threshold. If the failure is within the tolerable level, the distributed map doesn’t fail.
  • The distributed map state allows you to control the concurrency of the child workflows. You can set the concurrency to map it to the AWS Glue job concurrency. Remember, this concurrency is applicable only at the workflow execution level—not across workflow executions.
  • You can redrive the failed state from the point of failure after fixing the root cause of the error.

Redrive the failed state

The root cause of the issue in the sample solution is the AWS Glue job concurrency. To address this by redriving the failed state, complete the following steps:

  1. On the AWS Glue console, navigate to the job named ExportsTableData.
  2. On the Job details tab, under Advanced properties, update Maximum concurrency to 5.

With the launch of redrive feature, You can use redrive to restart executions of standard workflows that didn’t complete successfully in the last 14 days. These include failed, aborted, or timed-out runs. You can only redrive a failed workflow from the step where it failed using the same input as the last non-successful state. You can’t redrive a failed workflow using a state machine definition that is different from the initial workflow execution. After the failed state is redriven successfully, Step Functions runs all the downstream tasks automatically. To learn more about how distributed map redrive works, refer to Redriving Map Runs.

Because the distributed map runs the steps inside the map as child workflows, the workflow IAM execution role needs permission to redrive the map run to restart the distributed map state:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:RedriveExecution"
      ],
      "Resource": "arn:aws:states:us-east-2:123456789012:execution:myStateMachine/myMapRunLabel:*"
    }
  ]
}

You can redrive a workflow from its failed step programmatically, via the AWS Command Line Interface (AWS CLI) or AWS SDK, or using the Step Functions console, which provides a visual operator experience.

  1. On the Step Functions console, navigate to the failed workflow you want to redrive.
  2. On the Details tab, choose Redrive from failure.

The pipeline now runs successfully because there is enough concurrency to run the AWS Glue jobs.

To redrive a workflow programmatically from its point of failure, call the new Redrive Execution API action. The same workflow starts from the last non-successful state and uses the same input as the last non-successful state from the initial failed workflow. The state to redrive from the workflow definition and the previous input are immutable.

Note the following regarding different types of child workflows:

  • Redrive for express child workflows – For failed child workflows that are express workflows within a distributed map, the redrive capability ensures a seamless restart from the beginning of the child workflow. This allows you to resolve issues that are specific to individual iterations without restarting the entire map.
  • Redrive for standard child workflows – For failed child workflows within a distributed map that are standard workflows, the redrive feature functions the same way as with standalone standard workflows. You can restart the failed state within each map iteration from its point of failure, skipping unnecessary steps that have already successfully run.

You can use Step Functions status change notifications with Amazon EventBridge for failure notifications such as sending an email on failure.

Clean up

To clean up your resources, delete the CloudFormation stack via the AWS CloudFormation console.

Conclusion

In this post, we showed you how to use the Step Functions redrive feature to redrive a failed step within a distributed map by restarting the failed step from the point of failure. The distributed map state allows you to write workflows that coordinate large-scale parallel workloads within your serverless applications. Step Functions runs the steps within the distributed map as child workflows at a maximum parallelism of 10,000, which is well above the concurrency supported by many AWS services.

To learn more about distributed map, refer to Step Functions – Distributed Map. To learn more about redriving workflows, refer to Redriving executions.


About the Authors

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing Tennis.

Joe Morotti is a Senior Solutions Architect at Amazon Web Services (AWS), working with Enterprise customers across the Midwest US to develop innovative solutions on AWS. He has held a wide range of technical roles and enjoys showing customers the art of the possible. He has attained seven AWS certification and has a passion for AI/ML and the contact center space. In his free time, he enjoys spending quality time with his family exploring new places and overanalyzing his sports team’s performance.

Uma Ramadoss is a specialist Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping customers design and operate event-driven cloud-native applications and modern business workflows using services like Lambda, EventBridge, Step Functions, and Amazon MWAA.