Tag Archives: AWS Step Functions

Scaling up a Serverless Web Crawler and Search Engine

Post Syndicated from Jack Stevenson original https://aws.amazon.com/blogs/architecture/scaling-up-a-serverless-web-crawler-and-search-engine/

Introduction

Building a search engine can be a daunting undertaking. You must continually scrape the web and index its content so it can be retrieved quickly in response to a user’s query. The goal is to implement this in a way that avoids infrastructure complexity while remaining elastic. However, the architecture that achieves this is not necessarily obvious. In this blog post, we will describe a serverless search engine that can scale to crawl and index large web pages.

A simple search engine is composed of two main components:

  • A web crawler (or web scraper) to extract and store content from the web
  • An index to answer search queries

Web Crawler

You may have already read “Serverless Architecture for a Web Scraping Solution.” In this post, Dzidas reviews two different serverless architectures for a web scraper on AWS. Using AWS Lambda provides a simple and cost-effective option for crawling a website. However, it comes with a caveat: the Lambda timeout capped crawling time at 15 minutes. You can tackle this limitation and build a serverless web crawler that can scale to crawl larger portions of the web.

A typical web crawler algorithm uses a queue of URLs to visit. It performs the following:

  • It takes a URL off the queue
  • It visits the page at that URL
  • It scrapes any URLs it can find on the page
  • It pushes the ones that it hasn’t visited yet onto the queue
  • It repeats the preceding steps until the URL queue is empty

Even if we parallelize visiting URLs, we may still exceed the 15-minute limit for larger websites.

Breaking Down the Web Crawler Algorithm

AWS Step Functions is a serverless function orchestrator. It enables you to sequence one or more AWS Lambda functions to create a longer running workflow. It’s possible to break down this web crawler algorithm into steps that can be run in individual Lambda functions. The individual steps can then be composed into a state machine, orchestrated by AWS Step Functions.

Here is a possible state machine you can use to implement this web crawler algorithm:

Figure 1: Basic State Machine

Figure 1: Basic State Machine

1. ReadQueuedUrls – reads any non-visited URLs from our queue
2. QueueContainsUrls? – checks whether there are non-visited URLs remaining
3. CrawlPageAndQueueUrls – takes one URL off the queue, visits it, and writes any newly discovered URLs to the queue
4. CompleteCrawl – when there are no URLs in the queue, we’re done!

Each part of the algorithm can now be implemented as a separate Lambda function. Instead of the entire process being bound by the 15-minute timeout, this limit will now only apply to each individual step.

Where you might have previously used an in-memory queue, you now need a URL queue that will persist between steps. One option is to pass the queue around as an input and output of each step. However, you may be bound by the maximum I/O sizes for Step Functions. Instead, you can represent the queue as an Amazon DynamoDB table, which each Lambda function may read from or write to. The queue is only required for the duration of the crawl. So you can create the DynamoDB table at the start of the execution, and delete it once the crawler has finished.

Scaling up

Crawling one page at a time is going to be a bit slow. You can use the Step Functions “Map state” to run the CrawlPageAndQueueUrls to scrape multiple URLs at once. You should be careful not to bombard a website with thousands of parallel requests. Instead, you can take a fixed-size batch of URLs from the queue in the ReadQueuedUrls step.

An important limit to consider when working with Step Functions is the maximum execution history size. You can protect against hitting this limit by following the recommended approach of splitting work across multiple workflow executions. You can do this by checking the total number of URLs visited on each iteration. If this exceeds a threshold, you can spawn a new Step Functions execution to continue crawling.

Step Functions has native support for error handling and retries. You can take advantage of this to make the web crawler more robust to failures.

With these scaling improvements, here’s our final state machine:

Figure 2: Final State Machine

Figure 2: Final State Machine

This includes the same steps as before (1-4), but also two additional steps (5 and 6) responsible for breaking the workflow into multiple state machine executions.

Search Index

Deploying a scalable, efficient, and full-text search engine that provides relevant results can be complex and involve operational overheads. Amazon Kendra is a fully managed service, so there are no servers to provision. This makes it an ideal choice for our use case. Amazon Kendra supports HTML documents. This means you can store the raw HTML from the crawled web pages in Amazon Simple Storage Service (S3). Amazon Kendra will provide a machine learning powered search capability on top, which gives users fast and relevant results for their search queries.

Amazon Kendra does have limits on the number of documents stored and daily queries. However, additional capacity can be added to meet demand through query or document storage bundles.

The CrawlPageAndQueueUrls step writes the content of the web page it visits to S3. It also writes some metadata to help Amazon Kendra rank or present results. After crawling is complete, it can then trigger a data source sync job to ensure that the index stays up to date.

One aspect to be mindful of while employing Amazon Kendra in your solution is its cost model. It is priced per index/hour, which is more favorable for large-scale enterprise usage, than for smaller personal projects. We recommend you take note of the free tier of Amazon Kendra’s Developer Edition before getting started.

Overall Architecture

You can add in one more DynamoDB table to monitor your web crawl history. Here is the architecture for our solution:

Figure 3: Overall Architecture

Figure 3: Overall Architecture

A sample Node.js implementation of this architecture can be found on GitHub.

In this sample, a Lambda layer provides a Chromium binary (via chrome-aws-lambda). It uses Puppeteer to extract content and URLs from visited web pages. Infrastructure is defined using the AWS Cloud Development Kit (CDK), which automates the provisioning of cloud applications through AWS CloudFormation.

The Amazon Kendra component of the example is optional. You can deploy just the serverless web crawler if preferred.

Conclusion

If you use fully managed AWS services, then building a serverless web crawler and search engine isn’t as daunting as it might first seem. We’ve explored ways to run crawler jobs in parallel and scale a web crawler using AWS Step Functions. We’ve utilized Amazon Kendra to return meaningful results for queries of our unstructured crawled content. We achieve all this without the operational overheads of building a search index from scratch. Review the sample code for a deeper dive into how to implement this architecture.

Scaling Neuroscience Research on AWS

Post Syndicated from Konrad Rokicki original https://aws.amazon.com/blogs/architecture/scaling-neuroscience-research-on-aws/

HHMI’s Janelia Research Campus in Ashburn, Virginia has an integrated team of lab scientists and tool-builders who pursue a small number of scientific questions with potential for transformative impact. To drive science forward, we share our methods, results, and tools with the scientific community.

Introduction

Our neuroscience research application involves image searches that are computationally intensive but have unpredictable and sporadic usage patterns. The conventional on-premises approach is to purchase a powerful and expensive workstation, install and configure specialized software, and download the entire dataset to local storage. With 16 cores, a typical search of 50,000 images takes 30 seconds. A serverless architecture using AWS Lambda allows us to do this job in seconds for a few cents per search, and is capable of scaling to larger datasets.

Parallel Computation in Neuroscience Research

Basic research in neuroscience is often conducted on fruit flies. This is because their brains are small enough to study in a meaningful way with current tools, but complex enough to produce sophisticated behaviors. Conducting such research nonetheless requires an immense amount of data and computational power. Janelia Research Campus developed the NeuronBridge tool on AWS to accelerate scientific discovery by scaling computation in the cloud.

Color Depth Search Example fly brains

Figure 1: A “mask image” (on the left) is compared to many different fly brains (on the right) to find matching neurons. (Janella Research Campus)

The fruit fly (Drosophila melanogaster) has about 100,000 neurons and its brain is highly stereotyped. This means that the brain of one fruit fly is similar to the next one. Using electron microscopy (EM), the FlyEM project has reconstructed a wiring diagram of a fruit fly brain. This connectome includes the structure of the neurons and the connectivity between them. But EM is only half of the picture. Once scientists know the structure and connectivity, they must perform experiments to find what purpose the neurons serve.

Flies can be genetically modified to reproducibly express a fluorescent protein in certain neurons, causing those neurons to glow under a light microscope (LM). By iterating through many modifications, the FlyLight project has created a vast genetic driver library. This allows scientists to target individual neurons for experiments. For example, blocking a particular neuron of a fly from functioning, and then observing its behavior, allows a scientist to understand the function of that neuron. Through the course of many such experiments, scientists are currently uncovering the function of entire neuronal circuits.

We developed NeuronBridge, a tool available for use by neuroscience researchers around the world, to bridge the gap between the EM and LM data. Scientists can start with EM structure and find matching fly lines in LM. Or they may start with a fly line and find the corresponding neuronal circuits in the EM connectome.

Both EM and LM produce petabytes of 3D images. Image processing and machine learning algorithms are then applied to discern neuron structure. We also developed a computational shortcut called color depth MIP to represent depth as color. This technique compresses large 3D image stacks into smaller 2D images that can be searched efficiently.

Image search is an embarrassingly parallel problem ideally suited to parallelization with simple functions. In a typical search, the scientist will create a “mask image,” which is a color depth image featuring only the neuron they want to find. The search algorithm must then compare this image to hundreds of thousands of other images. The paradigm of launching many short-lived cloud workers, termed burst-parallel compute, was originally suggested by a group at UCSD. To scale NeuronBridge, we decided to build a serverless AWS-native implementation of burst-parallel image search.

The Architecture

Our main reason for using a serverless approach was that our usage patterns are unpredictable and sporadic. The total number of researchers who are likely to use our tool is not large, and only a small fraction of them will need the tool at any given time. Furthermore, our tool could go unused for weeks at a time, only to get a flood of requests after a new dataset is published. A serverless architecture allows us to cope with this unpredictable load. We can keep costs low by only paying for the compute time we actually use.

One challenge of implementing a burst-parallel architecture is that each Lambda invocation requires a network call, with the ensuing network latency. Spawning several thousands of functions from a single manager function can take many seconds. The trick to minimizing this latency is to parallelize these calls by recursively spawning concurrent managers in a tree structure. Each leaf in this tree spawns a set of worker functions to do the work of searching the imagery. Each worker reads a small batch of images from Amazon Simple Storage Service (S3). They are then compared to the mask image, and the intermediate results are written to Amazon DynamoDB, a serverless NoSQL database.

Serverless architecture for burst-parallel search

Figure 2: Serverless architecture for burst-parallel search

Search state is monitored by an AWS Step Functions state machine, which checks DynamoDB once per second. When all the results are ready, the Step Functions state machine runs another Lambda function to combine and sort the results. The state machine addresses error conditions and timeouts, and updates the browser when the asynchronous search is complete. We opted to use AWS AppSync to notify a React web client, providing an interactive user experience while remaining entirely serverless.

As we scaled to 3,000 concurrent Lambda functions reading from our data bucket, we reached Amazon S3’s limit of 5,500 GETs per second per prefix. The fix was to create numbered prefix folders and then randomize our key list. Each worker could then search a random list of images across many prefixes. This change distributed the load from our highly parallel functions across a number of S3 shards, and allowed us to run with much higher parallelism.

We also addressed cold-start latency. Infrequently used Lambda functions take longer to start than recently used ones, and our unpredictable usage patterns meant that we were experiencing many cold starts. In our Java functions, we found that most of the cold-start time was attributed to JVM initialization and class loading. Although many mitigations for this exist, our worker logic was small enough that rewriting the code to use Node.js was the obvious choice. This immediately yielded a huge improvement, reducing cold starts from 8-10 seconds down to 200 ms.

With all of these insights, we developed a general-purpose parallel computation framework called burst-compute. This AWS-native framework runs as a serverless application to implement this architecture. It allows you to massively scale your own custom worker functions and combiner functions. We used this new framework to implement our image search.

Conclusion

The burst-parallel architecture is a powerful new computation paradigm for scientific computing. It takes advantage of the enormous scale and technical innovation of the AWS Cloud to provide near-interactive on-demand compute without expensive hardware maintenance costs. As scientific computing capability matures for the cloud, we expect this kind of large-scale parallel computation to continue becoming more accessible. In the future, the cloud could open doors to entirely new types of scientific applications, visualizations, and analysis tools.

We would like to express our thanks to AWS Solutions Architects Scott Glasser and Ray Chang, for their assistance with design and prototyping, and to Geoffrey Meissner for reviewing drafts of this write-up. 

Source Code

All of the application code described in this article is open source and licensed for reuse:

The data and imagery are shared publicly on the Registry of Open Data on AWS.

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner, and use AWS Glue for a serverless environment to prepare (extract and transform) and load large amounts of datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.

The state machine transforms data using AWS Glue.

The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs.

Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.

The IAM role is available in the Permissions section of the environment details.

The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.

The following diagram shows the ETL process set up through a Step Functions state machine.

In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.

Turn on the DAG.

  1. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.

This displays the overall ETL pipeline managed by Airflow.

  1. To view the DAG code, choose Code.

To view the DAG code, choose Code.

The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

  1. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  2. Leave the Optional Configuration JSON box blank.

Leave the Optional Configuration JSON box blank.

When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto provides object-oriented API, as well as low-level access to AWS services.

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.

From Graph View, select any task and choose View Log.

  1. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

  1. You can also monitor ETL process completion from the Airflow UI.

You can also monitor ETL process completion from the Airflow UI.

  1. On the Airflow UI, verify the completion from the log entries.

On the Airflow UI, verify the completion from the log entries.

Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  1. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

The following screenshot shows the output.

The following screenshot shows the output.

Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar GhosalDipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

Multi-tenant processing pipelines with AWS DMS, AWS Step Functions, and Apache Hudi on Amazon EMR

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/multi-tenant-processing-pipelines-with-aws-dms-aws-step-functions-and-apache-hudi-on-amazon-emr/

Large enterprises often provide software offerings to multiple customers by providing each customer a dedicated and isolated environment (a software offering composed of multiple single-tenant environments). Because the data is in various independent systems, large enterprises are looking for ways to simplify data processing pipelines. To address this, you can create data lakes to bring your data to a single place.

Typically, a replication tool such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3). When the data is in Amazon S3, you process it based on your requirements. A typical requirement is to sync the data in Amazon S3 with the updates on the source systems. Although it’s easy to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s tough to apply this change data capture (CDC) process on your data lakes. Apache Hudi is a good way to solve this problem. You can use Hudi on Amazon EMR to create Hudi tables (for more information, see Hudi in the Amazon EMR Release Guide).

This post introduces a pipeline that loads data and its ongoing changes (change data capture) from multiple single-tenant tables from different databases to a single multi-tenant table in an Amazon S3-backed data lake, simplifying data processing activities by creating multi-tenant datasets.

Architecture overview

At a high level, this architecture consolidates multiple single-tenant environments into a single multi-tenant dataset so data processing pipelines can be centralized. For example, suppose that your software offering has two tenants, each with their dedicated and isolated environment, and you want to maintain a single multi-tenant table that includes data of both tenants. Moreover, you want any ongoing replication (CDC) in the sources for tenant 1 and tenant 2 to be synchronized (compacted or reconciled) when an insert, delete, or update occurs in the source systems of the respective tenant.

In the past, to support record-level updates or inserts (called upserts) and deletes on an Amazon S3-backed data lake, you relied on either having an Amazon Redshift cluster or an Apache Spark job that reconciled the update, deletes, and inserts with existing historical data.

The architecture for our solution uses Hudi to simplify incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. For more information, see Apache Hudi on Amazon EMR.

Moreover, the architecture for our solution uses the following AWS services:

  • AWS DMS – AWS DMS is a cloud service that makes it easy to migrate relational databases, data warehouses, NoSQL databases, and other types of data stores. For more information, see What is AWS Database Migration Service?
  • AWS Step FunctionsAWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. For more information, see What Is AWS Step Functions?
  • Amazon EMR – Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. For more information, see Overview of Amazon EMR Architecture and Overview of Amazon EMR.
  • Amazon S3 – Data is stored in Amazon S3, an object storage service with scalable performance, ease-of-use features, and native encryption and access control capabilities. For more details on Amazon S3, see Amazon S3 as the Data Lake Storage Platform.

Architecture deep dive

The following diagram illustrates our architecture.

This architecture relies on AWS Database Migration Service (AWS DMS) to transfer data from specific tables into an Amazon S3 location organized by tenant-id.

Although AWS DMS performs the migration and the ongoing replication—also known as change data capture (CDC)—it applies a data transformation that adds a custom column named tenant-id and populates it with the tenant-id value defined in the AWS DMS migration task configuration. The AWS DMS data transformations allow you to modify a schema, table, or column or, in this case, add a column with the tenant-id so data transferred to Amazon S3 is grouped by tenant-id.

AWS DMS is also configured to add an additional column with timestamp information. For a full load, each row of this timestamp column contains a timestamp for when the data was transferred from the source to the target by AWS DMS. For ongoing replication, each row of the timestamp column contains the timestamp for the commit of that row in the source database.

We use an AWS Step Functions workflow to move the files AWS DMS wrote to Amazon S3 into an Amazon S3 location that is organized by table name and holds all the tenant’s data. Files in this location all have the new column tenant-id, and the respective tenant-id value is configured in the AWS DMS task configuration.

Next, the Hudi DeltaStreamer utility runs on Amazon EMR to process the multi-tenant source data and create or update the Hudi dataset on Amazon S3.

You can pass to the Hudi DeltaStreamer utility a field in the data that has each record’s timestamp. The Hudi DeltaStreamer utility uses this to ensure records are processed in the proper chronological order. You can also provide the Hudi DeltaStreamer utility one or more SQL transforms, which the utility applies in a sequence as records are read and before the datasets are persisted on Amazon S3 as an Hudi Parquet dataset. We highlight the SQL transform later in this post.

Depending on your downstream consumption patterns, you might require a partitioned dataset. We discuss the process to choose a partition within Hudi DeltaStreamer later in this post. 

For this post, we use the Hudi DeltaStreamer utility instead of the Hudi DataSource due to its operational simplicity. However, you can also use Hudi DataSource with this pattern.

When to use and not use this architecture

This architecture is ideal for the workloads that are processed in batches and can tolerate the latency associated with the time required to capture the changes in the sources, write those changes into objects in Amazon S3, and run the Step Functions workflow that aggregates the objects per tenant and creates the multi-tenant Hudi dataset.

This architecture uses and applies to Hudi COPY_ON_WRITE tables. This architecture is not recommended for latency-sensitive applications and does not support MERGE_ON_READ tables. For more information, see the section Analyzing the properties provided to the command to run the Hudi DeltaStreamer utility.

This architecture pattern is also recommended for workloads that have update rates to the same record (identified by a primary key) that are separated by, at most, microseconds or microsecond precision. The reason behind this is that Hudi uses a field, usually a timestamp, to break ties between records with the same key. If the field used to break ties can capture each individual update, data stored in the Hudi dataset on Amazon S3 is exactly the same as in the source system.

The precision of timestamp fields in your table depends on the database engine at the source. We strongly recommended that as you evaluate adopting this pattern, you also evaluate the support that AWS DMS provides to your current source engines, and understand the rate of updates in your source and the respective timestamp precision requirements that the source needs to support or currently supports. For example: AWS DMS writes any timestamp column values that are written to Amazon S3 as part of an ongoing replication with second precision if the data source is MySQL, and with microsecond precision if the data source is PostgreSQL. See the section Timestamp precision considerations for additional details.

This architecture assumes that all tables in every source database have the same schema and that any changes to a table’s schema is performed to each data source at the same time. Moreover, this architecture pattern assumes that any schema changes are backward compatible—you only append new fields and don’t delete any existing fields. Hudi supports schema evolutions that are backward compatible.

If you’re expecting constant schema changes to the sources, it might be beneficial to consider performing full snapshots instead of ingesting and processing the CDC. If performing full snapshots isn’t practical and you are expecting constant schema changes that are not compatible with Hudi’s schema evolution support, you can use the Hudi DataWriter API with your Spark jobs and address schema changes within the code by casting and adding columns as required to keep backward compatibility.

See the Schema evolution section for more details on the process for schema evolution with AWS DMS and Hudi.

Although it’s out of scope to evaluate the consumption tools available downstream to the Hudi dataset, you can consume Hudi datasets stored on Amazon S3 from Apache Hive, Spark, and Presto on Amazon EMR. Moreover, you can consume Hudi datasets stored on Amazon S3 from Amazon Redshift Spectrum and Amazon Athena.

Solution overview

This solution uses an AWS CloudFormation template to create the necessary resources.

You trigger the Step Functions workflow via the AWS Management Console. The workflow uses AWS Lambda for processing tasks that are part of the logic. Moreover, the workflow submits jobs to an EMR cluster configured with Hudi.

To perform the database migration, the CloudFormation template deploys one AWS DMS replication instance and configures two AWS DMS replications tasks, one per tenant. The AWS DMS replication tasks connect to the source data stores, read the source data, apply any transformations, and load the data into the target data store.

You access an Amazon SageMaker notebook to generate changes (updates) to the sources. Moreover, you connect into the Amazon EMR master node via AWS Systems Manager Session Manager to run Hive or Spark queries in the Hudi dataset backed by Amazon S3. Session Manager provides secure and auditable instance management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys.

The following diagram illustrates the solution architecture.

The orchestration in this demo code currently supports processing at most 25 sources (tables within a database or distributed across multiple databases) per run and is not preventing concurrent runs of the same tenant-id, database-name, or table-name triplet by keeping track of the tenant-id, database-name, or table-name triplet being processed or already processed. Preventing concurrent runs avoids duplication of work. Moreover, the orchestration in this demo code doesn’t prevent the Hudi DeltaStreamer job to run with the output of both an AWS DMS full load task and an AWS DMS CDC load task. For production environments, we recommend that you keep track of the existing tenant_id in the multi-tenant Hudi dataset. This way, if an existing AWS DMS replication task is mistakenly restarted to perform a full load instead of continuing the ongoing replication, your solution can adequately prevent any downstream impact to the datasets. Moreover, we recommend that you keep track of the schema changes in the source and guarantee that the Hudi DeltaStreamer utility only processes files with the same schema.

For details on considerations related to the Step Functions workflows, see Best Practices for Step Functions. For more information about considerations when running AWS DMS at scale, see Best practices for AWS Database Migration Service. Finally, for details on how to tune Hudi, see Performance and Tuning Guide.

Next, we walk you through several key areas of the solution.

Prerequisites

Before getting started, you must create a S3 bucket, unzip and upload the blog artifacts to the S3 bucket and store the database passwords in AWS Systems Manager Parameter Store.

Creating and storing admin passwords in AWS Systems Manager Parameter Store

This solution uses AWS Systems Manager Parameter Store to store the passwords used in the configuration scripts. With Parameter Store, you can create secure string parameters, which are parameters that have a plaintext parameter name and an encrypted parameter value. Parameter Store uses AWS Key Management Service (AWS KMS) to encrypt and decrypt the parameter values of secure string parameters. With Parameter Store, you improve your security posture by separating your data from your code and by controlling and auditing access at granular levels. There is no charge from Parameter Store to create a secure string parameter, but charges for using AWS KMS do apply. For information, see AWS Key Management Service pricing.

Before deploying the CloudFormation templates, run the following AWS Command Line Interface (AWS CLI) commands. These commands create Parameter Store parameters to store the passwords for the RDS master user for each tenant.

aws ssm put-parameter --name "/HudiStack/RDS/Tenant1Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/HudiStack/RDS/Tenant2Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

AWS DMS isn’t integrated with Parameter Store, so you still need to set the same password as in the CloudFormation template parameter DatabasePassword (see the following section).

Creating an S3 bucket for the solution and uploading the solution artifacts to Amazon S3

This solution uses Amazon S3 to store all artifacts used in the solution. Before deploying the CloudFormation templates, create an Amazon S3 bucket and download the artifacts required by the solution.

Unzip the artifacts and upload all folders and files in the .zip file to the S3 bucket you just created.

The following screenshot uses the root location hudistackbucket.

Keep a record of the Amazon S3 root path because you add it as a parameter to the CloudFormation template later.

Creating the CloudFormation stack

To launch the entire solution, choose Launch Stack:

The template requires the following parameters. You can accept the default values for any parameters not in the table. For the full list of parameters, see the CloudFormation template.

  • S3HudiArtifacts – The bucket name that holds the solution artifacts (Lambda function Code, Amazon EMR step artifacts, Amazon SageMaker notebook, Hudi job configuration file template). You created this bucket in the previous step. For this post, we use hudistackbucket.
  • DatabasePassword – The database password. This value needs to be the same as the one configured via Parameter Store. The CloudFormation template uses this value to configure the AWS DMS endpoints.
  • emrLogUri – The Amazon S3 location to store Amazon EMR cluster logs. For example, s3://replace-with-your-bucket-name/emrlogs/.

Testing database connectivity

To test connectivity, complete the following steps:

  1. On the Amazon SageMaker Console, choose Notebook instances.
  2. Locate the notebook instance you created and choose Open Jupyter.
  3. In the new window, choose Runmev5.ipynb.

This opens the notebook for this post. We use the notebook to generate changes and updates to the databases used in the post.

  1. Run all cells of the notebook until the section Triggering the AWS DMS full load tasks for tenant 1 and tenant 2.

Analyzing the AWS DMS configuration

In this section, we examine the data transformation configuration and other AWS DMS configurations.

Data transformation configuration

To support the conversion from single-tenant to multi-tenant pipelines, the CloudFormation template applied a data transformation to the AWS DMS replication task. Specifically, the data transformation adds a new column named tenant_id to the Amazon S3 AWS DMS target. Adding the tenant_id column helps with downstream activities organize the datasets per tenant_id. For additional details on how to set up AWS DMS data transformation, see Transformation Rules and Actions. For reference, the following code is the data transformation we use for this post:

{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "salesdb",
                "table-name": "sales_order_detail"
            },
            "rule-action": "include",
            "filters": []
        },
        {
            "rule-type": "transformation",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "add-column",
            "rule-target": "column",
            "object-locator": {
                "schema-name": "salesdb",
                "table-name": "sales_order_detail"
            },
            "value": "tenant_id",
            "expression": "1502",
            "data-type": {
                "type": "string",
                "length": 50
            }
        }
    ]
}

Other AWS DMS configurations

When using Amazon S3 as a target, AWS DMS accepts several configuration settings that provide control on how the files are written to Amazon S3. Specifically, for this use case, AWS DMS uses Parquet as the value for the configuration property DataFormat. For additional details on the S3 settings used by AWS DMS, see S3Settings. For reference, we use the following code:

DataFormat=parquet;TimestampColumnName=timestamp;

Timestamp precision considerations

By default, AWS DMS writes timestamp columns in a Parquet format with a microsecond precision, should the source engine support that precision. If the rate of updates you’re expecting is high, it’s recommended that you use a source that has support for microsecond precision, such as PostgreSQL.

Moreover, if the rate of updates is high, you might want to use a data source with microsecond precision and the AR_H_TIMESTAMP internal header column, which captures the timestamp of when the changes were made instead of the timestamp indicating the time of the commit. See Replicating source table headers using expressions for more details, specifically the details on the AR_H_TIMESTAMP internal header column. When you set TimestampColumnName=timestamp as we mention earlier, the new timestamp column captures the time of the commit.

If you need to use the AR_H_TIMESTAMP internal header column with a data source that supports microsecond precision such as PostgreSQL, we recommend using the Hudi DataSource writer job instead of the Hudi DeltaStreamer utility. The reason for this is that although the AR_H_TIMESTAMP internal header column (in a source that supports microsecond precision) has microsecond precision, the actual value written by AWS DMS on Amazon S3 has a nanosecond format (microsecond precision with the nanosecond dimension set to 0). By using the Hudi  DataSource writer job, you can convert the AR_H_TIMESTAMP internal header column to a timestamp datatype in Spark with microsecond precision and use that new value as the PARTITIONPATH_FIELD_OPT_KEY. See Datasource Writer for more details.

Triggering the AWS DMS full load tasks for Tenant 1 and Tenant 2

In this step, we run a full load of data from both databases to Amazon S3 using AWS DMS. To accomplish this, perform the following steps:

  1. On the AWS DMS console, under Migration, choose Database migration tasks.
  2. Select the replication task for Tenant 1 (dmsreplicationtasksourcetenant1-xxxxxxxxxxxxxxx).
  3. From the Actions menu, choose Restart/Resume.
  4. Repeat these steps for the Tenant 2 replication task.

You can monitor the progress of this task by choosing the task link.

Triggering the Step Functions workflow

Next, we start a Step Functions workflow that automates the end-to-end process of processing the files loaded by AWS DMS to Amazon S3 and creating a multi-tenant Amazon S3-backed table using Hudi.

To trigger the Step Functions workflow, perform the following steps:

  1. On the Step Functions console, choose State machines.
  2. Choose the MultiTenantProcessing workflow.
  3. In the new window, choose Start execution.
  4. Edit the following JSON code and replace the values as needed. You can find the emrClusterId on the Outputs tab of the Cloudformation template.
{
  "hudiConfig": {
    "emrClusterId": "[REPLACE]",
    "targetBasePath": "s3://hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]/transformed/multitenant/huditables/sales_order_detail_mt",
"targetTable": "sales_order_detail_mt",
"sourceOrderingField": "timestamp",
"blogArtifactBucket": "[REPLACE-WITH-BUCKETNAME-WITH-BLOG-ARTIFACTS]",
"configScriptPath": "s3://[REPLACE-WITH-BUCKETNAME-WITH-BLOG-ARTIFACTS]/emr/copy_apache_hudi_deltastreamer_command_properties.sh",
"propertiesFilename": "dfs-source.properties"
},
  "copyConfig":{
            "srcBucketName": "hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]",
            "srcPrefix": "raw/singletenant/",
            "destBucketName": "hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]",
            "destPrefix": "raw/multitenant/salesdb/sales_order_detail_mt/"},
      "sourceConfig":{
            "databaseName": "salesdb",
            "tableName": "sales_order_detail"},  
  "workflowConfig":{
            "ddbTableName": "WorkflowTimestampRegister",
            "ddbTimestampFieldName": "T"},
  "tenants":{
    "array": [
              {
            "tenantId": "1502"
        },           {
            "tenantId": "1501"
        }
    ]
  }
}    
  1. Submit the edited JSON as the input to the workflow.

If you scroll down, you should see an ExecutionSucceeded message in the last line of the event history (see the following screenshot).

  1. On the Amazon S3 console, search for the bucket name used in this post (hudiblog-[account-id]) and then for the prefix raw/multitenant/salesdb/sales_order_detail_mt/.

You should see two files.

  1. Navigate to the prefix transformed/multitenant/salesdb/sales_order_detail_mt/.

You should see the Hudi table Parquet format.

Analyzing the properties provided to the command to run the Hudi DeltaStreamer utility

If the MultitenantProcessing workflow was successful, the files that AWS DMS loaded into Amazon S3 are now available in a multi-tenant table on Amazon S3. This table is now ready to process changes to the databases for each tenant.

In this step, we go over the command the workflow triggers to create a table with Hudi.

The Step Functions workflow for this post runs all the steps except the tasks in the Amazon Sagemaker notebook that you trigger. The following section is just for your reference and discussion purposes.

On the Amazon EMR console, choose the cluster created by the CloudFormation and choose the Steps view of the cluster to obtain the full command used by the workflow.

The command has two sets of properties: one defined directly in the command, and another defined on a configuration file, dfs-source.properties, which is updated automatically by the Step Functions workflow.

The following are some of the properties defined directly in the command:

  • –table-type – The Hudi table type, for this use case, COPY_ON_WRITE. The reason COPY_ON_WRITE is preferred for this use case relates to the fact that the ingestion is done in batch mode, access to the changes in the data are not required in real time, and the downstream workloads are read-heavy. Moreover, with this storage type, you don’t need to handle compactions because updates create a new Parquet file with the impacted rows being updated. Given that the ingestion is done in batch mode, using the COPY_ON_WRITE table type efficiently keeps track of the latest record change, triggering a new write to update the Hudi dataset with the latest value of the record.
    • This post requires that you use the COPY_ON_WRITE table type.
    • For reference, if your requirement is to ingest write- or change-heavy workloads and make the changes available as fast as possible for downstream consumption, Hudi provides the MERGE_ON_READ table type. In this table type, data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files. For more details on the two table types provided by Hudi, see Understanding Dataset Storage Types: Copy on Write vs. Merge on Read and Considerations and Limitations for Using Hudi on Amazon EMR.
  • –source-ordering-field – The field in the source dataset that the utility uses to order the records. For this use case, we configured AWS DMS to add a timestamp column to the data. The utility uses that column to order the records and break ties between records with the same key. This field needs to exist in the data source and can’t be the result of a transformation.
  • –source-class – AWS DMS writes to the Amazon S3 destination in Apache Parquet. Use apache.hudi.utilities.sources.ParquetDFSSource as the value for this property.
  • –target-base-path – The destination base path that the utility writes.
  • –target-table – The table name of the table the utility writes to.
  • –transformer-class – This property indicates the transformer classes that the utility is applied to the input records. For this use case, we use the AWSDmsTransformer plus the SqlQueryBasedTransformer. The transformers are applied in the order they are identified in this property.
  • –payload-class – Set to org.apache.hudi.payload.AWSDmsAvroPayload.
  • –props – The path of the file with additional configurations. For example, file:///home/hadoop/dfs-source.properties.
    • The file /home/hadoop/dfs-source.properties has additional configurations passed to Hudi DeltaStreamer. You can view that file by logging in to your Amazon EMR master node and running cat /home/hadoop/dfs-source.properties.

The following code is the configuration file. By setting the properties in the file, we configure Hudi to write the dataset in a partitioned way and applying a SQL transform before persisting the dataset into the Amazon S3 location.

===Start of Configuration File ===
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.partitionpath.field=tenant_id,year,month
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a.line_id, a.line_number, a.order_id, a.product_id,  a.quantity, a.unit_price, a.discount, a.supply_cost, a.tax, string(year(to_date(a.order_date))) as year, string(month(to_date(a.order_date))) as month, a.Op, a.tenant_id FROM <SRC> a
hoodie.datasource.write.recordkey.field=tenant_id,line_id
hoodie.datasource.write.hive_style_partitioning=true
# DFS Source
hoodie.deltastreamer.source.dfs.root=s3://hudiblog-xxxxxxxxxxxxxx/raw/multitenant/salesdb/sales_order_detail_mt/xxxxxxxxxxxxxxxxxxxxx
===End of Configuration File ===

Some configurations in this file include the following:

  • hoodie.datasource.write.operation=upsert – This property defines if the write operation is an insert, upsert, or bulkinsert. If you have a large initial import, use bulkinsert to load new data into a table, and on the next loads use upsert or insert. The default value for this property is upsert. For this post, the default is accepted because the dataset is small. When you run the solution with larger datasets, you can perform the initial import with bulkinsert and then use upsert for the next loads. For more details on the three modes, see Write Operations.
  • hoodie.datasource.write.hive_style_partitioning=true – This property generates Hive style partitioning—partitions of the form partition_key=partition_values. See the property hoodie.datasource.write.partitionpath.field for more details.
  • hoodie.datasource.write.partitionpath.field=tenant_id,year,month – This property identifies the fields that Hudi uses to extract the partition fields.
  • hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator – This property allows you to combine multiple fields and use the combination of fields as the record key and partition key.
  • hoodie.datasource.write.recordkey.field=tenant_id,line_id – This property indicates the fields in the dataset that Hudi uses to identify a record key. The source table has line_id as the primary key. Given that the Hudi dataset is multi-tenant, tenant_id is also part of the record key.
  • hoodie.deltastreamer.source.dfs.root=s3://hudiblog-your-account-id/raw/multitenant/salesdb/sales_order_detail_mt/xxxxxxxxx – This property indicates the Amazon S3 location with the source files that the Hudi DeltaStreamer utility consumes. This is the location that the MultiTenantProcessing state machine created and includes files from both tenants.
  • hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a.line_id, a.line_number, a.order_id, a.product_id, a.quantity, a.unit_price, a.discount, a.supply_cost, a.tax, year(string(a.order_date)) as year, month(string(a.order_date)) as month, a.Op, a.tenant_id FROM a – This property indicates that the Hudi DeltaStreamer applies a SQL transform before writing the records as a Hudi dataset in Amazon S3. For this use case, we create new fields from the RBDMS table’s field order_date.

A change to the schema of the source RDBMS tables requires the respective update to the SQL transformations. As mentioned before, this use case requires that schema changes to a source schema occur in every table for every tenant.

For additional details on Hudi, see Apply record level changes from relational databases to Amazon S3 data lake using Hudi on Amazon EMR and AWS DMS.

Although the Hudi multi-tenant table is partitioned, you should only have one job (Hudi DeltaStreamer utility or Spark data source) writing to the Hudi dataset. If you’re expecting specific tenants to produce more changes than others, you can consider prioritizing some tenants over others or use dedicated tables for the most active tenants to avoid any impact to tenants that produce a smaller amount of changes.

Schema evolution

Hudi supports schema evolutions that are backward compatible—you only append new fields and don’t delete any existing fields.

By default, Hudi handles schema changes of type by appending new fields to the end of the table.

New fields that you add have to either be nullable or have a default value. For example, as you add a new field to the source database, the records that generate a change have a value for that field in the Hudi dataset, but older records have a null value for that same field in the dataset.

If you require schema changes that are not supported by Hudi, you need to use either a SQL transform or the Hudi DataSource API to handle those changes before writing to the Hudi dataset. For example, if you need to delete a field from the source, you need to use a SQL transform before writing the Hudi dataset to ensure the deleted column is populated by a constant or dummy value, or use the Hudi DataSource API to do so.

Moreover, AWS DMS with Amazon S3 targets support only the following DDL commands: Truncate Table, Drop Table, and Create Table. See Limitations to using Amazon S3 as a target for more details.

This means that when you issue an Alter Table command, the AWS DMS replication tasks don’t capture those changes until you restart the task.

As you implement this architecture pattern, it’s recommended that you automate the schema evolution process and apply the schema changes in a maintenance window when the source isn’t serving requests and the AWS DMS replication CDC tasks aren’t running.

Simulating random updates to the source databases

In this step, you perform some random updates to the data. Navigate back to the Runmev5.ipynb Jupyter notebook and run the cells under the section Simulate random updates for tenant 1 and tenant 2.

Triggering the Step Functions workflow to process the ongoing replication

In this step, you rerun the MultitenantProcessing workflow to process the files generated during the ongoing replication.

  1. On the Step Functions console, choose State machines.
  2. Choose the MultiTenantProcessing workflow
  3. In the new window, choose Start execution.
  4. Use the same JSON as the JSON used when you first ran the workflow.
  5. Submit the edited JSON as the input to the workflow.

Querying the Hudi multi-tenant table with Spark

To query your table with Spark, complete the following steps:

  1. On the Session Manager console, select the instance HudiBlog Spark EMR Cluster.
  2. Choose Start session.

Session Manager creates a secure session to the Amazon EMR master node.

  1. Switch to the Hadoop user and then to the home directory of the Hadoop user.

You’re now ready to run the commands described in the next sections.

  1. Run the following command in the command line:
    spark-shell  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar 

 

  1. When the spark-shell is available, enter the following code:
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    
    val tableName = "sales_order_detail_mt"
    
    val basePath = "s3://hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]/transformed/multitenant/huditables/"+tableName+"/"
    
    spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView(tableName)
    
    val sqlDF = spark.sql("SELECT * FROM "+tableName)
    sqlDF.show(20,false)

Updating the source table schema

In this section, you change the schema of the source tables.

  1. On the AWS DMS management console, stop the AWS DMS replication tasks for Tenant 1 and Tenant 2.
  2. Make sure the Amazon SageMaker notebook isn’t writing to the database.
  3. Navigate back to the Jupyter notebook and run the cells under the section Updating the source table schema.
  4. On the AWS DMS console, resume (not restart) the AWS DMS replication tasks for both tenants.
  5. Navigate back to the Jupyter notebook and run the cells under the section Simulate random updates for tenant 1 after the schema change.

Analyzing the changes to the configuration file for the Hudi DeltaStreamer utility

Because there was a change to the schema of the sources and a new field needs to be propagated to the Hudi dataset, the property hoodie.deltastreamer.transformer.sql is updated with the following value:

hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a.line_id, a.line_number, a.order_id, a.product_id,  a.quantity, a.unit_price, a.discount, a.supply_cost, a.tax, string(year(to_date(a.order_date))) as year, string(month(to_date(a.order_date))) as month, a.Op, a.tenant_id, a.critical FROM <SRC> a

The field a.critical is added to the end, after a.tenant_id.

Triggering the Step Functions workflow to process the ongoing replication after the schema change

In this step, you rerun the MultitenantProcessing workflow to process the files produced by AWS DMS during the ongoing replication.

  1. On the Step Functions console, choose State machines.
  2. Choose the MultiTenantProcessing
  3. In the new window, choose Start execution.
  4. Update the JSON document used when you first ran the workflow by replacing the field propertiesFilename with the following value:

    "propertiesFilename": "dfs-source-new_schema.properties"

  1. Submit the edited JSON as the input to the workflow.

Querying the Hudi multi-tenant table after the schema change

We can now query the multi-tenant table again.

Using Hive

If using Hive when the workflow completes, go back to the terminal window opened by Session Manager and run the following hive-sync command:

/usr/lib/hudi/bin/run_sync_tool.sh  --table sales_order_detail_mt  --partitioned-by tenant_id,year,month  --database default --base-path s3://hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]/transformed/multitenant/huditables/sales_order_detail_mt    --jdbc-url jdbc:hive2:\/\/localhost:10000 --user hive   --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor --pass passnotenforced

For this post, we run the hive-sync command manually. You can also add the flag --enable-hive-sync to the Hudi DeltaStreamer utility command showed in section Analyzing the properties provided to the command to run the Hudi DeltaStreamer utility.

After the hive-sync updates the table schema, the new column is visible from Hive. Start Hive in the command line and run the following query:

select `timestamp`, quantity, critical,  order_id, tenant_id from sales_order_detail_mt where order_id=4668 and tenant_id=1502;

Using Spark

If you just want to use Spark to read the Hudi datasets on Amazon S3, after the schema change, you can use the same code in Querying the Hudi multi-tenant table with Spark and add the following line to the code:

spark.conf.set("spark.sql.parquet.mergeSchema", "true")

Cleaning up

To avoid incurring future charges, stop the AWS DMS replication tasks, delete the contents in the S3 bucket for the solution, and delete the CloudFormation stack.

Conclusion

This post explained how to use AWS DMS, Step Functions, and Hudi on Amazon EMR to convert a single-tenant pipeline to a multi-tenant pipeline.

With this solution, your software offerings that use dedicate sources for each tenant can offload some of the common tasks across all the tenants to a pipeline backed by Hudi datasets stored on Amazon S3. Moreover, by using Hudi on Amazon EMR, you can easily apply inserts, updates, and deletes of the source databases to the datasets in Amazon S3. Moreover, you can easily support schema evolution that is backward compatible.

Follow these steps and deploy the CloudFormation templates in your account to further explore the solution. If you have any questions or feedback, please leave a comment.

The author would like to thank Radhika Ravirala and Neil Mukerje for the dataset and the functions on the notebook.


About the Author

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

ICYMI: Serverless Q4 2020

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2020/

Welcome to the 12th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

ICYMI Q4 calendar

In case you missed our last ICYMI, check out what happened last quarter here.

AWS re:Invent

re:Invent 2020 banner

re:Invent was entirely virtual in 2020 and free to all attendees. The conference had a record number of registrants and featured over 700 sessions. The serverless developer advocacy team presented a number of talks to help developers build their skills. These are now available on-demand:

AWS Lambda

There were three major Lambda announcements at re:Invent. Lambda duration billing changed granularity from 100 ms to 1 ms, which is shown in the December billing statement. All functions benefit from this change automatically, and it’s especially beneficial for sub-100ms Lambda functions.

Lambda has also increased the maximum memory available to 10 GB. Since memory also controls CPU allocation in Lambda, this means that functions now have up to 6 vCPU cores available for processing. Finally, Lambda now supports container images as a packaging format, enabling teams to use familiar container tooling, such as Docker CLI. Container images are stored in Amazon ECR.

There were three feature releases that make it easier for developers working on data processing workloads. Lambda now supports self-hosted Kafka as an event source, allowing you to source events from on-premises or instance-based Kafka clusters. You can also process streaming analytics with tumbling windows and use custom checkpoints for processing batches with failed messages.

We launched Lambda Extensions in preview, enabling you to more easily integrate monitoring, security, and governance tools into Lambda functions. You can also build your own extensions that run code during Lambda lifecycle events. See this example extensions repo for starting development.

You can now send logs from Lambda functions to custom destinations by using Lambda Extensions and the new Lambda Logs API. Previously, you could only forward logs after they were written to Amazon CloudWatch Logs. Now, logging tools can receive log streams directly from the Lambda execution environment. This makes it easier to use your preferred tools for log management and analysis, including Datadog, Lumigo, New Relic, Coralogix, Honeycomb, or Sumo Logic.

Lambda Logs API architecture

Lambda launched support for Amazon MQ as an event source. Amazon MQ is a managed broker service for Apache ActiveMQ that simplifies deploying and scaling queues. The event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service manages an internal poller to invoke the target Lambda function.

Lambda announced support for AWS PrivateLink. This allows you to invoke Lambda functions from a VPC without traversing the public internet. It provides private connectivity between your VPCs and AWS services. By using VPC endpoints to access the Lambda API from your VPC, this can replace the need for an Internet Gateway or NAT Gateway.

For developers building machine learning inferencing, media processing, high performance computing (HPC), scientific simulations, and financial modeling in Lambda, you can now use AVX2 support to help reduce duration and lower cost. In this blog post’s example, enabling AVX2 for an image-processing function increased performance by 32-43%.

Lambda now supports batch windows of up to 5 minutes when using SQS as an event source. This is useful for workloads that are not time-sensitive, allowing developers to reduce the number of Lambda invocations from queues. Additionally, the batch size has been increased from 10 to 10,000. This is now the same batch size as Kinesis as an event source, helping Lambda-based applications process more data per invocation.

Code signing is now available for Lambda, using AWS Signer. This allows account administrators to ensure that Lambda functions only accept signed code for deployment. You can learn more about using this new feature in the developer documentation.

AWS Step Functions

Synchronous Express Workflows have been launched for AWS Step Functions, providing a new way to run high-throughput Express Workflows. This feature allows developers to receive workflow responses without needing to poll services or build custom solutions. This is useful for high-volume microservice orchestration and fast compute tasks communicating via HTTPS.

The Step Functions service recently added support for other AWS services in workflows. You can now integrate API Gateway REST and HTTP APIs. This enables you to call API Gateway directly from a state machine as an asynchronous service integration.

Step Functions now also supports Amazon EKS service integration. This allows you to build workflows with steps that synchronously launch tasks in EKS and wait for a response. The service also announced support for Amazon Athena, so workflows can now query data in your S3 data lakes.

Amazon API Gateway

API Gateway now supports mutual TLS authentication, which is commonly used for business-to-business applications and standards such as Open Banking. This is provided at no additional cost. You can now also disable the default REST API endpoint when deploying APIs using custom domain names.

HTTP APIs now supports service integrations with Step Functions Synchronous Express Workflows. This is a result of the service team’s work to add the most popular features of REST APIs to HTTP APIs.

AWS X-Ray

X-Ray now integrates with Amazon S3 to trace upstream requests. If a Lambda function uses the X-Ray SDK, S3 sends tracing headers to downstream event subscribers. This allows you to use the X-Ray service map to view connections between S3 and other services used to process an application request.

X-Ray announced support for end-to-end tracing in Step Functions to make it easier to trace requests across multiple AWS services. It also launched X-Ray Insights in preview, which generates actionable insights based on anomalies detected in an application. For Java developers, the services released an auto-instrumentation agent, for collecting instrumentation without modifying existing code.

Additionally, the AWS Distro for Open Telemetry is now in preview. OpenTelemetry is a collaborative effort by tracing solution providers to create common approaches to instrumentation.

Amazon EventBridge

You can now use event replay to archive and replay events with Amazon EventBridge. After configuring an archive, EventBridge automatically stores all events or filtered events, based upon event pattern matching logic. Event replay can help with testing new features or changes in your code, or hydrating development or test environments.

EventBridge archive and replay

EventBridge also launched resource policies that simplify managing access to events across multiple AWS accounts. Resource policies provide a powerful mechanism for modeling event buses across multiple account and providing fine-grained access control to EventBridge API actions.

EventBridge resource policies

EventBridge announced support for Server-Side Encryption (SSE). Events are encrypted using AES-256 at no additional cost for customers. EventBridge also increased PutEvent quotas to 10,000 transactions per second in US East (N. Virginia), US West (Oregon), and Europe (Ireland). This helps support workloads with high throughput.

Developer tools

The AWS SDK for JavaScript v3 was launched and includes first-class TypeScript support and a modular architecture. This makes it easier to import only the services needed to minimize deployment package sizes.

The AWS Serverless Application Model (AWS SAM) is an AWS CloudFormation extension that makes it easier to build, manage, and maintain serverless applications. The latest versions include support for cached and parallel builds, together with container image support for Lambda functions.

You can use AWS SAM in the new AWS CloudShell, which provides a browser-based shell in the AWS Management Console. This can help run a subset of AWS SAM CLI commands as an alternative to using a dedicated instance or AWS Cloud9 terminal.

AWS CloudShell

Amazon SNS

Amazon SNS announced support for First-In-First-Out (FIFO) topics. These are used with SQS FIFO queues for applications that require strict message ordering with exactly once processing and message deduplication.

Amazon DynamoDB

Developers can now use PartiQL, an SQL-compatible query language, with DynamoDB tables, bringing familiar SQL syntax to NoSQL data. You can also choose to use Kinesis Data Streams to capture changes to tables.

For customers using DynamoDB global tables, you can now use your own encryption keys. While all data in DynamoDB is encrypted by default, this feature enables you to use customer managed keys (CMKs). DynamoDB also announced the ability to export table data to data lakes in Amazon S3. This enables you to use services like Amazon Athena and AWS Lake Formation to analyze DynamoDB data with no custom code required.

AWS Amplify and AWS AppSync

You can now use existing Amazon Cognito user pools and identity pools for Amplify projects, making it easier to build new applications for an existing user base. With the new AWS Amplify Admin UI, you can configure application backends without using the AWS Management Console.

AWS AppSync enabled AWS WAF integration, making it easier to protect GraphQL APIs against common web exploits. You can also implement rate-based rules to help slow down brute force attacks. Using AWS Managed Rules for AWS WAF provides a faster way to configure application protection without creating the rules directly.

Serverless Posts

October

November

December

Tech Talks & Events

We hold AWS Online Tech Talks covering serverless topics throughout the year. These are listed in the Serverless section of the AWS Online Tech Talks page. We also regularly deliver talks at conferences and events around the world, speak on podcasts, and record videos you can find to learn in bite-sized chunks.

Here are some from Q4:

Videos

October:

November:

December:

There are also other helpful videos covering Serverless available on the Serverless Land YouTube channel.

The Serverless Land website

Serverless Land website

To help developers find serverless learning resources, we have curated a list of serverless blogs, videos, events, and training programs at a new site, Serverless Land. This is regularly updated with new information – you can subscribe to the RSS feed for automatic updates or follow the LinkedIn page.

Still looking for more?

The Serverless landing page has lots of information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow all of us on Twitter to see latest news, follow conversations, and interact with the team.

ICYMI: Serverless pre:Invent 2020

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-preinvent-2020/

During the last few weeks, the AWS serverless team has been releasing a wave of new features in the build-up to AWS re:Invent 2020. This post recaps some of the most important releases for serverless developers.

re:Invent is virtual and free to all attendees in 2020 – register here. See the complete list of serverless sessions planned and join the serverless DA team live on Twitch. Also, follow your DAs on Twitter for live recaps and Q&A during the event.

AWS re:Invent 2020

AWS Lambda

We launched Lambda Extensions in preview, enabling you to more easily integrate monitoring, security, and governance tools into Lambda functions. You can also build your own extensions that run code during Lambda lifecycle events, and there is an example extensions repo for starting development.

You can now send logs from Lambda functions to custom destinations by using Lambda Extensions and the new Lambda Logs API. Previously, you could only forward logs after they were written to Amazon CloudWatch Logs. Now, logging tools can receive log streams directly from the Lambda execution environment. This makes it easier to use your preferred tools for log management and analysis, including Datadog, Lumigo, New Relic, Coralogix, Honeycomb, or Sumo Logic.

Lambda Extensions API

Lambda launched support for Amazon MQ as an event source. Amazon MQ is a managed broker service for Apache ActiveMQ that simplifies deploying and scaling queues. This integration increases the range of messaging services that customers can use to build serverless applications. The event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service manages an internal poller to invoke the target Lambda function.

We also released a new layer to make it simpler to integrate Amazon CodeGuru Profiler. This service helps identify the most expensive lines of code in a function and provides recommendations to help reduce cost. With this update, you can enable the profiler by adding the new layer and setting environment variables. There are no changes needed to the custom code in the Lambda function.

Lambda announced support for AWS PrivateLink. This allows you to invoke Lambda functions from a VPC without traversing the public internet. It provides private connectivity between your VPCs and AWS services. By using VPC endpoints to access the Lambda API from your VPC, this can replace the need for an Internet Gateway or NAT Gateway.

For developers building machine learning inferencing, media processing, high performance computing (HPC), scientific simulations, and financial modeling in Lambda, you can now use AVX2 support to help reduce duration and lower cost. By using packages compiled for AVX2 or compiling libraries with the appropriate flags, your code can then benefit from using AVX2 instructions to accelerate computation. In the blog post’s example, enabling AVX2 for an image-processing function increased performance by 32-43%.

Lambda now supports batch windows of up to 5 minutes when using SQS as an event source. This is useful for workloads that are not time-sensitive, allowing developers to reduce the number of Lambda invocations from queues. Additionally, the batch size has been increased from 10 to 10,000. This is now the same as the batch size for Kinesis as an event source, helping Lambda-based applications process more data per invocation.

Code signing is now available for Lambda, using AWS Signer. This allows account administrators to ensure that Lambda functions only accept signed code for deployment. Using signing profiles for functions, this provides granular control over code execution within the Lambda service. You can learn more about using this new feature in the developer documentation.

Amazon EventBridge

You can now use event replay to archive and replay events with Amazon EventBridge. After configuring an archive, EventBridge automatically stores all events or filtered events, based upon event pattern matching logic. You can configure a retention policy for archives to delete events automatically after a specified number of days. Event replay can help with testing new features or changes in your code, or hydrating development or test environments.

EventBridge archived events

EventBridge also launched resource policies that simplify managing access to events across multiple AWS accounts. This expands the use of a policy associated with event buses to authorize API calls. Resource policies provide a powerful mechanism for modeling event buses across multiple account and providing fine-grained access control to EventBridge API actions.

EventBridge resource policies

EventBridge announced support for Server-Side Encryption (SSE). Events are encrypted using AES-256 at no additional cost for customers. EventBridge also increased PutEvent quotas to 10,000 transactions per second in US East (N. Virginia), US West (Oregon), and Europe (Ireland). This helps support workloads with high throughput.

AWS Step Functions

Synchronous Express Workflows have been launched for AWS Step Functions, providing a new way to run high-throughput Express Workflows. This feature allows developers to receive workflow responses without needing to poll services or build custom solutions. This is useful for high-volume microservice orchestration and fast compute tasks communicating via HTTPS.

The Step Functions service recently added support for other AWS services in workflows. You can now integrate API Gateway REST and HTTP APIs. This enables you to call API Gateway directly from a state machine as an asynchronous service integration.

Step Functions now also supports Amazon EKS service integration. This allows you to build workflows with steps that synchronously launch tasks in EKS and wait for a response. In October, the service also announced support for Amazon Athena, so workflows can now query data in your S3 data lakes.

These new integrations help minimize custom code and provide built-in error handling, parameter passing, and applying recommended security settings.

AWS SAM CLI

The AWS Serverless Application Model (AWS SAM) is an AWS CloudFormation extension that makes it easier to build, manage, and maintains serverless applications. On November 10, the AWS SAM CLI tool released version 1.9.0 with support for cached and parallel builds.

By using sam build --cached, AWS SAM no longer rebuilds functions and layers that have not changed since the last build. Additionally, you can use sam build --parallel to build functions in parallel, instead of sequentially. Both of these new features can substantially reduce the build time of larger applications defined with AWS SAM.

Amazon SNS

Amazon SNS announced support for First-In-First-Out (FIFO) topics. These are used with SQS FIFO queues for applications that require strict message ordering with exactly once processing and message deduplication. This is designed for workloads that perform tasks like bank transaction logging or inventory management. You can also use message filtering in FIFO topics to publish updates selectively.

SNS FIFO

AWS X-Ray

X-Ray now integrates with Amazon S3 to trace upstream requests. If a Lambda function uses the X-Ray SDK, S3 sends tracing headers to downstream event subscribers. With this, you can use the X-Ray service map to view connections between S3 and other services used to process an application request.

AWS CloudFormation

AWS CloudFormation announced support for nested stacks in change sets. This allows you to preview changes in your application and infrastructure across the entire nested stack hierarchy. You can then review those changes before confirming a deployment. This is available in all Regions supporting CloudFormation at no extra charge.

The new CloudFormation modules feature was released on November 24. This helps you develop building blocks with embedded best practices and common patterns that you can reuse in CloudFormation templates. Modules are available in the CloudFormation registry and can be used in the same way as any native resource.

Amazon DynamoDB

For customers using DynamoDB global tables, you can now use your own encryption keys. While all data in DynamoDB is encrypted by default, this feature enables you to use customer managed keys (CMKs). DynamoDB also announced support for global tables in the Europe (Milan) and Europe (Stockholm) Regions. This feature enables you to scale global applications for local access in workloads running in different Regions and replicate tables for higher availability and disaster recovery (DR).

The DynamoDB service announced the ability to export table data to data lakes in Amazon S3. This enables you to use services like Amazon Athena and AWS Lake Formation to analyze DynamoDB data with no custom code required. This feature does not consume table capacity and does not impact performance and availability. To learn how to use this feature, see this documentation.

AWS Amplify and AWS AppSync

You can now use existing Amazon Cognito user pools and identity pools for Amplify projects, making it easier to build new applications for an existing user base. AWS Amplify Console, which provides a fully managed static web hosting service, is now available in the Europe (Milan), Middle East (Bahrain), and Asia Pacific (Hong Kong) Regions. This service makes it simpler to bring automation to deploying and hosting single-page applications and static sites.

AWS AppSync enabled AWS WAF integration, making it easier to protect GraphQL APIs against common web exploits. You can also implement rate-based rules to help slow down brute force attacks. Using AWS Managed Rules for AWS WAF provides a faster way to configure application protection without creating the rules directly. AWS AppSync also recently expanded service availability to the Asia Pacific (Hong Kong), Middle East (Bahrain), and China (Ningxia) Regions, making the service now available in 21 Regions globally.

Still looking for more?

Join the AWS Serverless Developer Advocates on Twitch throughout re:Invent for live Q&A, session recaps, and more! See this page for the full schedule.

For more serverless learning resources, visit Serverless Land.

New Synchronous Express Workflows for AWS Step Functions

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/new-synchronous-express-workflows-for-aws-step-functions/

Today, AWS is introducing Synchronous Express Workflows for AWS Step Functions. This is a new way to run Express Workflows to orchestrate AWS services at high-throughput.

Developers have been using asynchronous Express Workflows since December 2019 for workloads that require higher event rates and shorter durations. Customers were looking for ways to receive an immediate response from their Express Workflows without having to write additional code or introduce additional services.

What’s new?

Synchronous Express Workflows allow developers to quickly receive the workflow response without needing to poll additional services or build a custom solution. This is useful for high-volume microservice orchestration and fast compute tasks that communicate via HTTPS.

Getting started

You can build and run Synchronous Express Workflows using the AWS Management Console, the AWS Serverless Application Model (AWS SAM), the AWS Cloud Development Kit (AWS CDK), AWS CLI, or AWS CloudFormation.

To create Synchronous Express Workflows from the AWS Management Console:

  1. Navigate to the Step Functions console and choose Create State machine.
  2. Choose Author with code snippets. Choose Express.
    This generates a sample workflow definition that you can change once the workflow is created.
  3. Choose Next, then choose Create state machine. It may take a moment for the workflow to deploy.

Starting Synchronous Express Workflows

When starting an Express Workflow, a new Type parameter is required. To start a synchronous workflow from the AWS Management Console:

  1. Navigate to the Step Functions console.
  2. Choose an Express Workflow from the list.
  3. Choose Start execution.

    Here you have an option to run the Express Workflow as a synchronous or asynchronous type.
  4. Choose Synchronous and choose Start execution.

  5. Expand Details in the results message to view the output.

Monitoring, logging and tracing

Enable logging to inspect and debug Synchronous Express Workflows. All execution history is sent to CloudWatch Logs. Use the Monitoring and Logging tabs in the Step Functions console to gain visibility into Express Workflow executions.

The Monitoring tab shows six graphs with CloudWatch metrics for Execution Errors, Execution Succeeded, Execution Duration, Billed Duration, Billed Memory, and Executions Started. The Logging tab shows recent logs and the logging configuration, with a link to CloudWatch Logs.

Enable X-Ray tracing to view trace maps and timelines of the underlying components that make up a workflow. This helps to discover performance issues, detect permission problems, and track requests made to and from other AWS services.

Creating an example workflow

The following example uses Amazon API Gateway HTTP APIs to start an Express Workflow synchronously. The workflow analyses web form submissions for negative sentiment. It generates a case reference number and saves the data in an Amazon DynamoDB table. The workflow returns the case reference number and message sentiment score.

  1. The API endpoint is generated by an API Gateway HTTP APIs. A POST request is made to the API which invokes the workflow. It contains the contact form’s message body.
  2. The message sentiment is analyzed by Amazon Comprehend.
  3. The Lambda function generates a case reference number, which is recorded in the DynamoDB table.
  4. The workflow choice state branches based on the detected sentiment.
  5. If a negative sentiment is detected, a notification is sent to an administrator via Amazon Simple Email Service (SES).
  6. When the workflow completes, it returns a ticketID to API Gateway.
  7. API Gateway returns the ticketID in the API response.

The code for this application can be found in this GitHub repository. Three important files define the application and its resources:

Deploying the application

Clone the GitHub repository and deploy with the AWS SAM CLI:

$ git clone https://github.com/aws-samples/contact-form-processing-with-synchronous-express-workflows.git
$ cd contact-form-processing-with-synchronous-express-workflows 
$ sam build 
$ sam deploy -g

This deploys 12 resources, including a Synchronous Express Workflow, three Lambda functions, an API Gateway HTTP API endpoint, and all the AWS Identity & Access Management (IAM) roles and permissions required for the application to run.

Note the HTTP APIs endpoint and workflow ARN outputs.

Testing Synchronous Express Workflows:

A new StartSyncExecution AWS CLI command is used to run the synchronous Express Workflow:

aws stepfunctions start-sync-execution \
--state-machine-arn <your-workflow-arn> \
--input "{\"message\" : \"This is bad service\"}"

The response is received once the workflow completes. It contains the workflow output (sentiment and ticketid), the executionARN, and some execution metadata.

Starting the workflow from HTTP API Gateway:

The application deploys an API Gateway HTTP API, with a Step Functions integration. This is configured in the api.yaml file. It starts the state machine with the POST body provided as the input payload.

Trigger the workflow with a POST request, using the API HTTP API endpoint generated from the deploy step. Enter the following CURL command into the terminal:

curl --location --request POST '<YOUR-HTTP-API-ENDPOINT>' \
--header 'Content-Type: application/json' \
--data-raw '{"message":" This is bad service"}'

The POST request returns a 200 status response. The output field of the response contains the sentiment results (negative) and the generated ticketId (jc4t8i).

Putting it all together

You can use this application to power a web form backend to help expedite customer complaints. In the following example, a frontend application submits form data via an AJAX POST request. The application waits for the response, and presents the user with a message appropriate to the detected sentiment, and a case reference number.

If a negative sentiment is returned in the API response, the user is informed of their case number:

Setting IAM permissions

Before a user or service can start a Synchronous Express Workflow, it must be granted permission to perform the states:StartSyncExecution API operation. This is a new state-machine level permission. Existing Express Workflows can be run synchronously once the correct IAM permissions for StartSyncExecution are granted.

The example application applies this to a policy within the HttpApiRole in the AWS SAM template. This role is added to the HTTP API integration within the api.yaml file.

Conclusion

Step Functions Synchronous Express Workflows allow developers to receive a workflow response without having to poll additional services. This helps developers orchestrate microservices without needing to write additional code to handle errors, retries, and run parallel tasks. They can be invoked in response to events such as HTTP requests via API Gateway, from a parent state machine, or by calling the StartSyncExecution API action.

This feature is available in all Regions where AWS Step Functions is available. View the AWS Regions table to learn more.

For more serverless learning resources, visit Serverless Land.

Introducing Amazon API Gateway service integration for AWS Step Functions

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/introducing-amazon-api-gateway-service-integration-for-aws-step-functions/

AWS Step Functions now integrates with Amazon API Gateway to enable backend orchestration with minimal code and built-in error handling.

API Gateway is a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale. These APIs enable applications to access data, business logic, or functionality from your backend services.

Step Functions allows you to build resilient serverless orchestration workflows with AWS services such as AWS Lambda, Amazon SNS, Amazon DynamoDB, and more. AWS Step Functions integrates with a number of services natively. Using Amazon States Language (ASL), you can coordinate these services directly from a task state.

What’s new?

The new Step Functions integration with API Gateway provides an additional resource type, arn:aws:states:::apigateway:invoke and can be used with both Standard and Express workflows. It allows customers to call API Gateway REST APIs and API Gateway HTTP APIs directly from a workflow, using one of two integration patterns:

  1. Request-Response: calling a service and let Step Functions progress to the next state immediately after it receives an HTTP response. This pattern is supported by Standard and Express Workflows.
  2. Wait-for-Callback: calling a service with a task token and have Step Functions wait until that token is returned with a payload. This pattern is supported by Standard Workflows.

The new integration is configured with the following Amazon States Language parameter fields:

  • ApiEndpoint: The API root endpoint.
  • Path: The API resource path.
  • Method: The HTTP request method.
  • HTTP headers: Custom HTTP headers.
  • RequestBody: The body for the API request.
  • Stage: The API Gateway deployment stage.
  • AuthType: The authentication type.

Refer to the documentation for more information on API Gateway fields and concepts.

Getting started

The API Gateway integration with Step Functions is configured using AWS Serverless Application Model (AWS SAM), the AWS Command Line Interface (AWS CLI), AWS CloudFormation or from within the AWS Management Console.

To get started with Step Functions and API Gateway using the AWS Management Console:

  1. Go to the Step Functions page of the AWS Management Console.
  2. Choose Run a sample project and choose Make a call to API Gateway.The Definition section shows the ASL that makes up the example workflow. The following example shows the new API Gateway resource and its parameters:
  3. Review example Definition, then choose Next.
  4. Choose Deploy resources.

This deploys a Step Functions standard workflow and a REST API with a /pets resource containing a GET and a POST method. It also deploys an IAM role with the required permissions to invoke the API endpoint from Step Functions.

The RequestBody field lets you customize the API’s request input. This can be a static input or a dynamic input taken from the workflow payload.

Running the workflow

  1. Choose the newly created state machine from the Step Functions page of the AWS Management Console
  2. Choose Start execution.
  3. Paste the following JSON into the input field:
    {
      "NewPet": {
        "type": "turtle",
        "price": 74.99
      }
    }
  4. Choose Start execution
  5. Choose the Retrieve Pet Store Data step, then choose the Step output tab.

This shows the successful responseBody output from the “Add to pet store” POST request and the response from the “Retrieve Pet Store Data” GET request.

Access control

The API Gateway integration supports AWS Identity and Access Management (IAM) authentication and authorization. This includes IAM roles, policies, and tags.

AWS IAM roles and policies offer flexible and robust access controls that can be applied to an entire API or individual methods. This controls who can create, manage, or invoke your REST API or HTTP API.

Tag-based access control allows you to set more fine-grained access control for all API Gateway resources. Specify tag key-value pairs to categorize API Gateway resources by purpose, owner, or other criteria. This can be used to manage access for both REST APIs and HTTP APIs.

API Gateway resource policies are JSON policy documents that control whether a specified principal (typically an IAM user or role) can invoke the API. Resource policies can be used to grant access to a REST API via AWS Step Functions. This could be for users in a different AWS account or only for specified source IP address ranges or CIDR blocks.

To configure access control for the API Gateway integration, set the AuthType parameter to one of the following:

  1. {“AuthType””: “NO_AUTH”}
    Call the API directly without any authorization. This is the default setting.
  2. {“AuthType””: “IAM_ROLE”}
    Step Functions assumes the state machine execution role and signs the request with credentials using Signature Version 4.
  3. {“AuthType””: “RESOURCE_POLICY”}
    Step Functions signs the request with the service principal and calls the API endpoint.

Orchestrating microservices

Customers are already using Step Functions’ built in failure handling, decision branching, and parallel processing to orchestrate application backends. Development teams are using API Gateway to manage access to their backend microservices. This helps to standardize request, response formats and decouple business logic from routing logic. It reduces complexity by allowing developers to offload responsibilities of authentication, throttling, load balancing and more. The new API Gateway integration enables developers to build robust workflows using API Gateway endpoints to orchestrate microservices. These microservices can be serverless or container-based.

The following example shows how to orchestrate a microservice with Step Functions using API Gateway to access AWS services. The example code for this application can be found in this GitHub repository.

To run the application:

  1. Clone the GitHub repository:
    $ git clone https://github.com/aws-samples/example-step-functions-integration-api-gateway.git
    $ cd example-step-functions-integration-api-gateway
  2. Deploy the application using AWS SAM CLI, accepting all the default parameter inputs:
    $ sam build && sam deploy -g

    This deploys 17 resources including a Step Functions standard workflow, an API Gateway REST API with three resource endpoints, 3 Lambda functions, and a DynamoDB table. Make a note of the StockTradingStateMachineArn value. You can find this in the command line output or in the Applications section of the AWS Lambda Console:

     

  3. Manually trigger the workflow from a terminal window:
    aws stepFunctions start-execution \
    --state-machine-arn <StockTradingStateMachineArnValue>

The response looks like:

 

When the workflow is run, a Lambda function is invoked via a GET request from API Gateway to the /check resource. This returns a random stock value between 1 and 100. This value is evaluated in the Buy or Sell choice step, depending on if it is less or more than 50. The Sell and Buy states use the API Gateway integration to invoke a Lambda function, with a POST method. A stock_value is provided in the POST request body. A transaction_result is returned in the ResponseBody and provided to the next state. The final state writes a log of the transition to a DynamoDB table.

Defining the resource with an AWS SAM template

The Step Functions resource is defined in this AWS SAM template. The DefinitionSubstitutions field is used to pass template parameters to the workflow definition.

StockTradingStateMachine:
    Type: AWS::Serverless::StateMachine # More info about State Machine Resource: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-resource-statemachine.html
    Properties:
      DefinitionUri: statemachine/stock_trader.asl.json
      DefinitionSubstitutions:
        StockCheckPath: !Ref CheckPath
        StockSellPath: !Ref SellPath
        StockBuyPath: !Ref BuyPath
        APIEndPoint: !Sub "${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com"
        DDBPutItem: !Sub arn:${AWS::Partition}:states:::dynamodb:putItem
        DDBTable: !Ref TransactionTable

The workflow is defined on a separate file (/statemachine/stock_trader.asl.json).

The following code block defines the Check Stock Value state. The new resource, arn:aws:states:::apigateway:invoke declares the API Gateway service integration type.

The parameters object holds the required fields to configure the service integration. The Path and ApiEndpoint values are provided by the DefinitionsSubstitutions field in the AWS SAM template. The RequestBody input is defined dynamically using Amazon States Language. The .$ at the end of the field name RequestBody specifies that the parameter use a path to reference a JSON node in the input.

"Check Stock Value": {
  "Type": "Task",
  "Resource": "arn:aws:states:::apigateway:invoke",
  "Parameters": {
      "ApiEndpoint":"${APIEndPoint}",
      "Method":"GET",
      "Stage":"Prod",
      "Path":"${StockCheckPath}",
      "RequestBody.$":"$",
      "AuthType":"NO_AUTH"
  },
  "Retry": [
      {
          "ErrorEquals": [
              "States.TaskFailed"
          ],
          "IntervalSeconds": 15,
          "MaxAttempts": 5,
          "BackoffRate": 1.5
      }
  ],
  "Next": "Buy or Sell?"
},

The deployment process validates the ApiEndpoint value. The service integration builds the API endpoint URL from the information provided in the parameters block in the format https://[APIendpoint]/[Stage]/[Path].

Conclusion

The Step Functions integration with API Gateway provides customers with the ability to call REST APIs and HTTP APIs directly from a Step Functions workflow.

Step Functions’ built in error handling helps developers reduce code and decouple business logic. Developers can combine this with API Gateway to offload responsibilities of authentication, throttling, load balancing and more. This enables developers to orchestrate microservices deployed on containers or Lambda functions via API Gateway without managing infrastructure.

This feature is available in all Regions where both AWS Step Functions and Amazon API Gateway are available. View the AWS Regions table to learn more. For pricing information, see Step Functions pricing. Normal service limits of API Gateway and service limits of Step Functions apply.

For more serverless learning resources, visit Serverless Land.

Application integration patterns for microservices: Orchestration and coordination

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/application-integration-patterns-for-microservices-orchestration-and-coordination/

This post is courtesy of Stephen Liedig, Sr. Serverless Specialist SA.

This is the final blog post in the “Application Integration Patterns for Microservices” series. Previous posts cover asynchronous messaging for microservices, fan-out strategies, and scatter-gather design patterns.

In this post, I look at how to implement messaging patterns to help orchestrate and coordinate business workflows in our applications. Specifically, I cover two patterns:

  • Pipes and Filters, as presented in the book “Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions” (Hohpe and Woolf, 2004)
  • Saga Pattern, which is a design pattern for dealing with “long-lived transactions” (LLT), published by Garcia-Molina and Salem in 1987.

I discuss these patterns using the Wild Rydes example from this series.

Wild Rydes

Wild Rydes is a fictional technology start-up created to disrupt the transportation industry by replacing traditional taxis with unicorns. Several hands-on AWS workshops use the Wild Rydes scenario. It illustrates concepts such as serverless development, event-driven design, API management, and messaging in microservices.

Wild Rydes

This blog post explores the build process of the Wild Rydes workshop, to help you apply these concepts to your applications.

After completing a unicorn ride, the Wild Rydes customer application charges the customer. Once the driver submits a ride completion, an event triggers the following steps:

  • Registers the fare: registers the fare ride completion event.
  • Initiates the payment (via a payment service): calls a payment gateway for credit card pre-authorization. Using the pre-authorization code, it completes the payment transaction.
  • Updates customer accounting system: once the payment is processed, updates the Wild Rydes customer accounting system with the transaction detail.
  • Publishes “Fare Processed” event: sends a notification to interested components that the process is completed.

Each of the steps interfaces with separate systems – the Wild Rydes system, a third-party payment provider, and the customer accounting system. You could implement these steps inside a single component, but that would make it difficult to change and adapt. It’d also reduce the potential for components reuse within our application. Breaking down the steps into individual components allows you to build components with a single responsibility making it easier to manage each components dependencies and application lifecycle. You can be selective about how you implement the respective components, for example, different teams responsible for the development of the respective components may choose to use different languages. This is where the Pipes and Filters architectural pattern can help.

Pipes and filters

Hohpe and Woolf define Pipes and Filters as an “architectural style to divide a larger processing task into a sequence of smaller, independent processing steps (filters) that are connected by channels (pipes).”

Pipes and filters architecture

Pipes provide a communications channel that abstracts the consumer of messages sent through that channel. It decouples your filter from one another, so components only need to know the messaging channel, or endpoint, where they are sending messages. They do not know who, or what, is processing that message, or where the receiver is located on the network.

Amazon SQS provides a lightweight solution with the power and scale of messaging middleware. It is a simple, flexible, fully managed message queuing service for reliably and continuously exchanging large volume of messages. It has virtually limitless scalability and the ability to increase message throughput without pre-provisioning capacity.

You can create an SQS queue with this AWS CLI command:

aws sqs create-queue --queue-name MyQueue

For the fare processing scenario, you could implement a Pipes and Filters architectural pattern using AWS services. This uses two Amazon SQS queues and an Amazon SNS topic:

Pipes and filters pattern with AWS services

Amazon SQS provides a mechanism for decoupling the components. The filters only need to know to which queue to send the message, without knowing which component processes that message nor when it is processed. SQS does this in a secure, durable, and scalable way.

Despite the fact that none of the filters have a direct dependency on one another, there is still a degree of coupling at the pipe level. Changing execution order therefore forces you to update and redeploy your existing filters to point to a new pipe. In the Wild Rydes example, you can reduce the impact of this by defining an environment variable for the destination endpoint in AWS Lambda function configuration, rather than hardcoding this inside your implementations.

Dealing with failures and retries requires some consideration too. In Amazon SQS terms, this requires you to define configurations, such a message VisibilityTimeOut. The VisibilityTimeOut setting provides you with some transactional support. It ensures that the message is not removed from the queue until after you have finished processing the message and you explicitly delete it from the queue. Using Amazon SQS as an Event Source for AWS Lambda further simplifies that for you because the message polling implementation is managed by the service, so you don’t need to create an explicit implementation in your filter.

Amazon SQS helps deal with failures gracefully as it maintains a count of how many times a message is processed via ReceiveCount. By specifying a maxReceiveCount, you can limit the number of times a poisoned message gets processed. Combine this with a dead letter queue (DLQ), you can then move messages that have exceeded the maxReceiveCount number to the DLQ. Adding Amazon CloudWatch alarms on metrics such as ApproximateNumberOfMessagesVisible on the DLQ, you can proactively alert on system failures if the number of messages on the dead letter queue exceed and acceptable threshold.

Alternatively, you can model the fare payment scenario with AWS Step Functions. Step Functions externalizes the Pipes and Filters pattern. It extracts the coordination from the filter implementations into a state machine that orchestrates the sequence of events. Visual workflows allow you to change the sequence of execution without modifying code, reducing the amount of coupling between collaborating components.

Here is how you could model the fare processing scenario using Step Functions:

Fare processing with Step Functions

{
  "Comment": "StateMachine for Processing Fare Payments",
  "StartAt": "RegisterFare",
  "States": {
    "RegisterFare": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:RegisterFareFunction",
      "Next": "ProcessPayment"
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ChargeFareFunction",
      "Next": "UpdateCustomerAccount"
    },
    "UpdateCustomerAccount": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:UpdateCustomerAccountFunction",
      "Next": "PublishFareProcessedEvent"
    },
    "PublishFareProcessedEvent": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:REGION:ACCOUNT_ID:myTopic",
        "Message": {
          "Input": "Hello from Step Functions!"
        }
      },
      "End": true
    }
  }
}

AWS Step Functions allows you to easily build more sophisticated workflows. These could include decision points, parallel processing, wait states to pause the state machine execution, error handling, and retry logic. Error and Retry states help you simplify your component implementation by providing a framework for error handling and implementation exponential backoff on retries. You can define alternate execution paths if failures cannot be handled.

In this implementation, each of these states is a discrete transaction. Some implement database transactions when registering the fare, others are calling the third-party payment provider APIs, and internal APIs or programming interfaces when updating the customer accounting system.

Dealing with each of these transactions independently is relatively straightforward. But what happens if you require consistency across all steps so that either all or none of the transactions complete? How can you deal with consistency across multiple, distributed transactions? How do we deal with the temporal aspects of coordinating these potentially long running heterogeneous integrations?

Consistency across multiple, distributed transactions.

Cloud providers do not support Distributed Transaction Coordinators (DTC) or two-phase commit protocols responsible for coordinating transactions across multiple cloud resources. Therefore, you need a mechanism to explicitly coordinate multiple local transactions. This is where the saga pattern and AWS Step Functions can help.

A saga is a design pattern for dealing with “long-lived transactions” (LLT), published by Garcia-Molina and Salem in 1987, they define the concept of a saga as:

“LLT is a saga if it can be written as a sequence of transactions that can be interleaved with other transactions.” (Garcia-Molina, Salem 1987)

Fundamentally, saga can provide a failure management pattern to establish consistency across all of your distributed applications, by implementing a compensating transaction for each step in a series of functions. Compensating transactions allow you to back out of the changes that were previously committed in your series of functions, so that if one of your steps fails you can “undo” what you did before, and leave your system in stable state, devoid of side-effects.

AWS Step Functions provides a mechanism for implementing a saga pattern with the ability to build fully managed state machines that allow you to catch custom business exceptions and manage and share data across state transitions.

Infrastructure with service integrations

Figure 1: Using Step Functions’ Service Integrations for Amazon DynamoDB and Amazon SNS, you can further reduce the need for a custom AWS Lambda implementation to persist data to the database, or send a notification.

By using these capabilities, you can expand on the previous Fare Processing state machine and implementing compensating transaction states. If Register Fare fails, you may want to emit an event that invokes an external support function or generates a notification informing operators of the system the error.

If payment processing failed, you would want to ensure that the status is updated to reflect state change and then notify operators of the failed event. You might decide to refund customers, update the fare status and notify support, until you have been able to resolve issues with the customer accounting system. Regardless of the approach, Step Functions allows you to model a failure scenario that aligns with a more business-centric view of consistency.

Step Functions workflow results

If you want to see the full state machine implementation in Lab4 of Wild Rydes Asynchronous Messaging Workshop. The workshop guides you through building your own state machine so you can see how to apply the pattern to your own scenarios. There are also three other workshops you can walk through that cover the other patterns in the series.

Conclusion

Using Wild Rydes, I show how to use Amazon SQS and AWS Step Functions to decouple your application components and services. I show you how these services help to coordinate and orchestrate distributed components to build resilient and fault tolerant microservices architectures.

Take part in the Wild Rydes Asynchronous Messaging Workshop and learn about the other messaging patterns you can apply to microservices architectures, including fan-out and message filtering, topic-queue-chaining and load balancing (blog post), and scatter-gather.

The Wild Rydes Asynchronous Messaging Workshop resources are hosted on our AWS Samples GitHub repository, including the sample code for this blog post under Lab-4: Choreography and orchestration.

For a deeper dive into queues and topics and how to use these in microservices architectures, read:

  1. The AWS whitepaper, Implementing Microservices on AWS.
  2. Implementing enterprise integration patterns with AWS messaging services: point-to-point channels.
  3. Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels.
  4. Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox.

For more information on enterprise integration patterns, see:

Getting started with RPA using AWS Step Functions and Amazon Textract

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/getting-started-with-rpa-using-aws-step-functions-and-amazon-textract/

This post is courtesy of Joe Tringali, Solutions Architect.

Many organizations are using robotic process automation (RPA) to automate workflow, back-office processes that are labor-intensive. RPA, as software bots, can often handle many of these activities. Often RPA workflows contain repetitive manual tasks that must be done by humans, such as viewing invoices to find payment details.

AWS Step Functions is a serverless function orchestrator and workflow automation tool. Amazon Textract is a fully managed machine learning service that automatically extracts text and data from scanned documents. Combining these services, you can create an RPA bot to automate the workflow and enable employees to handle more complex tasks.

In this post, I show how you can use Step Functions and Amazon Textract to build a workflow that enables the processing of invoices. Download the code for this solution from https://github.com/aws-samples/aws-step-functions-rpa.

Overview

The following serverless architecture can process scanned invoices in PDF or image formats for submitting payment information to a database.

Example architecture
To implement this architecture, I use single-purpose Lambda functions and Step Functions to build the workflow:

  1. Invoices are scanned and loaded into an Amazon Simple Storage Service (S3) bucket.
  2. The loading of an invoice into Amazon S3 triggers an AWS Lambda function to be invoked.
  3. The Lambda function starts an asynchronous Amazon Textract job to analyze the text and data of the scanned invoice.
  4. The Amazon Textract job publishes a completion notification message with a status of “SUCCEEDED” or “FAILED” to an Amazon Simple Notification Service (SNS) topic.
  5. SNS sends the message to an Amazon Simple Queue Service (SQS) queue that is subscribed to the SNS topic.
  6. The message in the SQS queue triggers another Lambda function.
  7. The Lambda function initiates a Step Functions state machine to process the results of the Amazon Textract job.
  8. For an Amazon Textract job that completes successfully, a Lambda function saves the document analysis into an Amazon S3 bucket.
  9. The loading of the document analysis to Amazon S3 triggers another Lambda function.
  10. The Lambda function retrieves the text and data of the scanned invoice to find the payment information. It writes an item to an Amazon DynamoDB table with a status indicating if the invoice can be processed.
  11. If the DynamoDB item contains the payment information, another Lambda function is invoked.
  12. The Lambda function archives the processed invoice into another S3 bucket.
  13. If the DynamoDB item does not contain the payment information, a message is published to an Amazon SNS topic requesting that the invoice be reviewed.

Amazon Textract can extract information from the various invoice images and associate labels with the data. You must then handle the various labels that different invoices may associate with the payee name, due date, and payment amount.

Determining payee name, due date and payment amount

After the document analysis has been saved to S3, a Lambda function retrieves the text and data of the scanned invoice to find the information needed for payment. However, invoices can use a variety of labels for the same piece of data, such a payment’s due date.

In the example invoices included with this blog, the payment’s due date is associated with the labels “Pay On or Before”, “Payment Due Date” and “Payment Due”. Payment amounts can also have different labels, such as “Total Due”, “New Balance Total”, “Total Current Charges”, and “Please Pay”. To address this, I use a series of helper functions in the app.py file in the process_document_analysis folder of the GitHub repo.

In app.py, there is the following get_ky_map helper function:

def get_kv_map(blocks):
    key_map = {}
    value_map = {}
    block_map = {}
    for block in blocks:
        block_id = block['Id']
        block_map[block_id] = block
        if block['BlockType'] == "KEY_VALUE_SET":
            if 'KEY' in block['EntityTypes']:
                key_map[block_id] = block
            else:
                value_map[block_id] = block
    return key_map, value_map, block_map

The get_kv_map function is invoked by the Lambda function handler. It iterates over the “Blocks” element of the document analysis produced by Amazon Textract to create dictionaries of keys (labels) and values (data) associated with each block identified by Amazon Textract. It then invokes the following get_kv_relationship helper function:

def get_kv_relationship(key_map, value_map, block_map):
    kvs = {}
    for block_id, key_block in key_map.items():
        value_block = find_value_block(key_block, value_map)
        key = get_text(key_block, block_map)
        val = get_text(value_block, block_map)
        kvs[key] = val
    return kvs

The get_kv_relationship function merges the key and value dictionaries produced by the get_kv_map function to create a single Python key value dictionary where labels are the keys to the dictionary and the invoice’s data are the values. The handler then invokes the following get_line_list helper function:

def get_line_list(blocks):
    line_list = []
    for block in blocks:
        if block['BlockType'] == "LINE":
            if 'Text' in block: 
                line_list.append(block["Text"])
    return line_list

Extracting payee names is more complex because the data may not be labeled. The payee may often differ from the entity sending the invoice. With the Amazon Textract analysis in a format more easily consumable by Python, I use the following get_payee_name helper function to parse and extract the payee:

def get_payee_name(lines):
    payee_name = ""
    payable_to = "payable to"
    payee_lines = [line for line in lines if payable_to in line.lower()]
    if len(payee_lines) > 0:
        payee_line = payee_lines[0]
        payee_line = payee_line.strip()
        pos = payee_line.lower().find(payable_to)
        if pos > -1:
            payee_line = payee_line[pos + len(payable_to):]
            if payee_line[0:1] == ':':
                payee_line = payee_line[1:]
            payee_name = payee_line.strip()
    return payee_name

The get_amount helper function searches the key value dictionary produced by the get_kv_relationship function to retrieve the payment amount:

def get_amount(kvs, lines):
    amount = None
    amounts = [search_value(kvs, amount_tag) for amount_tag in amount_tags if search_value(kvs, amount_tag) is not None]
    if len(amounts) > 0:
        amount = amounts[0]
    else:
        for idx, line in enumerate(lines):
            if line.lower() in amount_tags:
                amount = lines[idx + 1]
                break
    if amount is not None:
        amount = amount.strip()
        if amount[0:1] == '$':
            amount = amount[1:]
    return amount

The amount_tags variable contains a list of possible labels associated with the payment amount:

amount_tags = ["total due", "new balance total", "total current charges", "please pay"]

Similarly, the get_due_date helper function searches the key value dictionary produced by the get_kv_relationship function to retrieve the payment due date:

def get_due_date(kvs):
    due_date = None
    due_dates = [search_value(kvs, due_date_tag) for due_date_tag in due_date_tags if search_value(kvs, due_date_tag) is not None]
    if len(due_dates) > 0:
        due_date = due_dates[0]
    if due_date is not None:
        date_parts = due_date.split('/')
        if len(date_parts) == 3:
            due_date = datetime(int(date_parts[2]), int(date_parts[0]), int(date_parts[1])).isoformat()
        else:
            date_parts = [date_part for date_part in re.split("\s+|,", due_date) if len(date_part) > 0]
            if len(date_parts) == 3:
                datetime_object = datetime.strptime(date_parts[0], "%b")
                month_number = datetime_object.month
                due_date = datetime(int(date_parts[2]), int(month_number), int(date_parts[1])).isoformat()
    else:
        due_date = datetime.now().isoformat()
    return due_date

The due_date_tag contains a list of possible labels associated with the payment due:

due_date_tags = ["pay on or before", "payment due date", "payment due"]

If all required elements needed to issue a payment are found, it adds an item to the DynamoDB table with a status attribute of “Approved for Payment”. If the Lambda function cannot determine the value of one or more required elements, it adds an item to the DynamoDB table with a status attribute of “Pending Review”.

Payment Processing

If the item in the DynamoDB table is marked “Approved for Payment”, the processed invoice is archived. If the item’s status attribute is marked “Pending Review”, an SNS message is published to an SNS Pending Review topic. You can subscribe to this topic so that you can add additional labels to the Python code for determining payment due dates and payment amounts.

Note that the Lambda functions are single-purpose functions, and all workflow logic is contained in the Step Functions state machine. This diagram shows the various tasks (states) of a successful workflow.

State machine workflow

For more information about this solution, download the code from the GitHub repo (https://github.com/aws-samples/aws-step-functions-rpa).

Prerequisites

Before deploying the solution, you must install the following prerequisites:

  1. Python.
  2. AWS Command Line Interface (AWS CLI) – for instructions, see Installing the AWS CLI.
  3. AWS Serverless Application Model Command Line Interface (AWS SAM CLI) – for instructions, see Installing the AWS SAM CLI.

Deploying the solution

The solution creates the following S3 buckets with names suffixed by your AWS account ID to prevent a global namespace collision of your S3 bucket names:

  • scanned-invoices-<YOUR AWS ACCOUNT ID>
  • invoice-analyses-<YOUR AWS ACCOUNT ID>
  • processed-invoices-<YOUR AWS ACCOUNT ID>

The following steps deploy the example solution in your AWS account. The solution deploys several components including a Step Functions state machine, Lambda functions, S3 buckets, a DynamoDB table for payment information, and SNS topics.

AWS CloudFormation requires an S3 bucket and stack name for deploying the solution. To deploy:

  1. Download code from GitHub repo (https://github.com/aws-samples/aws-step-functions-rpa).
  2. Run the following command to build the artifacts locally on your workstation:sam build
  3. Run the following command to create a CloudFormation stack and deploy your resources:sam deploy --guided --capabilities CAPABILITY_NAMED_IAM

Monitor the progress and wait for the completion of the stack creation process from the AWS CloudFormation console before proceeding.

Testing the solution

To test the solution, upload the PDF test invoices to the S3 bucket named scanned-invoices-<YOUR AWS ACCOUNT ID>.

A Step Functions state machine with the name <YOUR STACK NAME>-ProcessedScannedInvoiceWorkflow runs the workflow. Amazon Textract document analyses are stored in the S3 bucket named invoice-analyses-<YOUR AWS ACCOUNT ID>, and processed invoices are stored in the S3 bucket named processed-invoices-<YOUR AWS ACCOUNT ID>. Processed payments are found in the DynamoDB table named <YOUR STACK NAME>-invoices.

You can monitor the status of the workflows from the Step Functions console. Upon completion of the workflow executions, review the items added to DynamoDB from the Amazon DynamoDB console.

Cleanup

To avoid ongoing charges for any resources you created in this blog post, delete the stack:

  1. Empty the three S3 buckets created during deployment using the S3 console:
    – scanned-invoices-<YOUR AWS ACCOUNT ID>
    – invoice-analyses-<YOUR AWS ACCOUNT ID>
    – processed-invoices-<YOUR AWS ACCOUNT ID>
  2. Delete the CloudFormation stack created during deployment using the CloudFormation console.

Conclusion

In this post, I showed you how to use a Step Functions state machine and Amazon Textract to automatically extract data from a scanned invoice. This eliminates the need for a person to perform the manual step of reviewing an invoice to find payment information to be fed into a backend system. By replacing the manual steps of a workflow with automation, an organization can free up their human workforce to handle more value-added tasks.

To learn more, visit AWS Step Functions and Amazon Textract for more information. For more serverless learning resources, visit https://serverlessland.com.

 

Building Serverless Land: Part 2 – An auto-building static site

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/building-serverless-land-part-2-an-auto-building-static-site/

In this two-part blog series, I show how serverlessland.com is built. This is a static website that brings together all the latest blogs, videos, and training for AWS serverless. It automatically aggregates content from a number of sources. The content exists in a static JSON file, which generates a new static site each time it is updated. The result is a low-maintenance, low-latency serverless website, with almost limitless scalability.

A companion blog post explains how to build an automated content aggregation workflow to create and update the site’s content. In this post, you learn how to build a static website with an automated deployment pipeline that re-builds on each GitHub commit. The site content is stored in JSON files in the same repository as the code base. The example code can be found in this GitHub repository.

The growing adoption of serverless technologies generates increasing amounts of helpful and insightful content from the developer community. This content can be difficult to discover. Serverless Land helps channel this into a single searchable location. By collating this into a static website, users can enjoy a browsing experience with fast page load speeds.

The serverless nature of the site means that developers don’t need to manage infrastructure or scalability. The use of AWS Amplify Console to automatically deploy directly from GitHub enables a regular release cadence with a fast transition from prototype to production.

Static websites

A static site is served to the user’s web browser exactly as stored. This contrasts to dynamic webpages, which are generated by a web application. Static websites often provide improved performance for end users and have fewer or no dependant systems, such as databases or application servers. They may also be more cost-effective and secure than dynamic websites by using cloud storage, instead of a hosted environment.

A static site generator is a tool that generates a static website from a website’s configuration and content. Content can come from a headless content management system, through a REST API, or from data referenced within the website’s file system. The output of a static site generator is a set of static files that form the website.

Serverless Land uses a static site generator for Vue.js called Nuxt.js. Each time content is updated, Nuxt.js regenerates the static site, building the HTML for each page route and storing it in a file.

The architecture

Serverless Land static website architecture

When the content.json file is committed to GitHub, a new build process is triggered in AWS Amplify Console.

Deploying AWS Amplify

AWS Amplify helps developers to build secure and scalable full stack cloud applications. AWS Amplify Console is a tool within Amplify that provides a user interface with a git-based workflow for hosting static sites. Deploy applications by connecting to an existing repository (GitHub, BitBucket Cloud, GitLab, and AWS CodeCommit) to set up a fully managed, nearly continuous deployment pipeline.

This means that any changes committed to the repository trigger the pipeline to build, test, and deploy the changes to the target environment. It also provides instant content delivery network (CDN) cache invalidation, atomic deploys, password protection, and redirects without the need to manage any servers.

Building the static website

  1. To get started, use the Nuxt.js scaffolding tool to deploy a boiler plate application. Make sure you have npx installed (npx is shipped by default with npm version 5.2.0 and above).
    $ npx create-nuxt-app content-aggregator

    The scaffolding tool asks some questions, answer as follows:Nuxt.js scaffolding tool inputs

  2. Navigate to the project directory and launch it with:
    $ cd content-aggregator
    $ npm run dev

    The application is now running on http://localhost:3000.The pages directory contains your application views and routes. Nuxt.js reads the .vue files inside this directory and automatically creates the router configuration.

  3. Create a new file in the /pages directory named blogs.vue:$ touch pages/blogs.vue
  4. Copy the contents of this file into pages/blogs.vue.
  5. Create a new file in /components directory named Post.vue :$ touch components/Post.vue
  6. Copy the contents of this file into components/Post.vue.
  7. Create a new file in /assets named content.json and copy the contents of this file into it.$ touch /assets/content.json

The blogs Vue component

The blogs page is a Vue component with some special attributes and functions added to make development of your application easier. The following code imports the content.json file into the variable blogPosts. This file stores the static website’s array of aggregated blog post content.

import blogPosts from '../assets/content.json'

An array named blogPosts is initialized:

data(){
    return{
      blogPosts: []
    }
  },

The array is then loaded with the contents of content.json.

 mounted(){
    this.blogPosts = blogPosts
  },

In the component template, the v-for directive renders a list of post items based on the blogPosts array. It requires a special syntax in the form of blog in blogPosts, where blogPosts is the source data array and blog is an alias for the array element being iterated on. The Post component is rendered for each iteration. Since components have isolated scopes of their own, a :post prop is used to pass the iterated data into the Post component:

<ul>
  <li v-for="blog in blogPosts" :key="blog">
     <Post :post="blog" />
  </li>
</ul>

The post data is then displayed by the following template in components/Post.vue.

<template>
    <div class="hello">
      <h3>{{ post.title }} </h3>
      <div class="img-holder">
          <img :src="post.image" />
      </div>
      <p>{{ post.intro }} </p>
      <p>Published on {{post.date}}, by {{ post.author }} p>
      <a :href="post.link"> Read article</a>
    </div>
</template>

This forms the framework for the static website. The /blogs page displays content from /assets/content.json via the Post component. To view this, go to http://localhost:3000/blogs in your browser:

The /blogs page

Add a new item to the content.json file and rebuild the static website to display new posts on the blogs page. The previous content was generated using the aggregation workflow explained in this companion blog post.

Connect to Amplify Console

Clone the web application to a GitHub repository and connect it to Amplify Console to automate the rebuild and deployment process:

  1. Upload the code to a new GitHub repository named ‘content-aggregator’.
  2. In the AWS Management Console, go to the Amplify Console and choose Connect app.
  3. Choose GitHub then Continue.
  4. Authorize to your GitHub account, then in the Recently updated repositories drop-down select the ‘content-aggregator’ repository.
  5. In the Branch field, leave the default as master and choose Next.
  6. In the Build and test settings choose edit.
  7. Replace - npm run build with – npm run generate.
  8. Replace baseDirectory: / with baseDirectory: dist

    This runs the nuxt generate command each time an application build process is triggered. The nuxt.config.js file has a target property with the value of static set. This generates the web application into static files. Nuxt.js creates a dist directory with everything inside ready to be deployed on a static hosting service.
  9. Choose Save then Next.
  10. Review the Repository details and App settings are correct. Choose Save and deploy.

    Amplify Console deployment

Once the deployment process has completed and is verified, choose the URL generated by Amplify Console. Append /blogs to the URL, to see the static website blogs page.

Any edits pushed to the repository’s content.json file trigger a new deployment in Amplify Console that regenerates the static website. This companion blog post explains how to set up an automated content aggregator to add new items to the content.json file from an RSS feed.

Conclusion

This blog post shows how to create a static website with vue.js using the nuxt.js static site generator. The site’s content is generated from a single JSON file, stored in the site’s assets directory. It is automatically deployed and re-generated by Amplify Console each time a new commit is pushed to the GitHub repository. By automating updates to the content.json file you can create low-maintenance, low-latency static websites with almost limitless scalability.

This application framework is used together with this automated content aggregator to pull together articles for http://serverlessland.com. Serverless Land brings together all the latest blogs, videos, and training for AWS Serverless. Download the code from this GitHub repository to start building your own automated content aggregation platform.

Building Serverless Land: Part 1 – Automating content aggregation

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/building-serverless-land-part-1-automating-content-aggregation/

In this two part blog series, I show how serverlessland.com is built. This is a static website that brings together all the latest blogs, videos, and training for AWS Serverless. It automatically aggregates content from a number of sources. The content exists in static JSON files, which generate a new site build each time they are updated. The result is a low-maintenance, low-latency serverless website, with almost limitless scalability.

This blog post explains how to automate the aggregation of content from multiple RSS feeds into a JSON file stored in GitHub. This workflow uses AWS Lambda and AWS Step Functions, triggered by Amazon EventBridge. The application can be downloaded and deployed from this GitHub repository.

The growing adoption of serverless technologies generates increasing amounts of helpful and insightful content from the developer community. This content can be difficult to discover. Serverless Land helps channel this into a single searchable location. By automating the collection of this content with scheduled serverless workflows, the process robustly scales to near infinite numbers. The Step Functions MAP state allows for dynamic parallel processing of multiple content sources, without the need to alter code. On-boarding a new content source is as fast and simple as making a single CLI command.

The architecture

Automating content aggregation with AWS Step Functions

The application consists of six Lambda functions orchestrated by a Step Functions workflow:

  1. The workflow is triggered every 2 hours by an EventBridge scheduler. The schedule event passes an RSS feed URL to the workflow.
  2. The first task invokes a Lambda function that runs an HTTP GET request to the RSS feed. It returns an array of recent blog URLs. The array of blog URLs is provided as the input to a MAP state. The MAP state type makes it possible to run a set of steps for each element of an input array in parallel. The number of items in the array can be different for each execution. This is referred to as dynamic parallelism.
  3. The next task invokes a Lambda function that uses the GitHub REST API to retrieve the static website’s JSON content file.
  4. The first Lambda function in the MAP state runs an HTTP GET request to the blog post URL provided in the payload. The URL is scraped for content and an object containing detailed metadata about the blog post is returned in the response.
  5. The blog post metadata is compared against the website’s JSON content file in GitHub.
  6. A CHOICE state determines if the blog post metadata has already been committed to the repository.
  7. If the blog post is new, it is added to an array of “content to commit”.
  8. As the workflow exits the MAP state, the results are passed to the final Lambda function. This uses a single git commit to add each blog post object to the website’s JSON content file in GitHub. This triggers an event that rebuilds the static site.

Using Secrets in AWS Lambda

Two of the Lambda functions require a GitHub personal access token to commit files to a repository. Sensitive credentials or secrets such as this should be stored separate to the function code. Use AWS Systems Manager Parameter Store to store the personal access token as an encrypted string. The AWS Serverless Application Model (AWS SAM) template grants each Lambda function permission to access and decrypt the string in order to use it.

  1. Follow these steps to create a personal access token that grants permission to update files to repositories in your GitHub account.
  2. Use the AWS Command Line Interface (AWS CLI) to create a new parameter named GitHubAPIKey:
aws ssm put-parameter \
--name /GitHubAPIKey \
--value ReplaceThisWithYourGitHubAPIKey \
--type SecureString

{
    "Version": 1,
    "Tier": "Standard"
}

Deploying the application

  1. Fork this GitHub repository to your GitHub Account.
  2. Clone the forked repository to your local machine and deploy the application using AWS SAM.
  3. In a terminal, enter:
    git clone https://github.com/aws-samples/content-aggregator-example
    sam deploy -g
  4. Enter the required parameters when prompted.

This deploys the application defined in the AWS SAM template file (template.yaml).

The business logic

Each Lambda function is written in Node.js and is stored inside a directory that contains the package dependencies in a `node_modules` folder. These are defined for each function by its relative package.json file. The function dependencies are bundled and deployed using the sam build && deploy -g command.

The GetRepoContents and WriteToGitHub Lambda functions use the octokit/rest.js library to communicate with GitHub. The library authenticates to GitHub by using the GitHub API key held in Parameter Store. The AWS SDK for Node.js is used to obtain the API key from Parameter Store. With a single synchronous call, it retrieves and decrypts the parameter value. This is then used to authenticate to GitHub.

const AWS = require('aws-sdk');
const SSM = new AWS.SSM();


//get Github API Key and Authenticate
    const singleParam = { Name: '/GitHubAPIKey ',WithDecryption: true };
    const GITHUB_ACCESS_TOKEN = await SSM.getParameter(singleParam).promise();
    const octokit = await  new Octokit({
      auth: GITHUB_ACCESS_TOKEN.Parameter.Value,
    })

Lambda environment variables are used to store non-sensitive key value data such as the repository name and JSON file location. These can be entered when deploying with AWS SAM guided deploy command.

Environment:
        Variables:
          GitHubRepo: !Ref GitHubRepo
          JSONFile: !Ref JSONFile

The GetRepoContents function makes a synchronous HTTP request to the GitHub repository to retrieve the contents of the website’s JSON file. The response SHA and file contents are returned from the Lambda function and acts as the input to the next task in the Step Functions workflow. This SHA is used in final step of the workflow to save all new blog posts in a single commit.

Map state iterations

The MAP state runs concurrently for each element in the input array (each blog post URL).

Each iteration must compare a blog post URL to the existing JSON content file and decide whether to ignore the post. To do this, the MAP state requires both the input array of blog post URLs and the existing JSON file contents. The ItemsPath, ResultPath, and Parameters are used to achieve this:

  • The ItemsPath sets input array path to $.RSSBlogs.body.
  • The ResultPath states that the output of the branches is placed in $.mapResults.
  • The Parameters block replaces the input to the iterations with a JSON node. This contains both the current item data from the context object ($$.Map.Item.Value) and the contents of the GitHub JSON file ($.RepoBlogs).
"Type":"Map",
    "InputPath": "$",
    "ItemsPath": "$.RSSBlogs.body",
    "ResultPath": "$.mapResults",
    "Parameters": {
        "BlogUrl.$": "$$.Map.Item.Value",
        "RepoBlogs.$": "$.RepoBlogs"
     },
    "MaxConcurrency": 0,
    "Iterator": {
       "StartAt": "getMeta",

The Step Functions resource

The AWS SAM template uses the following Step Functions resource definition to create a Step Functions state machine:

  MyStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: statemachine/my_state_machine.asl.JSON
      DefinitionSubstitutions:
        GetBlogPostArn: !GetAtt GetBlogPost.Arn
        GetUrlsArn: !GetAtt GetUrls.Arn
        WriteToGitHubArn: !GetAtt WriteToGitHub.Arn
        CompareAgainstRepoArn: !GetAtt CompareAgainstRepo.Arn
        GetRepoContentsArn: !GetAtt GetRepoContents.Arn
        AddToListArn: !GetAtt AddToList.Arn
      Role: !GetAtt StateMachineRole.Arn

The actual workflow definition is defined in a separate file (statemachine/my_state_machine.asl.JSON). The DefinitionSubstitutions property specifies mappings for placeholder variables. This enables the template to inject Lambda function ARNs obtained by the GetAtt intrinsic function during template translation:

Step Functions mappings with placeholder variables

A state machine execution role is defined within the AWS SAM template. It grants the `Lambda invoke function` action. This is tightly scoped to the six Lambda functions that are used in the workflow. It is the minimum set of permissions required for the Step Functions to carry out its task. Additional permissions can be granted as necessary, which follows the zero-trust security model.

Action: lambda:InvokeFunction
Resource:
- !GetAtt GetBlogPost.Arn
- !GetAtt GetUrls.Arn
- !GetAtt CompareAgainstRepo.Arn
- !GetAtt WriteToGitHub.Arn
- !GetAtt AddToList.Arn
- !GetAtt GetRepoContents.Arn

The Step Functions workflow definition is authored using the AWS Toolkit for Visual Studio Code. The Step Functions support allows developers to quickly generate workflow definitions from selectable examples. The render tool and automatic linting can help you debug and understand the workflow during development. Read more about the toolkit in this launch post.

Scheduling events and adding new feeds

The AWS SAM template creates a new EventBridge rule on the default event bus. This rule is scheduled to invoke the Step Functions workflow every 2 hours. A valid JSON string containing an RSS feed URL is sent as the input payload. The feed URL is obtained from a template parameter and can be set on deployment. The AWS Compute Blog is set as the default feed URL. To aggregate additional blog feeds, create a new rule to invoke the Step Functions workflow. Provide the RSS feed URL as valid JSON input string in the following format:

{“feedUrl”:”replace-this-with-your-rss-url”}

ScheduledEventRule:
    Type: "AWS::Events::Rule"
    Properties:
      Description: "Scheduled event to trigger Step Functions state machine"
      ScheduleExpression: rate(2 hours)
      State: "ENABLED"
      Targets:
        -
          Arn: !Ref MyStateMachine
          Id: !GetAtt MyStateMachine.Name
          RoleArn: !GetAtt ScheduledEventIAMRole.Arn
          Input: !Sub
            - >
              {
                "feedUrl" : "${RssFeedUrl}"
              }
            - RssFeedUrl: !Ref RSSFeed

A completed workflow with step output

Conclusion

This blog post shows how to automate the aggregation of content from multiple RSS feeds into a single JSON file using serverless workflows.

The Step Functions MAP state allows for dynamic parallel processing of each item. The recent increase in state payload size limit means that the contents of the static JSON file can be held within the workflow context. The application decision logic is separated from the business logic and events.

Lambda functions are scoped to finite business logic with Step Functions states managing decision logic and iterations. EventBridge is used to manage the inbound business events. The zero-trust security model is followed with minimum permissions granted to each service and Parameter Store used to hold encrypted secrets.

This application is used to pull together articles for http://serverlessland.com. Serverless land brings together all the latest blogs, videos, and training for AWS Serverless. Download the code from this GitHub repository to start building your own automated content aggregation platform.

Automating EMR workloads using AWS Step Functions

Post Syndicated from Afsar Jahangir original https://aws.amazon.com/blogs/big-data/automating-emr-workloads-using-aws-step-functions/

Amazon EMR allows you to process vast amounts of data quickly and cost-effectively at scale. Using open-source tools such as Apache Spark, Apache Hive, and Presto, and coupled with the scalable storage of Amazon Simple Storage Service (Amazon S3), Amazon EMR gives analytical teams the engines and elasticity to run petabyte-scale analysis for a fraction of the cost of traditional on-premises clusters. Developers and analysts can use Jupyter-based Amazon EMR notebooks for iterative development, collaboration, and access to data stored across AWS data products.

What happens if you have Amazon EMR code that needs to run automatically on a regular basis? Maybe the job only runs when for certain events, like new data arriving in Amazon S3. Or maybe you want to run a job every Friday afternoon at 2:00 PM. What if there is a multiple step process?

To run Amazon EMR workloads on a schedule, you can automate everything with AWS Step Functions. This post walks through how to use Step Functions state machines and the callback pattern to automate EMR jobs. You can download the code examples from the GitHub repo.

Prerequisites

To follow along with this walkthrough, you must have the following:

Solution overview

For this use case, I want to run two applications on my EMR cluster. The start of the second application depends on the successful completion and output of the first. At a high level, I want to launch an EMR cluster automatically, run the code, and remove the cluster. Specifically, when the first program successfully completes, I want to run the second program.

At the conclusion of the second application, in some cases I may want to run both programs multiple times (with different dataset sizes, perhaps). I need a way to decide to run the process again with the same cluster. Whether the steps succeed or fail, at the conclusion, I always want to delete the CloudFormation stack that contains my EMR cluster to reduce cost. The following diagram illustrates this high-level overview of the pipeline operation.

Workflow details

I run two programs, and I need the first program to complete before running the second one. I optionally want to repeat those two programs with different datasets to get the final state of the data. To orchestrate the jobs, I can run through the same steps multiple times with the same active EMR cluster.

To facilitate automating the pipeline, I use an inner state machine to check the cluster status and submit EMR job steps. I then wrap that inner state machine in an outer state machine. The outer state machine starts the cluster and submits information to the inner state machine. It waits for all steps to complete, then deletes the EMR cluster.

The following flow chart illustrates the steps and checkpoints that make up the pipeline.

Deploying the pipeline state machines

To simplify pipeline deployment, I use AWS SAM, an open-source framework for building serverless applications. AWS SAM provides a single deployment configuration, extensions to CloudFormation templates, built-in best practices, and local debugging and testing. You can use AWS SAM with a suite of AWS tools for building serverless applications. For more information, see What Is the AWS Serverless Application Model (AWS SAM)?

Initiating the application

Navigate to the path where you want to download the files and initiate the AWS SAM application. I want to run the code from my local machine and have created the following location:

~/Documents/projects/blog/steppipeline/automation-ml-step-data-pipeline

From this directory, I initialize the application using sam init. This connects to the repository and downloads the files for creation of the ML pipeline. See the following code:

sam init -l https://github.com/aws-samples/automation-ml-step-data-pipeline.git

Creating the S3 bucket and moving dependencies

For this post, I orchestrate an existing process from the post Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR, which runs on Amazon EMR. The pipeline reads code artifacts from Amazon S3, where the EMR cluster has read permission. There are two programs: kmeansandey.py and kmeanswsssey.py.

First, create the bucket from the command line using the aws s3 mb command and upload the code. Your bucket name must be globally unique:

aws s3 mb s3://<your bucket name>

Move the artifacts to your bucket, replacing <your bucket name> with your bucket name:

aws s3 cp sample_ml_code/kmeansandey.py s3://<your bucket name>/testcode/kmeansandey.py
aws s3 cp sample_ml_code/kmeanswsssey.py s3://<your bucket name>/testcode/kmeanswsssey.py
aws s3 cp emr/bootstrapactions.sh s3://<your bucket name>/emr-bootstrap-scripts/bootstrapactions.sh
aws s3 cp emr/emr-cluster-config.json s3://<your bucket name>/emr-cluster-config.json
aws s3 cp emr/emr-cluster-sample.yaml s3://<your bucket name>/emr-cluster-sample.yaml

Deploying the application

Deploy the build artifacts to the AWS Cloud using the following code:

sam deploy --guided

AWS SAM prompts you for the parameters that you need to build and deploy the application. I have provided some default values where possible.

The final output of your deployment process should indicate that all stacks were built:

Successfully created/updated stack - step-pipeline in us-east-1

After deployment, you receive an email to confirm your subscription. Choose the confirmation link in the email to receive pipeline notifications.

Submitting a workload to your Step Functions state machine

To create a cluster and submit EMR jobs, the outer state machine needs a JSON payload. This contains the location of the programs in Amazon S3, the Amazon EMR CloudFormation template, and the parameter files used to launch the EMR cluster.

Creating an Amazon EC2 key pair

To use the same sample programs and EMR cluster template that you used to test your pipeline, you need to use an Amazon EC2 key pair for SSH credentials. When you create a cluster, you can specify the Amazon Elastic Compute Cloud (Amazon EC2) key pair to use for SSH connections to all cluster instances. The name of the keypair for this cluster is referenced in the emr-cluster-config.json file. See the following code:

<snip>
  {
    "ParameterKey": "Keyname",
    "ParameterValue": "emrcluster-launch"
  },
</snip>

To use the example as-is with the parameters unchanged, create an Amazon EC2 key pair on the AWS Management Console or AWS Command Line Interface (AWS CLI).

  1. On the Amazon EC2 console, under Network & Security, choose Key Pairs.
  2. On the Key Pairs page, choose Create Key Pair.
  3. For Key pair name, enter emrcluster-launch.
  4. Choose Create.
  5. When the console prompts you to save the private key file, save it in a safe place.

This is the only chance for you to save the private key file.

Inputting JSON for launching the pipeline

The simplest way for you to run the pipeline is to use the Start execution feature on the Step Functions console. The console gives you full functionality to initiate the function and submit a payload. In the example test_input.json, update the bucket values, security group, and subnet with the information for your account:

{  
    "ModelName": "PipelineTest_01",  
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "EMRCloudFormation": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-sample.yaml",  
    "EMRParameters": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-config.json",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "SecurityGroup": "<your security group>",  
    "SubNet": "<your subnet>",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"]
}

The payload includes the following information:

  • ModelName – A short descriptive identifier used to identify the transient EMR cluster created during this process. This name shows on the Amazon EMR console for easy identification.
  • ModelProgram – The Amazon S3 URL location of the program that runs when the model initiates on the EMR cluster (step 3).
  • PreProcessingProgram – The Amazon S3 URL location of the program that runs when preprocessing initiates on the EMR cluster (step 2).
  • EMRCloudFormation – The S3 bucket HTTPS location of the CloudFormation template for launching the transient EMR cluster.
  • EMRParameters – The Amazon S3 HTTPS location of the parameter file supporting the Amazon EMR CloudFormation template.
  • JobInput – The Amazon S3 URL location of the input data for the preprocessing program.
  • SecurityGroup – The security group with ingress and egress rules for the launched EMR cluster
  • SubNet – The subnet identifier where you place your EMR cluster.
  • ClusterSize – Denotes the number of EMR cluster nodes to run the job and can be changed based on the compute need. I use 4 nodes as the input value for the sample program.
  • ProcessingMode – This is an array of values. The pipeline runs steps 2 and 3 for each value in the array. The value is passed into the program unchanged and can be used to internally control how the program runs. For this use case, it runs a single time on the small dataset.

Opening the Step Functions Executions page

On the Step Functions console, choose MLStateMachine. This is the outer state machine. On the detail page for the outer state machine, choose Start execution.

Entering your payload

On the New execution page, enter the JSON for your pipeline based on the example test_input.json. Choose Start execution.

Reviewing the workflow as it runs

You can see the pipeline running in the visual workflow and review the event history on the detail page. The following diagram shows the state machine definition used:

Diving into the pipelines details

There are four processes that run in the outer state machine pipeline:

  1. Step 1 launches an EMR cluster using the CloudFormation template. The AWS Lambda function downloads the template and parameter file from the specified Amazon S3 location and initiates the stack build.
  2. When the EMR cluster is ready, step 2 initiates the first set of code against the newly created EMR cluster, passing in the remaining parameters to the inner state machine. It adds the stack id, EMR cluster id, and status to the payload. These values are obtained from the output of the CloudFormation stack. See the following code:
    "ModelName": "PipelineTest_01",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"],
    "StackId": "arn:aws:cloudformation:us-east-1:575444587305:stack/PipelineTest01-auto-emr-02142020041612/bc5fd7a0-4ee0-11ea-a395-0e4c53e0aefd",
    "Status": "CREATE_COMPLETE",
    "ClusterId": "j-MF6LWBLJZ88K"

The code contains the following information:

  • ModelName is used in the EMR cluster name to make it easier to identify in the console and AWS CLI output.
  • PreProcessingProgram in our use case points to the first code step (py). The code is passed through the first state machine and submitted to the second state machine and Amazon EMR.
  • JobInput, ClusterSize, ClusterId, StackId, and ProcessingMode are passthrough values that the program needs to run.

The step initiates the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx, which engages the inner state machine asynchronously to run a high-level process of checking the cluster, adding a step, checking to see if the step is complete, and exiting back to the outer state machine when complete.

  1. Next, the outer state machine runs the Model program code (step 3) by submitting it to the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx to engage the inner state machine for the second set of code (py). The process is the same as step 2 but the code it runs is from a different file and the output from the preprocessing step becomes the input for the step. See the following code:
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",

When the inner state machine is complete, it moves to a step that removes the first value from the ProcessingMode array. For this use case, there is only one value (TRAINING), which is removed, leaving the array empty. The next step in the state machine looks for remaining values; if there are none, it marks all steps as complete and moves to Delete EMR cluster.

  1. The final step in the outer state machine is to remove the EMR cluster. The Delete EMR cluster step passes the CloudFormation stack ID into lambda/delete_cfn_stack.py, initiating the deletion of the stack and cleaning up all the resources.

The output of the test programs is stored in Amazon S3 in two folders under the pipeline artifacts. The preprocessing folder contains data that is used to drive the output in the model folder. The following screenshot shows the folders in Amazon S3.

Conclusion

The Step Functions workflow in this post is a repeatable two-step pipeline. It starts an EMR cluster, runs a program that outputs data, and initiates a second program that depends on the previous job finishing. It then deletes all resources automatically.

You can adapt the workflow to respond to Amazon S3 events, a message received in a queue, a file checked into a code repository, or a schedule. Any event that can invoke Lambda can initiate the pipeline. For more information, see Invoking AWS Lambda functions.

You can download the example code from the GitHub repo and adapt it for your use case. Let me know in the comments what you built for your environment.


About the Authors

Mohammed “Afsar ” Jahangir Ali is a Senior Big Data Consultant with Amazon since January 2018. He is a data enthusiast helping customers shape their data lakes and analytic journeys on AWS.In his spare time, he enjoys taking pictures, listening to music, and spend time with family.

 

 

 

Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH helping customers integrate and manage their data from different unrelated data sources.

 

 

 

ICYMI: Serverless Q3 2020

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q3-2020/

Welcome to the 11th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

Q3 Calendar

In case you missed our last ICYMI, checkout what happened last quarter here.

AWS Lambda

MSK trigger in Lambda

In August, we launched support for using Amazon Managed Streaming for Apache Kafka (Amazon MSK) as an event source for Lambda functions. Lambda has existing support for processing streams from Kinesis and DynamoDB. Now you can process data streams from Amazon MSK and easily integrate with downstream serverless workflows. This integration allows you to process batches of records, one per partition at a time, and scale concurrency by increasing the number of partitions in a topic.

We also announced support for Java 8 (Corretto) in Lambda, and you can now use Amazon Linux 2 for custom runtimes. Amazon Linux 2 is the latest generation of Amazon Linux and provides an application environment with access to the latest innovations in the Linux ecosystem.

Amazon API Gateway

API integrations

API Gateway continued to launch new features for HTTP APIs, including new integrations for five AWS services. HTTP APIs can now route requests to AWS AppConfig, Amazon EventBridge, Amazon Kinesis Data Streams, Amazon SQS, and AWS Step Functions. This makes it easy to create webhooks for business logic hosted in these services. The service also expanded the authorization capabilities, adding Lambda and IAM authorizers, and enabled wildcards in custom domain names. Over time, we will continue to improve and migrate features from REST APIs to HTTP APIs.

In September, we launched mutual TLS for both regional REST APIs and HTTP APIs. This is a new method for client-to-server authentication to enhance the security of your API. It can protect your data from exploits such as client spoofing or man-in-the-middle. This enforces two-way TLS (or mTLS) which enables certificate-based authentication both ways from client-to-server and server-to-client.

Enhanced observability variables now make it easier to troubleshoot each phase of an API request. Each phase from AWS WAF through to integration adds latency to a request, returns a status code, or raises an error. Developers can use these variables to identify the cause of latency within the API request. You can configure these variables in AWS SAM templates – see the demo application to see how you can use these variables in your own application.

AWS Step Functions

X-Ray tracing in Step Functions

We added X-Ray tracing support for Step Functions workflows, giving you full visibility across state machine executions, making it easier to analyze and debug distributed applications. Using the service map view, you can visually identify errors in resources and view error rates across workflow executions. You can then drill into the root cause of an error. You can enable X-Ray in existing workflows by a single-click in the console. Additionally, you can now also visualize Step Functions workflows directly in the Lambda console. To see this new feature, open the Step Functions state machines page in the Lambda console.

Step Functions also increased the payload size to 256 KB and added support for string manipulation, new comparison operators, and improved output processing. These updates were made to the Amazon States Languages (ASL), which is a JSON-based language for defining state machines. The new operators include comparison operators, detecting the existence of a field, wildcarding, and comparing two input fields.

AWS Serverless Application Model (AWS SAM)

AWS SAM goes GA

AWS SAM is an open source framework for building serverless applications that converts a shorthand syntax into CloudFormation resources.

In July, the AWS SAM CLI became generally available (GA). This tool operates on SAM templates and provides developers with local tooling for building serverless applications. The AWS SAM CLI offers a rich set of tools that enable developers to build serverless applications quickly.

AWS X-Ray

X-Ray Insights

X-Ray launched a public preview of X-Ray Insights, which can help produce actionable insights for anomalies within your applications. Designed to make it easier to analyze and debug distributed applications, it can proactively identify issues caused by increases in faults. Using the incident timeline, you can visualize when the issue started and how it developed. The service identifies a probable root cause along with any anomalous services. There is no additional instrumentation needed to use X-Ray Insights – you can enable this feature within X-Ray Groups.

Amazon Kinesis

In July, Kinesis announced support for data delivery to generic HTTP endpoints, and service providers like Datadog, New Relic, MongoDB, and Splunk. Use the Amazon Kinesis console to configure your data producers to send data to Amazon Kinesis Data Firehose and specify one of these new delivery targets. Additionally, Amazon Kinesis Data Firehose is now available in the Europe (Milan) and Africa (Cape Town) AWS Regions.

Serverless Posts

Our team is always working to build and write content to help our customers better understand all our serverless offerings. Here is a list of the latest posts published to the AWS Compute Blog this quarter.

July

August

September

Tech Talks & Events

We hold several AWS Online Tech Talks covering serverless tech talks throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. We also regularly deliver talks at conferences and events around the globe, regularly join in on podcasts, and record short videos you can find to learn in quick byte sized chunks.

Here are some from Q3:

Learning Paths

Ask Around Me

Learn How to Build and Deploy a Web App Backend that Supports Authentication, Geohashing, and Real-Time Messaging

Ask Around Me is an example web app that shows how to build authenticaton, geohashing and real-time messaging into your serverless applications. This learning path includes videos and learning resources to help walk you through the application.

Build a Serverless Web App for a Theme Park

This five-video learning path walks you through the Innovator Island workshop, and provides learning resources for building realtime serverless web applications.

Live streams

July

August

September

There are also a number of other helpful video series covering serverless available on the Serverless Land YouTube channel.

New AWS Serverless Heroes

Serverless Heroes Q3 2020

We’re pleased to welcome Angela Timofte, Luca Bianchi, Matthieu Napoli, Peter Hanssens, Sheen Brisals, and Tom McLaughlin to the growing list of AWS Serverless Heroes.

The AWS Hero program is a selection of worldwide experts that have been recognized for their positive impact within the community. They share helpful knowledge and organize events and user groups. They’re also contributors to numerous open-source projects in and around serverless technologies.

New! The Serverless Land website

Serverless Land

To help developers find serverless learning resources, we have curated a list of serverless blogs, videos, events and training programs at a new site, Serverless Land. This is regularly updated with new information – you can subscribe to the RSS feed for automatic updates, follow the LinkedIn page or subscribe to the YouTube channel.

Still looking for more?

The Serverless landing page has lots of information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow all of us on Twitter to see the latest news, follow conversations, and interact with the team.

How to delete user data in an AWS data lake

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/how-to-delete-user-data-in-an-aws-data-lake/

General Data Protection Regulation (GDPR) is an important aspect of today’s technology world, and processing data in compliance with GDPR is a necessity for those who implement solutions within the AWS public cloud. One article of GDPR is the “right to erasure” or “right to be forgotten” which may require you to implement a solution to delete specific users’ personal data.

In the context of the AWS big data and analytics ecosystem, every architecture, regardless of the problem it targets, uses Amazon Simple Storage Service (Amazon S3) as the core storage service. Despite its versatility and feature completeness, Amazon S3 doesn’t come with an out-of-the-box way to map a user identifier to S3 keys of objects that contain user’s data.

This post walks you through a framework that helps you purge individual user data within your organization’s AWS hosted data lake, and an analytics solution that uses different AWS storage layers, along with sample code targeting Amazon S3.

Reference architecture

To address the challenge of implementing a data purge framework, we reduced the problem to the straightforward use case of deleting a user’s data from a platform that uses AWS for its data pipeline. The following diagram illustrates this use case.

We’re introducing the idea of building and maintaining an index metastore that keeps track of the location of each user’s records and allows us locate to them efficiently, reducing the search space.

You can use the following architecture diagram to delete a specific user’s data within your organization’s AWS data lake.

For this initial version, we created three user flows that map each task to a fitting AWS service:

Flow 1: Real-time metastore update

The S3 ObjectCreated or ObjectDelete events trigger an AWS Lambda function that parses the object and performs an add/update/delete operation to keep the metadata index up to date. You can implement a simple workflow for any other storage layer, such as Amazon Relational Database Service (RDS), Amazon Aurora, or Amazon Elasticsearch Service (ES). We use Amazon DynamoDB and Amazon RDS for PostgreSQL as the index metadata storage options, but our approach is flexible to any other technology.

Flow 2: Purge data

When a user asks for their data to be deleted, we trigger an AWS Step Functions state machine through Amazon CloudWatch to orchestrate the workflow. Its first step triggers a Lambda function that queries the metadata index to identify the storage layers that contain user records and generates a report that’s saved to an S3 report bucket. A Step Functions activity is created and picked up by a Lambda Node JS based worker that sends an email to the approver through Amazon Simple Email Service (SES) with approve and reject links.

The following diagram shows a graphical representation of the Step Function state machine as seen on the AWS Management Console.

The approver selects one of the two links, which then calls an Amazon API Gateway endpoint that invokes Step Functions to resume the workflow. If you choose the approve link, Step Functions triggers a Lambda function that takes the report stored in the bucket as input, deletes the objects or records from the storage layer, and updates the index metastore. When the purging job is complete, Amazon Simple Notification Service (SNS) sends a success or fail email to the user.

The following diagram represents the Step Functions flow on the console if the purge flow completed successfully.

For the complete code base, see step-function-definition.json in the GitHub repo.

Flow 3: Batch metastore update

This flow refers to the use case of an existing data lake for which index metastore needs to be created. You can orchestrate the flow through AWS Step Functions, which takes historical data as input and updates metastore through a batch job. Our current implementation doesn’t include a sample script for this user flow.

Our framework

We now walk you through the two use cases we followed for our implementation:

  • You have multiple user records stored in each Amazon S3 file
  • A user has records stored in homogenous AWS storage layers

Within these two approaches, we demonstrate alternatives that you can use to store your index metastore.

Indexing by S3 URI and row number

For this use case, we use a free tier RDS Postgres instance to store our index. We created a simple table with the following code:

CREATE UNLOGGED TABLE IF NOT EXISTS user_objects (
				userid TEXT,
				s3path TEXT,
				recordline INTEGER
			);

You can index on user_id to optimize query performance. On object upload, for each row, you need to insert into the user_objects table a row that indicates the user ID, the URI of the target Amazon S3 object, and the row that corresponds to the record. For instance, when uploading the following JSON input, enter the following code:

{"user_id":"V34qejxNsCbcgD8C0HVk-Q","body":"…"}
{"user_id":"ofKDkJKXSKZXu5xJNGiiBQ","body":"…"}
{"user_id":"UgMW8bLE0QMJDCkQ1Ax5Mg","body ":"…"}

We insert the tuples into user_objects in the Amazon S3 location s3://gdpr-demo/year=2018/month=2/day=26/input.json. See the following code:

(“V34qejxNsCbcgD8C0HVk-Q”, “s3://gdpr-demo/year=2018/month=2/day=26/input.json”, 0)
(“ofKDkJKXSKZXu5xJNGiiBQ”, “s3://gdpr-demo/year=2018/month=2/day=26/input.json”, 1)
(“UgMW8bLE0QMJDCkQ1Ax5Mg”, “s3://gdpr-demo/year=2018/month=2/day=26/input.json”, 2)

You can implement the index update operation by using a Lambda function triggered on any Amazon S3 ObjectCreated event.

When we get a delete request from a user, we need to query our index to get some information about where we have stored the data to delete. See the following code:

SELECT s3path,
                ARRAY_AGG(recordline)
                FROM user_objects
                WHERE userid = ‘V34qejxNsCbcgD8C0HVk-Q’
                GROUP BY;

The preceding example SQL query returns rows like the following:

(“s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json“, {2102,529})

The output indicates that lines 529 and 2102 of S3 object s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json contain the requested user’s data and need to be purged. We then need to download the object, remove those rows, and overwrite the object. For a Python implementation of the Lambda function that implements this functionality, see deleteUserRecords.py in the GitHub repo.

Having the record line available allows you to perform the deletion efficiently in byte format. For implementation simplicity, we purge the rows by replacing the deleted rows with an empty JSON object. You pay a slight storage overhead, but you don’t need to update subsequent row metadata in your index, which would be costly. To eliminate empty JSON objects, we can implement an offline vacuum and index update process.

Indexing by file name and grouping by index key

For this use case, we created a DynamoDB table to store our index. We chose DynamoDB because of its ease of use and scalability; you can use its on-demand pricing model so you don’t need to guess how many capacity units you might need. When files are uploaded to the data lake, a Lambda function parses the file name (for example, 1001-.csv) to identify the user identifier and populates the DynamoDB metadata table. Userid is the partition key, and each different storage layer has its own attribute. For example, if user 1001 had data in Amazon S3 and Amazon RDS, their records look like the following code:

{"userid:": 1001, "s3":{"s3://path1", "s3://path2"}, "RDS":{"db1.table1.column1"}}

For a sample Python implementation of this functionality, see update-dynamo-metadata.py in the GitHub repo.

On delete request, we query the metastore table, which is DynamoDB, and generate a purge report that contains details on what storage layers contain user records, and storage layer specifics that can speed up locating the records. We store the purge report to Amazon S3. For a sample Lambda function that implements this logic, see generate-purge-report.py in the GitHub repo.

After the purging is approved, we use the report as input to delete the required resources. For a sample Lambda function implementation, see gdpr-purge-data.py in the GitHub repo.

Implementation and technology alternatives

We explored and evaluated multiple implementation options, all of which present tradeoffs, such as implementation simplicity, efficiency, critical data compliance, and feature completeness:

  • Scan every record of the data file to create an index – Whenever a file is uploaded, we iterate through its records and generate tuples (userid, s3Uri, row_number) that are then inserted to our metadata storing layer. On delete request, we fetch the metadata records for requested user IDs, download the corresponding S3 objects, perform the delete in place, and re-upload the updated objects, overwriting the existing object. This is the most flexible approach because it supports a single object to store multiple users’ data, which is a very common practice. The flexibility comes at a cost because it requires downloading and re-uploading the object, which introduces a network bottleneck in delete operations. User activity datasets such as customer product reviews are a good fit for this approach, because it’s unexpected to have multiple records for the same user within each partition (such as a date partition), and it’s preferable to combine multiple users’ activity in a single file. It’s similar to what was described in the section “Indexing by S3 URI and row number” and sample code is available in the GitHub repo.
  • Store metadata as file name prefix – Adding the user ID as the prefix of the uploaded object under the different partitions that are defined based on query pattern enables you to reduce the required search operations on delete request. The metadata handling utility finds the user ID from the file name and maintains the index accordingly. This approach is efficient in locating the resources to purge but assumes a single user per object, and requires you to store user IDs within the filename, which might require InfoSec considerations. Clickstream data, where you would expect to have multiple click events for a single customer on a single date partition during a session, is a good fit. We covered this approach in the section “Indexing by file name and grouping by index key” and you can download the codebase from the GitHub repo.
  • Use a metadata file – Along with uploading a new object, we also upload a metadata file that’s picked up by an indexing utility to create and maintain the index up to date. On delete request, we query the index, which points us to the records to purge. A good fit for this approach is a use case that already involves uploading a metadata file whenever a new object is uploaded, such as uploading multimedia data, along with their metadata. Otherwise, uploading a metadata file on every object upload might introduce too much of an overhead.
  • Use the tagging feature of AWS services – Whenever a new file is uploaded to Amazon S3, we use the Put Object Tagging Amazon S3 operation to add a key-value pair for the user identifier. Whenever there is a user data delete request, it fetches objects with that tag and deletes them. This option is straightforward to implement using the existing Amazon S3 API and can therefore be a very initial version of your implementation. However, it involves significant limitations. It assumes a 1:1 cardinality between Amazon S3 objects and users (each object only contains data for a single user), searching objects based on a tag is limited and inefficient, and storing user identifiers as tags might not be compliant with your organization’s InfoSec policy.
  • Use Apache Hudi – Apache Hudi is becoming a very popular option to perform record-level data deletion on Amazon S3. Its current version is restricted to Amazon EMR, and you can use it if you start to build your data lake from scratch, because you need to store your as Hudi datasets. Hudi is a very active project and additional features and integrations with more AWS services are expected.

The key implementation decision of our approach is separating the storage layer we use for our data and the one we use for our metadata. As a result, our design is versatile and can be plugged in any existing data pipeline. Similar to deciding what storage layer to use for your data, there are many factors to consider when deciding how to store your index:

  • Concurrency of requests – If you don’t expect too many simultaneous inserts, even something as simple as Amazon S3 could be a starting point for your index. However, if you get multiple concurrent writes for multiple users, you need to look into a service that copes better with transactions.
  • Existing team knowledge and infrastructure – In this post, we demonstrated using DynamoDB and RDS Postgres for storing and querying the metadata index. If your team has no experience with either of those but are comfortable with Amazon ES, Amazon DocumentDB (with MongoDB compatibility), or any other storage layer, use those. Furthermore, if you’re already running (and paying for) a MySQL database that’s not used to capacity, you could use that for your index for no additional cost.
  • Size of index – The volume of your metadata is orders of magnitude lower than your actual data. However, if your dataset grows significantly, you might need to consider going for a scalable, distributed storage solution rather than, for instance, a relational database management system.

Conclusion

GDPR has transformed best practices and introduced several extra technical challenges in designing and implementing a data lake. The reference architecture and scripts in this post may help you delete data in a manner that’s compliant with GDPR.

Let us know your feedback in the comments and how you implemented this solution in your organization, so that others can learn from it.

 


About the Authors

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

 

 

 

 

Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and travel.

Introducing AWS X-Ray new integration with AWS Step Functions

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/introducing-aws-x-ray-new-integration-with-aws-step-functions/

AWS Step Functions now integrates with AWS X-Ray to provide a comprehensive tracing experience for serverless orchestration workflows.

Step Functions allows you to build resilient serverless orchestration workflows with AWS services such as AWS Lambda, Amazon SNS, Amazon DynamoDB, and more. Step Functions provides a history of executions for a given state machine in the AWS Management Console or with Amazon CloudWatch Logs.

AWS X-Ray is a distributed tracing system that helps developers analyze and debug their applications. It traces requests as they travel through the individual services and resources that make up an application. This provides an end-to-end view of how an application is performing.

What is new?

The new Step Functions integration with X-Ray provides an additional workflow monitoring experience. Developers can now view maps and timelines of the underlying components that make up a Step Functions workflow. This helps to discover performance issues, detect permission problems, and track requests made to and from other AWS services.

The Step Functions integration with X-Ray can be analyzed in three constructs:

Service map: The service map view shows information about a Step Functions workflow and all of its downstream services. This enables developers to identify services where errors are occurring, connections with high latency, or traces for requests that are unsuccessful among the large set of services within their account. The service map aggregates data from specific time intervals from one minute through six hours and has a 30-day retention.

Trace map view: The trace map view shows in-depth information from a single trace as it moves through each service. Resources are listed in the order in which they are invoked.

Trace timeline: The trace timeline view shows the propagation of a trace through the workflow and is paired with a time scale called a latency distribution histogram. This shows how long it takes for a service to complete its requests. The trace is composed of segments and sub-segments. A segment represents the Step Functions execution. Subsegments each represent a state transition.

Getting Started

X-Ray tracing is enabled using AWS Serverless Application Model (AWS SAM), AWS CloudFormation or from within the AWS Management Console. To get started with Step Functions and X-Ray using the AWS Management Console:

  1. Go to the Step Functions page of the AWS Management Console.
  2. Choose Get Started, review the Hello World example, then choose Next.
  3. Check Enable X-Ray tracing from the Tracing section.

Workflow visibility

The following Step Functions workflow example is invoked via Amazon EventBridge when a new file is uploaded to an Amazon S3 bucket. The workflow uses Amazon Textract to detect text from an image file. It translates the text into multiple languages using Amazon Translate and saves the results into an Amazon DynamoDB table. X-Ray has been enabled for this workflow.

To view the X-Ray service map for this workflow, I choose the X-Ray trace map link at the top of the Step Functions Execution details page:

The service map is generated from trace data sent through the workflow. Toggling the Service Icons displays each individual service in this workload. The size of each node is weighted by traffic or health, depending on the selection.

This shows the error percentage and average response times for each downstream service. T/min is the number of traces sent per minute in the selected time range. The following map shows a 67% error rate for the Step Functions workflow.

Accelerated troubleshooting

By drilling down through the service map, to the individual trace map, I quickly pinpoint the error in this workflow. I choose the Step Functions service from the trace map. This opens the service details panel. I then choose View traces. The trace data shows that from a group of nine responses, 3 completed successfully and 6 completed with error. This correlates with the response times listed for each individual trace. Three traces complete in over 5 seconds, while 6 took less than 3 seconds.

Choosing one of the faster traces opens the trace timeline map. This illustrates the aggregate response time for the workflow and each of its states. It shows a state named Read text from image invoked by a Lambda Function. This takes 2.3 seconds of the workflow’s total 2.9 seconds to complete.

A warning icon indicates that an error has occurred in this Lambda function. Hovering the curser over the icon, reveals that the property “Blocks” is undefined. This shows that an error occurred within the Lambda function (no text was found within the image). The Lambda function did not have sufficient error handling to manage this error gracefully, so the workflow exited.

Here’s how that same state execution failure looks in the Step Functions Graph inspector.

Performance profiling

The visualizations provided in the service map are useful for estimating the average latency in a workflow, but issues are often indicated by statistical outliers. To help investigate these, the Response distribution graph shows a distribution of latencies for each state within a workflow, and its downstream services.

Latency is the amount of time between when a request starts and when it completes. It shows duration on the x-axis, and the percentage of requests that match each duration on the y-axis. Additional filters are applied to find traces by duration or status code. This helps to discover patterns and to identify specific cases and clients with issues at a given percentile.

Sampling

X-Ray applies a sampling algorithm to determine which requests to trace. A sampling rate of 100% is used for state machines with an execution rate of less than one per second. State machines running at a rate greater than one execution per second default to a 5% sampling rate. Configure the sampling rate to determine what percentage of traces to sample. Enable trace sampling with the AWS Command Line Interface (AWS CLI) using the CreateStateMachine and UpdateStateMachine APIs with the enable-Trace-Sampling attribute:

--enable-trace-sampling true

It can also be configured in the AWS Management Console.

Trace data retention and limits

X-Ray retains tracing data for up to 30 days with a single trace holding up to 7 days of execution data. The current minimum guaranteed trace size is 100Kb, which equates to approximately 80 state transitions.   The actual number of state transitions supported will depend on the upstream and downstream calls and duration of the workflow. When the trace size limit is reached, the trace cannot be updated with new segments or updates to existing segments. The traces that have reached the limit are indicated with a banner in the X-Ray console.

For a full service comparison of X-Ray trace data and Step Functions execution history, please refer to the documentation.

Conclusion

The Step Functions integration with X-Ray provides a single monitoring dashboard for workflows running at scale. It provides a high-level system overview of all workflow resources and the ability to drill down to view detailed timelines of workflow executions. You can now use the orchestration capabilities of Step Functions with the tracing, visualization, and debug capabilities of AWS X-Ray.

This enables developers to reduce problem resolution times by visually identifying errors in resources and viewing error rates across workflow executions. You can profile and improve application performance by identifying outliers while analyzing and debugging high latency and jitter in workflow executions.

This feature is available in all Regions where both AWS Step Functions and AWS X-Ray are available. View the AWS Regions table to learn more. For pricing information, see AWS X-Ray pricing.

To learn more about Step Functions, read the Developer Guide. For more serverless learning resources, visit https://serverlessland.com.

Introducing larger state payloads for AWS Step Functions

Post Syndicated from Rob Sutter original https://aws.amazon.com/blogs/compute/introducing-larger-state-payloads-for-aws-step-functions/

AWS Step Functions allows you to create serverless workflows that orchestrate your business processes. Step Functions stores data from workflow invocations as application state. Today we are increasing the size limit of application state from 32,768 characters to 256 kilobytes of data per workflow invocation. The new limit matches payload limits for other commonly used serverless services such as Amazon SNS, Amazon SQS, and Amazon EventBridge. This means you no longer need to manage Step Functions payload limitations as a special case in your serverless applications.

Faster, cheaper, simpler state management

Previously, customers worked around limits on payload size by storing references to data, such as a primary key, in their application state. An AWS Lambda function then loaded the data via an SDK call at runtime when the data was needed. With larger payloads, you now can store complete objects directly in your workflow state. This removes the need to persist and load data from data stores such as Amazon DynamoDB and Amazon S3. You do not pay for payload size, so storing data directly in your workflow may reduce both cost and execution time of your workflows and Lambda functions. Storing data in your workflow state also reduces the amount of code you need to write and maintain.

AWS Management Console and workflow history improvements

Larger state payloads mean more data to visualize and search. To help you understand that data, we are also introducing changes to the AWS Management Console for Step Functions. We have improved load time for the Execution History page to help you get the information you need more quickly. We have also made backwards-compatible changes to the GetExecutionHistory API call. Now if you set includeExecutionData to false, GetExecutionHistory excludes payload data and returns only metadata. This allows you to debug your workflows more quickly.

Doing more with dynamic parallelism

A larger payload also allows your workflows to process more information. Step Functions workflows can process an arbitrary number of tasks concurrently using dynamic parallelism via the Map State. Dynamic parallelism enables you to iterate over a collection of related items applying the same process to each item. This is an implementation of the map procedure in the MapReduce programming model.

When to choose dynamic parallelism

Choose dynamic parallelism when performing operations on a small collection of items generated in a preliminary step. You define an Iterator, which operates on these items individually. Optionally, you can reduce the results to an aggregate item. Unlike with parallel invocations, each item in the collection is related to the other items. This means that an error in processing one item typically impacts the outcome of the entire workflow.

Example use case

Ecommerce and line of business applications offer many examples where dynamic parallelism is the right approach. Consider an order fulfillment system that receives an order and attempts to authorize payment. Once payment is authorized, it attempts to lock each item in the order for shipment. The available items are processed and their total is taken from the payment authorization. The unavailable items are marked as pending for later processing.

The following Amazon States Language (ASL) defines a Map State with a simplified Iterator that implements the order fulfillment steps described previously.


    "Map": {
      "Type": "Map",
      "ItemsPath": "$.orderItems",
      "ResultPath": "$.packedItems",
      "MaxConcurrency": 40,
      "Next": "Print Label",
      "Iterator": {
        "StartAt": "Lock Item",
        "States": {
          "Lock Item": {
            "Type": "Pass",
            "Result": "Item locked!",
            "Next": "Pull Item"
          },
          "Pull Item": {
            "Type": "Pass",
            "Result": "Item pulled!",
            "Next": "Pack Item"
          },
          "Pack Item": {
            "Type": "Pass",
            "Result": "Item packed!",
            "End": true
          }
        }
      }
    }

The following image provides a visualization of this workflow. A preliminary state retrieves the collection of items from a data store and loads it into the state under the orderItems key. The triple dashed lines represent the Map State which attempts to lock, pull, and pack each item individually. The result of processing each individual item impacts the next state, Print Label. As more items are pulled and packed, the total weight increases. If an item is out of stock, the total weight will decrease.

A visualization of a portion of an AWS Step Functions workflow that implements dynamic parallelism

Dynamic parallelism or the “Map State”

Larger state payload improvements

Without larger state payloads, each item in the $.orderItems object in the workflow state would be a primary key to a specific item in a DynamoDB table. Each step in the “Lock, Pull, Pack” workflow would need to read data from DynamoDB for every item in the order to access detailed item properties.

With larger state payloads, each item in the $.orderItems object can be a complete object containing the required fields for the relevant items. Not only is this faster, resulting in a better user experience, but it also makes debugging workflows easier.

Pricing and availability

Larger state payloads are available now in all commercial and AWS GovCloud (US) Regions where Step Functions is available. No changes to your workflows are required to use larger payloads, and your existing workflows will continue to run as before. The larger state is available however you invoke your Step Functions workflows, including the AWS CLI, the AWS SDKs, the AWS Step Functions Data Science SDK, and Step Functions Local.

Larger state payloads are included in existing Step Functions pricing for Standard Workflows. Because Express Workflows are priced by runtime and memory, you may see more cost on individual workflows with larger payloads. However, this increase may also be offset by the reduced cost of Lambda, DynamoDB, S3, or other AWS services.

Conclusion

Larger Step Functions payloads simplify and increase the efficiency of your workflows by eliminating function calls to persist and retrieve data. Larger payloads also allow your workflows to process more data concurrently using dynamic parallelism.

With larger payloads, you can minimize the amount of custom code you write and focus on the business logic of your workflows. Get started building serverless workflows today!

Using serverless backends to iterate quickly on web apps – part 3

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-serverless-backends-to-iterate-quickly-on-web-apps-part-3/

This series is about building flexible backends for web applications. The example is Happy Path, a web app that allows park visitors to upload and share maps and photos, to help replace printed materials.

In part 1, I show how the application works and walk through the backend architecture. In part 2, you deploy a basic image-processing workflow. To do this, you use an AWS Serverless Application Model (AWS SAM) template to create an AWS Step Functions state machine.

In this post, I show how to deploy progressively more complex workflow functionality while using minimal code. This solution in this web app is designed for 100,000 monthly active users. Using this approach, you can introduce more sophisticated workflows without affecting the existing backend application.

The code and instructions for this application are available in the GitHub repo.

Introducing image moderation

In the first version, users could upload any images to parks on the map. One of the most important feature requests is to enable moderation, to prevent inappropriate images from appearing on the app. To handle a large number of uploaded images, using human moderation would be slow and labor-intensive.

In this section, you use Amazon ML services to automate analyzing the images for unsafe content. Amazon Rekognition provides an API to detect if an image contains moderation labels. These labels are categorized into different types of unsafe content that would not be appropriate for this app.

Version 2 of the workflow uses this API to automate the process of checking images. To install version 2:

  1. From a terminal window, delete the v1 workflow stack:
    aws cloudformation delete-stack --stack-name happy-path-workflow-v1
  2. Change directory to the version 2 AWS SAM template in the repo:
    cd .\workflows\templates\v2
  3. Build and deploy the solution:
    sam build
    sam deploy --guided
  4. The deploy process prompts you for several parameters. Enter happy-path-workflow-v2 as the Stack Name. The other values are the outputs from the backend deployment process, detailed in the repo’s README. Enter these to complete the deployment.

From VS Code, open the v2 state machine in the repo from workflows/statemachines/v2.asl.json. Choose the Render graph option in the CodeLens to see the workflow visualization.

Serverless workflow visualization

This new workflow introduces a Moderator step. This invokes a Moderator Lambda function that uses the Amazon Rekognition API. If this API identifies any unsafe content labels, it returns these as part of the function output.

The next step in the workflow is a Moderation result choice state. This evaluates the output of the previous function – if the image passes moderation, the process continues to the Resizer function. If it fails, execution moves to the RecordFailState step.

Step Functions integrates directly with some AWS services so that you can call and pass parameters into the APIs of those services. The RecordFailState uses an Amazon DynamoDB service integration to write the workflow failure to the application table, using the arn:aws:states:::dynamodb:updateItem resource.

Testing the workflow

To test moderation, I use an unsafe image with suggestive content. This is an image that is not considered appropriate for this application. To test the deployed v2 workflow:

  1. Open the frontend application at https://localhost:8080 in your browser.
  2. Select a park location, choose Show Details, and then choose Upload images.
  3. Select an unsafe image to upload.
  4. Navigate to the Step Functions console. This shows the v2StateMachine with one successful execution:State machine result
  5. Select the state machine, and choose the execution to display more information. Scroll down to the Visual workflow panel.Visual workflow panel

This shows that the moderation failed and the path continued to log the failed state in the database. If you open the Output resource, this displays more details about why the image is considered unsafe.

Checking the image size and file type

The upload component in the frontend application limits file selection to JPG images but there is no check to reject images that are too small. It’s prudent to check and enforce image types and sizes on the backend API in addition to the frontend. This is because it’s possible to upload images via the API without using the frontend.

The next version of the workflow enforces image sizes and file types. To install version 3:

  1. From a terminal window, delete the v2 workflow stack:
    aws cloudformation delete-stack --stack-name happy-path-workflow-v2
  2. Change directory to the version 3 AWS SAM template in the repo:
    cd .\workflows\templates\v3
  3. Build and deploy the solution:
    sam build
    sam deploy --guided
  4. The deploy process prompts you for several parameters. Enter happy-path-workflow-v3 as the Stack Name. The other values are the outputs from the backend deployment process, detailed in the repo’s README. Enter these to complete the deployment.

From VS Code, open the v3 state machine in the repo from workflows/statemachines/v3.asl.json. Choose the Render graph option in the CodeLens to see the workflow visualization.

v3 state machine

This workflow changes the starting point of the execution, introducing a Check Dimensions step. This invokes a Lambda function that checks the size and types of the Amazon S3 object using the image-size npm package. This function uses environment variables provided by the AWS SAM template to compare against a minimum size and allowed type array.

The output is evaluated by the Dimension Result choice state. If the image is larger than the minimum size allowed, execution continues to the Moderator function as before. If not, it passes to the RecordFailState step to log the result in the database.

Testing the workflow

To test, I use an image that’s narrower than the mixPixels value. To test the deployed v3 workflow:

  1. Open the frontend application at https://localhost:8080 in your browser.
  2. Select a park location, choose Show Details, and then choose Upload images.
  3. Select an image with a width smaller than 800 pixels. After a few seconds, a rejection message appears:"Image is too small" message
  4. Navigate to the Step Functions console. This shows the v3StateMachine with one successful execution. Choose the execution to show more detail.Execution output

The execution shows that the Check Dimension step added the image dimensions to the event object. Next, the Dimensions Result choice state rejected the image, and logged the result at the RecordFailState step. The application’s DynamoDB table now contains details about the failed upload:

DynamoDB item details

Pivoting the application to a new concept

Until this point, the Happy Path web application is designed to help park visitors share maps and photos. This is the development team’s original idea behind the app. During the product-market fit stage of development, it’s common for applications to pivot substantially from the original idea. For startups, it can be critical to have the agility to modify solutions quickly to meet the needs of customers.

In this scenario, the original idea has been less successful than predicted, and park visitors are not adopting the app as expected. However, the business development team has identified a new opportunity. Restaurants would like an app that allows customers to upload menus and food photos. How can the development team create a new proof-of-concept app for restaurant customers to test this idea?

In this version, you modify the application to work for restaurants. While features continue to be added to the parks workflow, it now supports business logic specifically for the restaurant app.

To create the v4 workflow and update the frontend:

  1. From a terminal window, delete the v3 workflow stack:
    aws cloudformation delete-stack --stack-name happy-path-workflow-v3
  2. Change directory to the version 4 AWS SAM template in the repo:
    cd .\workflows\templates\v4
  3. Build and deploy the solution:
    sam build
    sam deploy --guided
  4. The deploy process prompts you for several parameters. Enter happy-path-workflow-v4 as the Stack Name. The other values are the outputs from the backend deployment process, detailed in the repo’s README. Enter these to complete the deployment.
  5. Open frontend/src/main.js and update the businessType variable on line 63. Set this value to ‘restaurant’.Change config to restaurants
  6. Start the local development server:
    npm run serve
  7. Open the application at http://localhost:8080. This now shows restaurants in the local area instead of parks.

In the Step Functions console, select the v4StateMachine to see the latest workflow, then open the Definition tab to see the visualization:

Workflow definition

This workflow starts with steps that apply to both parks and restaurants – checking the image dimensions. Next, it determines the place type from the placeId record in DynamoDB. Depending on place type, it now follows a different execution path:

  • Parks continue to run the automated moderator process, then resizer and publish the result.
  • Restaurants now use Amazon Rekognition to determine the labels in the image. Any photos containing people are rejected. Next, the workflow continues to the resizer and publish process.
  • Other business types go to the RecordFailState step since they are not supported.

Testing the workflow

To test the deployed v4 workflow:

  1. Open the frontend application at https://localhost:8080 in your browser.
  2. Select a restaurant, choose Show Details, and then choose Upload images.
  3. Select an image from the test photos dataset. After a few seconds, you see a message confirming the photo has been added.
  4. Next, select an image that contains one or more people. The new restaurant workflow rejects this type of photo:"Image rejected" message
  5. In the Step Functions console, select the last execution for the v4StateMachine to see how the Check for people step rejected the image:v4 workflow

If other business types are added later to the application, you can extend the Step Functions workflow accordingly. The cost of Step Functions is based on the number of transitions in a workflow, not the number of total steps. This means you can branch by business type in the Happy Path application. This doesn’t affect the overall cost of running an execution, if the total transitions are the same per execution.

Conclusion

Previously in this series, you deploy a simple workflow for processing image uploads in the Happy Path web application. In this post, you add progressively more complex functionality by deploying new versions of workflows.

The first iteration introduces image moderation using Amazon Rekognition, providing the ability to automate the evaluation of unsafe content. Next, the workflow is modified to check image size and file type. This allows you to reject any images that are too small or do not meet the type requirements. Finally, I show how to expand the logic further to accept other business types with their own custom workflows.

To learn more about building serverless web applications, see the Ask Around Me series.

Building storage-first serverless applications with HTTP APIs service integrations

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/building-storage-first-applications-with-http-apis-service-integrations/

Over the last year, I have been talking about “storage first” serverless patterns. With these patterns, data is stored persistently before any business logic is applied. The advantage of this pattern is increased application resiliency. By persisting the data before processing, the original data is still available, if or when errors occur.

Common pattern for serverless API backend

Common pattern for serverless API backend

Using Amazon API Gateway as a proxy to an AWS Lambda function is a common pattern in serverless applications. The Lambda function handles the business logic and communicates with other AWS or third-party services to route, modify, or store the processed data. One option is to place the data in an Amazon Simple Queue Service (SQS) queue for processing downstream. In this pattern, the developer is responsible for handling errors and retry logic within the Lambda function code.

The storage first pattern flips this around. It uses native error handling with retry logic or dead-letter queues (DLQ) at the SQS layer before any code is run. By directly integrating API Gateway to SQS, developers can increase application reliability while reducing lines of code.

Storage first pattern for serverless API backend

Storage first pattern for serverless API backend

Previously, direct integrations require REST APIs with transformation templates written in Velocity Template Language (VTL). However, developers tell us they would like to integrate directly with services in a simpler way without using VTL. As a result, HTTP APIs now offers the ability to directly integrate with five AWS services without needing a transformation template or code layer.

The first five service integrations

This release of HTTP APIs direct integrations includes Amazon EventBridge, Amazon Kinesis Data Streams, Simple Queue Service (SQS), AWS System Manager’s AppConfig, and AWS Step Functions. With these new integrations, customers can create APIs and webhooks for their business logic hosted in these AWS services. They can also take advantage of HTTP APIs features like authorizers, throttling, and enhanced observability for securing and monitoring these applications.

Amazon EventBridge

HTTP APIs service integration with Amazon EventBridge

HTTP APIs service integration with Amazon EventBridge

The HTTP APIs direct integration for EventBridge uses the PutEvents API to enable client applications to place events on an EventBridge bus. Once the events are on the bus, EventBridge routes the event to specific targets based upon EventBridge filtering rules.

This integration is a storage first pattern because data is written to the bus before any routing or logic is applied. If the downstream target service has issues, then EventBridge implements a retry strategy with incremental back-off for up to 24 hours. Additionally, the integration helps developers reduce code by filtering events at the bus. It routes to downstream targets without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Different tasks are required based upon incoming event details
  • Only data ingestion is required
  • Payload size is less than 256 kb
  • Expected requests per second are less than the Region quotas.

Amazon Kinesis Data Streams

HTTP APIs service integration with Amazon Kinesis Data Streams

HTTP APIs service integration with Amazon Kinesis Data Streams

The HTTP APIs direct integration for Kinesis Data Streams offers the PutRecord integration action, enabling client applications to place events on a Kinesis data stream. Kinesis Data Streams are designed to handle up to 1,000 writes per second per shard, with payloads up to 1 mb in size. Developers can increase throughput by increasing the number of shards in the data stream. You can route the incoming data to targets like an Amazon S3 bucket as part of a data lake or a Kinesis data analytics application for real-time analytics.

This integration is a storage first option because data is stored on the stream for up to seven days until it is processed and routed elsewhere. When processing stream events with a Lambda function, errors are handled at the Lambda layer through a configurable error handling strategy.

Use this direct integration when:

  • Ingesting large amounts of data
  • Ingesting large payload sizes
  • Order is important
  • Routing the same data to multiple targets

Amazon SQS

HTTP APIs service integration with Amazon SQS

HTTP APIs service integration with Amazon SQS

The HTTP APIs direct integration for Amazon SQS offers the SendMessage, ReceiveMessage, DeleteMessage, and PurgeQueue integration actions. This integration differs from the EventBridge and Kinesis integrations in that data flows both ways. Events can be created, read, and deleted from the SQS queue via REST calls through the HTTP API endpoint. Additionally, a full purge of the queue can be managed using the PurgeQueue action.

This pattern is a storage first pattern because the data remains on the queue for four days by default (configurable to 14 days), unless it is processed and removed. When the Lambda service polls the queue, the messages that are returned are hidden in the queue for a set amount of time. Once the calling service has processed these messages, it uses the DeleteMessage API to remove the messages permanently.

When triggering a Lambda function with an SQS queue, the Lambda service manages this process internally. However, HTTP APIs direct integration with SQS enables developers to move this process to client applications without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Data must be received as well as sent to the service
  • Downstream services need reduced concurrency
  • The queue requires custom management
  • Order is important (FIFO queues)

AWS AppConfig

HTTP APIs service integration with AWS Systems Manager AppConfig

HTTP APIs service integration with AWS Systems Manager AppConfig

The HTTP APIs direct integration for AWS AppConfig offers the GetConfiguration integration action and allows applications to check for application configuration updates. By exposing the systems parameter API through an HTTP APIs endpoint, developers can automate configuration changes for their applications. While this integration is not considered a storage first integration, it does enable direct communication from external services to AppConfig without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Access to AWS AppConfig is required.
  • Managing application configurations.

AWS Step Functions

HTTP APIs service integration with AWS Step Functions

HTTP APIs service integration with AWS Step Functions

The HTTP APIs direct integration for Step Functions offers the StartExecution and StopExecution integration actions. These actions allow for programmatic control of a Step Functions state machine via an API. When starting a Step Functions workflow, JSON data is passed in the request and mapped to the state machine. Error messages are also mapped to the state machine when stopping the execution.

This pattern provides a storage first integration because Step Functions maintains a persistent state during the life of the orchestrated workflow. Step Functions also supports service integrations that allow the workflows to send and receive data without needing a Lambda function as a transport layer.

Use this direct integration when:

  • Orchestrating multiple actions.
  • Order of action is required.

Building HTTP APIs direct integrations

HTTP APIs service integrations can be built using the AWS CLI, AWS SAM, or through the API Gateway console. The console walks through contextual choices to help you understand what is required for each integration. Each of the integrations also includes an Advanced section to provide additional information for the integration.

Creating an HTTP APIs service integration

Creating an HTTP APIs service integration

Once you build an integration, you can export it as an OpenAPI template that can be used with infrastructure as code (IaC) tools like AWS SAM. The exported template can also include the API Gateway extensions that define the specific integration information.

Exporting the HTTP APIs configuration to OpenAPI

Exporting the HTTP APIs configuration to OpenAPI

OpenAPI template

An example of a direct integration from HTTP APIs to SQS is located in the Sessions With SAM repository. This example includes the following architecture:

AWS SAM template resource architecture

AWS SAM template resource architecture

The AWS SAM template creates the HTTP APIs, SQS queue, Lambda function, and both Identity and Access Management (IAM) roles required. This is all generated in 58 lines of code and looks like this:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: HTTP API direct integrations

Resources:
  MyQueue:
    Type: AWS::SQS::Queue
    
  MyHttpApi:
    Type: AWS::Serverless::HttpApi
    Properties:
      DefinitionBody:
        'Fn::Transform':
          Name: 'AWS::Include'
          Parameters:
            Location: './api.yaml'
          
  MyHttpApiRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "apigateway.amazonaws.com"
            Action: 
              - "sts:AssumeRole"
      Policies:
        - PolicyName: ApiDirectWriteToSQS
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              Action:
              - sqs:SendMessage
              Effect: Allow
              Resource:
                - !GetAtt MyQueue.Arn
                
  MyTriggeredLambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: app.lambdaHandler
      Runtime: nodejs12.x
      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt MyQueue.QueueName
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt MyQueue.Arn

Outputs:
  ApiEndpoint:
    Description: "HTTP API endpoint URL"
    Value: !Sub "https://${MyHttpApi}.execute-api.${AWS::Region}.amazonaws.com"

The OpenAPI template handles the route definitions for the HTTP API configuration and configures the service integration. The template looks like this:

openapi: "3.0.1"
info:
  title: "my-sqs-api"
paths:
  /:
    post:
      responses:
        default:
          description: "Default response for POST /"
      x-amazon-apigateway-integration:
        integrationSubtype: "SQS-SendMessage"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          MessageBody: "$request.body.MessageBody"
          QueueUrl:
            Ref: MyQueue
        payloadFormatVersion: "1.0"
        type: "aws_proxy”
        connectionType: "INTERNET"
x-amazon-apigateway-importexport-version: "1.0"

Because the OpenAPI template is included in the AWS SAM template via a transform, the API Gateway integration can reference the roles and services created within the AWS SAM template.

Conclusion

This post covers the concept of storage first integration patterns and how the new HTTP APIs direct integrations can help. I cover the five current integrations and possible use cases for each. Additionally, I demonstrate how to use AWS SAM to build and manage the integrated applications using infrastructure as code.

Using the storage first pattern with direct integrations can help developers build serverless applications that are more durable with fewer lines of code. A Lambda function is no longer required to transport data from the API endpoint to the desired service. Instead, use Lambda function invocations for differentiating business logic.

To learn more join us for the HTTP API service integrations session of Sessions With SAM! 

#ServerlessForEveryone

Fundbox: Simplifying Ways to Query and Analyze Data by Different Personas

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/fundbox-simplifying-ways-to-query-and-analyze-data-by-different-personas/

Fundbox is a leading technology platform focused on disrupting the $21 trillion B2B commerce market by building the world’s first B2B payment and credit network. With Fundbox, sellers of all sizes can quickly increase average order volumes (AOV) and improve close rates by offering more competitive net terms and payment plans to their SMB buyers. With heavy investments in machine learning and the ability to quickly analyze the transactional data of SMB’s, Fundbox is reimagining B2B payments and credit products in new category-defining ways.

Learn how how the company simplified the way different personas in the organization query and analyze data by building a self-service data orchestration platform. The platform architecture is entirely serverless, which simplifies the ability to scale and adopt to unpredictable demand. The platform was built using AWS Step Functions, AWS Lambda, Amazon API Gateway, Amazon DynamoDB, AWS Fargate, and other AWS Serverless managed services.

For more content like this, subscribe to our YouTube channels This is My Architecture, This is My Code, and This is My Model, or visit the This is My Architecture on AWS, which has search functionality and the ability to filter by industry, language, and service.