Tag Archives: Amazon DynamoDB

Build a data lake using Amazon Kinesis Data Streams for Amazon DynamoDB and Apache Hudi

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-data-lake-using-amazon-kinesis-data-streams-for-amazon-dynamodb-and-apache-hudi/

Amazon DynamoDB helps you capture high-velocity data such as clickstream data to form customized user profiles and online order transaction data to develop customer order fulfillment applications, improve customer satisfaction, and get insights into sales revenue to create promotional offers for customers. It’s essential to store these data points in a centralized data lake, where they can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

A popular use case in order management is receiving, tracking, and fulfilling customer orders. The order management process begins when an order is placed and ends when the customer receives their package. When storing high-velocity order transaction data in DynamoDB, you can use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3).

Amazon Kinesis Data Streams for DynamoDB helps you to publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan out to multiple concurrent stream readers. You also can integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service (Amazon ES), Amazon Redshift, or Amazon S3.

In this post, you use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to a Kinesis data stream simply by enabling the Kinesis streaming connection from the Amazon DynamoDB console. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, Lambda, Kinesis Data Analytics for Apache Flink, and Kinesis Data Firehose. In this post, you use Kinesis Data Firehose to save the raw data in the S3 data lake and Apache Hudi to batch process the data.

Architecture

The following diagram illustrates the order processing system architecture.

In this architecture, users buy products in online retail shops and internally create an order transaction stored in DynamoDB. The order transaction data is ingested to the data lake and stored in the raw data layer. To achieve this, you enable Kinesis Data Streams for DynamoDB and use Kinesis Data Firehose to store data in Amazon S3. You use Lambda to transform the data from the delivery stream to remove unwanted data and finally store it in Parquet format. Next, you batch process the raw data and store it back in the Hudi dataset in the S3 data lake. You can then use Amazon Athena to do sales analysis. You build this entire data pipeline in a serverless manner.
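The transformation step described above follows the standard Kinesis Data Firehose record-transformation contract. The following Python sketch illustrates such a handler; it is not the exact function deployed by the CloudFormation template, and the list of fields to keep is an assumption for illustration.

    import base64
    import json

    # Illustrative Firehose transformation handler: decode each record, keep only
    # the order attributes needed downstream, and re-encode the result.
    # The field list below is an assumption, not the template's actual code.
    KEEP_FIELDS = {"order_id", "item_id", "customer_id", "product",
                   "amount", "currency", "time_stamp", "transaction_date"}

    def lambda_handler(event, context):
        output = []
        for record in event["records"]:
            payload = json.loads(base64.b64decode(record["data"]))
            trimmed = {k: v for k, v in payload.items() if k in KEEP_FIELDS}
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(json.dumps(trimmed).encode()).decode(),
            })
        return {"records": output}

Firehose can then apply record format conversion so that the transformed records land in the raw S3 bucket as Parquet, as described above.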

Prerequisites

Complete the following steps to create the AWS resources needed to build the data pipeline described in the architecture. For this post, we use the AWS Region us-west-1.

  1. On the Amazon Elastic Compute Cloud (Amazon EC2) console, create a keypair.
  2. Download the data files, Amazon EMR cluster, and Athena DDL code from GitHub.
  3. Deploy the necessary Amazon resources using the provided AWS CloudFormation template.
  4. For Stack name, enter a stack name of your choice.
  5. For Keypair name, choose a key pair.

A key pair is required to connect to the EMR cluster nodes. For more information, see Use an Amazon EC2 Key Pair for SSH Credentials.

  6. Keep the remaining default parameters.
  7. Acknowledge that AWS CloudFormation might create AWS Identity and Access Management (IAM) resources.

For more information about IAM, see Resources to learn more about IAM.

  8. Choose Create stack.

You can check the Resources tab for the stack after the stack is created.

The following table summarizes the resources that you created, which you use to build the data pipeline and analysis.

Logical ID | Physical ID | Type
DeliveryPolicy | kines-Deli-* | AWS::IAM::Policy
DeliveryRole | kinesis-hudi-DeliveryRole-* | AWS::IAM::Role
Deliverystream | kinesis-hudi-Deliverystream-* | AWS::KinesisFirehose::DeliveryStream
DynamoDBTable | order_transaction_* | AWS::DynamoDB::Table
EMRClusterServiceRole | kinesis-hudi-EMRClusterServiceRole-* | AWS::IAM::Role
EmrInstanceProfile | kinesis-hudi-EmrInstanceProfile-* | AWS::IAM::InstanceProfile
EmrInstanceRole | kinesis-hudi-EmrInstanceRole-* | AWS::IAM::Role
GlueDatabase | gluedatabase-* | AWS::Glue::Database
GlueTable | gluetable-* | AWS::Glue::Table
InputKinesisStream | order-data-stream-* | AWS::Kinesis::Stream
InternetGateway | igw-* | AWS::EC2::InternetGateway
InternetGatewayAttachment | kines-Inter-* | AWS::EC2::VPCGatewayAttachment
MyEmrCluster | | AWS::EMR::Cluster
ProcessLambdaExecutionRole | kinesis-hudi-ProcessLambdaExecutionRole-* | AWS::IAM::Role
ProcessLambdaFunction | kinesis-hudi-ProcessLambdaFunction-* | AWS::Lambda::Function
ProcessedS3Bucket | kinesis-hudi-processeds3bucket-* | AWS::S3::Bucket
PublicRouteTable | | AWS::EC2::RouteTable
PublicSubnet1 | | AWS::EC2::Subnet
PublicSubnet1RouteTableAssociation | | AWS::EC2::SubnetRouteTableAssociation
PublicSubnet2 | | AWS::EC2::Subnet
PublicSubnet2RouteTableAssociation | | AWS::EC2::SubnetRouteTableAssociation
RawS3Bucket | kinesis-hudi-raws3bucket-* | AWS::S3::Bucket
S3Bucket | kinesis-hudi-s3bucket-* | AWS::S3::Bucket
SourceS3Bucket | kinesis-hudi-sources3bucket-* | AWS::S3::Bucket
VPC | vpc-* | AWS::EC2::VPC

Enable Kinesis streaming for DynamoDB

AWS recently launched Kinesis Data Streams for DynamoDB so you can send data from DynamoDB to Kinesis data streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.
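If you prefer to script this step instead of using the console, the following Python (boto3) sketch shows the equivalent API calls. The table and stream names are placeholders for the resources created by the CloudFormation stack.

    import boto3

    REGION = "us-west-1"
    TABLE_NAME = "order_transaction_example"        # placeholder
    STREAM_NAME = "order-data-stream-example"       # placeholder

    dynamodb = boto3.client("dynamodb", region_name=REGION)
    kinesis = boto3.client("kinesis", region_name=REGION)

    # Look up the ARN of the Kinesis data stream created by the stack.
    stream_arn = kinesis.describe_stream(
        StreamName=STREAM_NAME)["StreamDescription"]["StreamARN"]

    # Turn on item-level change streaming from the DynamoDB table to the stream.
    dynamodb.enable_kinesis_streaming_destination(
        TableName=TABLE_NAME, StreamArn=stream_arn)

    # The destination becomes ACTIVE asynchronously; check its status.
    status = dynamodb.describe_kinesis_streaming_destination(TableName=TABLE_NAME)
    print(status["KinesisDataStreamDestinations"])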

To enable this feature from the console, complete the following steps:

  1. On the DynamoDB console, choose the table you created in the CloudFormation stack earlier (it begins with the prefix order_transaction_).
  2. On the Overview tab, choose Manage streaming to Kinesis.
  3. Choose your input stream (it starts with order-data-stream-).
  4. Choose Enable.
  5. Choose Close.
  6. Make sure that Stream enabled is set to Yes.

Populate the sales order transaction dataset

To replicate a real-life use case, you need an online retail application. For this post, you upload raw data files to an S3 bucket and use a Lambda function to load the data into DynamoDB. You can download the order data CSV files from the AWS Sample GitHub repository. Complete the following steps to upload the data to DynamoDB:

  1. On the Amazon S3 console, choose the bucket <stack-name>-sources3bucket-*.
  2. Choose Upload.
  3. Choose Add files.
  4. Choose the order_data_09_02_2020.csv and order_data_10_02_2020.csv files.
  5. Choose Upload.
  6. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  7. Choose Test.
  8. For Event template, enter an event name.
  9. Choose Create.
  10. Choose Test.

This runs the Lambda function and loads the CSV file order_data_09_02_2020.csv to the DynamoDB table.

  11. Wait until the message appears that the function ran successfully.

You can now view the data on the DynamoDB console, in the details page for your table.
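For reference, a CSV-to-DynamoDB loader of this kind can be sketched as follows. This is an illustration only; the bucket, key, and table names (read from environment variables here) are assumptions rather than the exact implementation of <stack-name>-CsvToDDBLambdaFunction-*.

    import csv
    import io
    import os

    import boto3

    s3 = boto3.client("s3")
    table = boto3.resource("dynamodb").Table(os.environ["TABLE_NAME"])

    def lambda_handler(event, context):
        # Read the CSV file named in the KEY environment variable from S3.
        obj = s3.get_object(Bucket=os.environ["BUCKET_NAME"], Key=os.environ["KEY"])
        rows = csv.DictReader(io.StringIO(obj["Body"].read().decode("utf-8")))
        # Batch-write each CSV row as an item; column names become attribute names.
        with table.batch_writer() as batch:
            for row in rows:
                batch.put_item(Item=row)
        return {"status": "loaded"}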

Because you enabled the Kinesis data stream in the DynamoDB table, it starts streaming the data to Amazon S3. You can check the data by viewing the bucket on the Amazon S3 console. The following screenshot shows that a Parquet file is under the prefix in the bucket.

Use Apache Hudi with Amazon EMR

Now it’s time to process the streaming data using Hudi.

  1. Log in to the Amazon EMR leader node.

You can use the key pair you chose in the security options to SSH into the leader node.

  2. Use the following bash command to start the Spark shell to use it with Apache Hudi:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

The Amazon EMR instance looks like the following screenshot.

  3. You can use the following Scala code to import the order transaction data from the S3 data lake to a Hudi dataset using the copy-on-write storage type. Change inputDataPath to match the file path under <stack-name>-raws3bucket-* in your environment, and replace the bucket name in hudiTablePath with <stack-name>-processeds3bucket-*.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/15/"
val hudiTableName = "order_hudi_cow"
val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date", 
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp", 
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date", 
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)

For more information about DataSourceWriteOptions, see Work with a Hudi Dataset.

  4. In the Spark shell, you can now count the total number of records that you just wrote to the Apache Hudi dataset:
scala> inputDF.count()
res1: Long = 1000

You can check the processed Apache Hudi dataset in the S3 data lake via the Amazon S3 console. The following screenshot shows the prefix order_hudi_cow in <stack-name>-processeds3bucket-*.

When navigating into the order_hudi_cow prefix, you can find a list of Hudi datasets that are partitioned using the transaction_date key—one for each date in our dataset.

Let’s analyze the data stored in Amazon S3 using Athena.

Analyze the data with Athena

To analyze your data, complete the following steps:

  1. On the Athena console, create the database order_db using the following command:
create database order_db;

You use this database to create all the Athena tables.

  2. Create your table using the following command (replace the S3 bucket name with the <stack-name>-processeds3bucket-* bucket created in your environment):
    CREATE EXTERNAL TABLE order_transaction_cow (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `_hoodie_partition_path` string,
      `_hoodie_file_name` string,
      `order_id` string,
      `item_id` string,
      `customer_id` string,
      `product` string,
      `amount` decimal(3,1),
      `currency` string,
      `time_stamp` string
      )
      PARTITIONED BY ( 
      `transaction_date` string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow'

  3. Add partitions by running the following query on the Athena console:
    ALTER TABLE order_transaction_cow ADD
    PARTITION (transaction_date = '2020-09-02') LOCATION 's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow/2020-09-02/';

  4. Check the total number of records in the Hudi dataset with the following query:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

It should return a single row with a count of 1,000.

Now check the record that you want to update.

  5. Run the following query on the Athena console:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The output should look like the following screenshot. Note down the value of product and amount.

Analyze the change data capture

Now let’s test the change data capture (CDC) in streaming. Let’s take an example where the customer changed an existing order. We load the order_data_10_02_2020.csv file, where order_id 3801 has a different product and amount.

To test the CDC feature, complete the following steps:

  1. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  2. In the Environment variables section, choose Edit.
  3. For key, enter order_data_10_02_2020.csv.
  4. Choose Save.

You can see another prefix has been created in <stack-name>-raws3bucket-*.

  5. In Amazon EMR, run the following code in the Scala shell prompt to update the data (change inputDataPath to the new file path in <stack-name>-raws3bucket-* and the bucket name in hudiTablePath to <stack-name>-processeds3bucket-*):
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    
    //Set up various input values as variables
    val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/18/"
    val hudiTableName = "order_hudi_cow"
    val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName
    
    // Set up our Hudi Data Source Options
    val hudiOptions = Map[String,String](
        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date", 
        HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
        DataSourceWriteOptions.OPERATION_OPT_KEY ->
            DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
        DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp", 
        DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
        DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
        DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date", 
        DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
            classOf[MultiPartKeysValueExtractor].getName)
    
    // Read data from S3 and create a DataFrame with Partition and Record Key
    val inputDF = spark.read.format("parquet").load(inputDataPath)
    
    // Write data into the Hudi dataset
    inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)

  6. Run the following query on the Athena console to confirm that the total number of records is still 1,000:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

  7. Run the following query on the Athena console to test for the update:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The following screenshot shows that the product and amount values for the same order are updated.

In a production workload, you can trigger the updates on a schedule or by S3 modification events. A fully automated data lake makes sure your business analysts are always viewing the latest available data.
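As an illustration of the S3-triggered variant, a small Lambda function can submit a Spark step to the existing EMR cluster whenever a new raw prefix arrives. The following Python (boto3) sketch is only a starting point; the cluster ID, script location, and bucket names are placeholders, and the Hudi job itself would contain logic similar to the Scala code shown earlier.

    import boto3

    emr = boto3.client("emr")

    def lambda_handler(event, context):
        # Triggered by an S3 event notification on the raw bucket.
        key = event["Records"][0]["s3"]["object"]["key"]
        emr.add_job_flow_steps(
            JobFlowId="j-EXAMPLECLUSTERID",                 # placeholder cluster ID
            Steps=[{
                "Name": f"hudi-upsert-{key}",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "--jars",
                        "/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar",
                        "s3://example-artifacts-bucket/scripts/hudi_upsert.py",  # placeholder script
                        "--input", f"s3://kinesis-hudi-raws3bucket-example/{key}",
                    ],
                },
            }],
        )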

Clean up the resources

To avoid incurring future charges, follow these steps to remove the example resources:

  1. If you created the EMR cluster with the CloudFormation template, delete the CloudFormation stack to remove the resources you created in the prerequisites section.
  2. If you launched the EMR cluster manually, stop the cluster via the Amazon EMR console.
  3. Empty all the relevant buckets via the Amazon S3 console.

Conclusion

You can build an end-to-end serverless data lake to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. It allows your team to focus on solving business problems by getting useful insights immediately. Application developers have various use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

If this post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to guide enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

 

 

 

Saurabh Shrivastava is a solutions architect leader and analytics/ML specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

 

 

 

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on data analytics, AI/ML, and DevOps.

 

Caching data and configuration settings with AWS Lambda extensions

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/caching-data-and-configuration-settings-with-aws-lambda-extensions/

This post is written by Hari Ohm Prasath Rajagopal, Senior Modernization Architect and Vamsi Vikash Ankam, Technical Account Manager

In this post, I show how to build a flexible in-memory AWS Lambda caching layer using Lambda extensions. Lambda functions use REST API calls to access the data and configuration from the cache. This can reduce latency and cost when consuming data from AWS services such as Amazon DynamoDB, AWS Systems Manager Parameter Store, and AWS Secrets Manager.

Applications making frequent API calls to retrieve static data can benefit from a caching layer. This can reduce the function’s latency, particularly for synchronous requests, as the data is retrieved from the cache instead of an external service. The cache can also reduce costs by reducing the number of calls to downstream services.

There are two types of cache to consider in this situation:

  • A data cache, for data retrieved from databases such as DynamoDB.
  • A configuration cache, for configuration settings and secrets stored in services such as AWS Systems Manager Parameter Store and AWS Secrets Manager.

Lambda extensions are a new way for tools to integrate more easily into the Lambda execution environment and control and participate in Lambda’s lifecycle. They use the Extensions API, a new HTTP interface, to register for lifecycle events during function initialization, invocation, and shutdown.

They can also use environment variables to add options and tools to the runtime, or use wrapper scripts to customize the runtime startup behavior. The Lambda cache uses Lambda extensions to run as a separate process.
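To make the lifecycle concrete, the following Python sketch shows the register/next loop that an extension runs against the Extensions API (the demo extension itself is written in Go). The extension name is a placeholder, and the cache refresh logic is omitted.

    import json
    import os
    import urllib.request

    API = f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension"

    def register(name="cache-extension-demo"):
        # Register the extension for INVOKE and SHUTDOWN lifecycle events.
        req = urllib.request.Request(
            f"{API}/register",
            data=json.dumps({"events": ["INVOKE", "SHUTDOWN"]}).encode(),
            headers={"Lambda-Extension-Name": name},
            method="POST",
        )
        with urllib.request.urlopen(req) as resp:
            return resp.headers["Lambda-Extension-Identifier"]

    def event_loop(extension_id):
        while True:
            # Block until the next lifecycle event is delivered.
            req = urllib.request.Request(
                f"{API}/event/next",
                headers={"Lambda-Extension-Identifier": extension_id},
            )
            with urllib.request.urlopen(req) as resp:
                event = json.loads(resp.read())
            if event["eventType"] == "SHUTDOWN":
                break  # clean up the in-memory cache here

    if __name__ == "__main__":
        event_loop(register())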

To learn more about how to use extensions with your functions, read “Introducing AWS Lambda extensions”.

Creating a cache using Lambda extensions

To set up the example, visit the GitHub repo, and follow the instructions in the README.md file.

The demo uses AWS Serverless Application Model (AWS SAM) to deploy the infrastructure. The walkthrough requires AWS SAM CLI (minimum version 0.48) and an AWS account.

To install the example:

  1. Create an AWS account if you do not already have one and log in.
  2. Clone the repo to your local development machine:
     git clone https://github.com/aws-samples/aws-lambda-extensions
     cd aws-lambda-extensions/cache-extension-demo/
  3. If you are not running in a Linux environment, ensure that your build architecture matches the Lambda execution environment by prefixing the build command with GOOS=linux GOARCH=amd64.
  4. Build the Go binary extension with the following command:
     go build -o bin/extensions/cache-extension-demo main.go
  5. Ensure that the extension file is executable:
     chmod +x bin/extensions/cache-extension-demo
  6. Update the parameters region value in ../example-function/config.yaml with the Region where you are deploying the function:
     parameters:
       - region: us-west-2
  7. Build the function dependencies:
     cd SAM
     sam build
  8. Deploy the AWS resources specified in the template.yml file:
     sam deploy --guided
  9. During the prompts:
     • Enter a stack name cache-extension-demo.
     • Enter the same AWS Region specified previously.
     • Accept the default DatabaseName. You can specify a custom database name, and also update the ../example-function/config.yaml and index.js files with the new database name.
     • Enter MySecret as the Secrets Manager secret.
     • Accept the defaults for the remaining questions.

    AWS SAM deploys:

    • A DynamoDB table.
    • The Lambda function ExtensionsCache-DatabaseEntry, which puts a sample item into the DynamoDB table.
    • An AWS Systems Manager Parameter Store parameter called CacheExtensions_Parameter1 with a value of MyParameter.
    • An AWS Secrets Manager secret called secret_info with a value of MySecret.
    • A Lambda layer called Cache_Extension_Layer.
    • A Lambda function using the Node.js 12 runtime called ExtensionsCache-SampleFunction. This reads the cached values via the extension from either the DynamoDB table, Parameter Store, or Secrets Manager.
    • IAM permissions

    The cache extension is delivered as a Lambda layer and added to ExtensionsCache-SampleFunction.

    It is written as a self-contained binary in Golang, which makes the extension compatible with all of the supported runtimes. The extension caches the data from DynamoDB, Parameter Store, and Secrets Manager, and then runs a local HTTP endpoint to service the data. The Lambda function retrieves the configuration data from the cache using a local HTTP REST API call.

    Here is the architecture diagram.

    Extensions cache architecture diagram


    Once deployed, the extension performs the following steps:

    1. On start-up, the extension reads the config.yaml file, which determines which resources to cache. The file is deployed as part of the Lambda function.
    2. The boolean CACHE_EXTENSION_INIT_STARTUP Lambda environment variable specifies whether to load into cache the items specified in config.yaml. If false, the extension initializes an empty map with the names.
    3. The extension retrieves the required data based on the resources in the config.yaml file. This includes the data from DynamoDB, the configuration from Parameter Store, and the secret from Secrets Manager. The data is stored in memory.
    4. The extension starts a local HTTP server using TCP port 4000, which serves the cache items to the function. The Lambda function accesses the local in-memory cache by invoking the following endpoint: http://localhost:4000/<cachetype>?name=<name>.
    5. If the data is not available in the cache, or has expired, the extension accesses the corresponding AWS service to retrieve the data. It is cached first and then returned to the Lambda function. The CACHE_EXTENSION_TTL Lambda environment variable defines the refresh interval (defined based on Go time format, for example: 30s, 3m, etc.)

    This sequence diagram explains the data flow:

    Extensions cache sequence diagram


    Testing the example application

    Once the AWS SAM template is deployed, navigate to the AWS Lambda console.

    1. Select the function starting with the name ExtensionsCache-SampleFunction. Within the function code, the options array specifies which data to return from the cache. This is initially set to path: '/dynamodb?name=DynamoDbTable-pKey1-sKey1'
    2. Choose Configure test events to configure a test event.
    3. Enter a name for the Event name, accept the default payload, and select Create.
    4. Select Test to invoke the function. This returns the cached data from DynamoDB and logs the output.
    5. In the index.js file, amend the path statement to retrieve the Parameter Store configuration:
       const options = {
         "hostname": "localhost",
         "port": 4000,
         "path": "/parameters?name=CacheExtensions_Parameter1",
         "method": "GET"
       }
    6. Select Deploy to save the function configuration and select Test. The function returns the Parameter Store configuration item.
    7. In the function code, amend the path statement to retrieve the Secrets Manager secret:
       const options = {
         "hostname": "localhost",
         "port": 4000,
         "path": "/parameters?name=/aws/reference/secretsmanager/secret_info",
         "method": "GET"
       }
    8. Select Deploy to save the function configuration and select Test. The function returns the secret.

    The benefits of using Lambda extensions

    There are a number of benefits to using a Lambda extension for this solution:

    1. Improved Lambda function performance as data is cached in memory by the extension during initialization.
    2. Fewer AWS API calls to external services. This can reduce costs and helps avoid throttling limits if services are accessed frequently.
    3. Cache data is stored in memory and not in a file within the Lambda execution environment. This means that no additional process is required to manage the lifecycle of the file. In-memory is also more secure, as data is not persisted to disk for subsequent function invocations.
    4. The function requires less code, as it only needs to communicate with the extension via HTTP to retrieve the data. The function does not have to have additional libraries installed to communicate with DynamoDB, Parameter Store, Secrets Manager, or the local file system.
    5. The cache extension is a Golang compiled binary and the executable can be shared with functions running other runtimes like Node.js, Python, Java, etc.
    6. Using a YAML template to store the details of what to cache makes it easier to configure and add additional services.

    Comparing the performance benefit

    To test the performance of the cache extension, I compare two tests:

    1. A Golang Lambda function that accesses a secret from AWS Secrets Manager for every invocation.
    2. The ExtensionsCache-SampleFunction, previously deployed using AWS SAM. This uses the cache extension to access the secret from Secrets Manager; the function reads the value from the cache.

    Both functions are configured with 512 MB of memory and the function timeout is set to 30 seconds.

    I use Artillery to load test both Lambda functions. The load runs for 100 invocations over 2 minutes. I use Amazon CloudWatch metrics to view the function average durations.

    Test 1 shows a duration of 43 ms for the first invocation as a cold start. Subsequent invocations average 22 ms.

    Test 1 performance results


    Test 2 shows a duration of 16 ms for the first invocation as a cold start. Subsequent invocations average 3 ms.

    Test 2 performance results


    Using the Lambda extensions caching layer shows a significant performance improvement. Cold start invocation duration is reduced by 62% and subsequent invocations by 80%.

    In this example, the CACHE_EXTENSION_INIT_STARTUP environment variable flag is not configured. With the flag enabled for the extension, data is pre-fetched during extension initialization and the cold start time is further reduced.

    Conclusion

    Using Lambda extensions is an effective way to cache static data from external services in Lambda functions. This reduces function latency and costs. This post shows how to build both a data and configuration cache using DynamoDB, Parameter Store, and Secrets Manager.

    To set up the walkthrough demo in this post, visit the GitHub repo, and follow the instructions in the README.md file.

    The extension uses a local configuration file to determine which values to cache, and retrieves the items from the external services. A Lambda function retrieves the values from the local cache using an HTTP request, without having to communicate with the external services directly. In this example, this results in an 80% reduction in function invocation time.

    For more serverless learning resources, visit https://serverlessland.com.

Building a serverless multi-player game that scales

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/building-a-serverless-multiplayer-game-that-scales/

This post is written by Tim Bruce, Sr. Solutions Architect, Developer Acceleration.

Game development is a highly iterative process with rapidly changing requirements. Many game developers want to maximize the time spent building features and less time configuring servers, managing infrastructure, and mastering scale.

AWS Serverless provides four key benefits for customers. First, it can help move from idea to market faster, by reducing operational overhead. Second, customers may realize lower costs with serverless by not over-provisioning hardware and software to operate. Third, serverless scales with user activity. Finally, serverless services provide built-in integration, allowing you to focus on your game instead of connecting pieces together.

For AWS Gaming customers, these benefits allow your teams to spend more time focusing on gameplay and content, instead of undifferentiated tasks such as setting up and maintaining servers and software. This can result in better gameplay and content, and a faster time-to-market.

This blog post introduces a game with a serverless-first architecture. Simple Trivia Service is a web-based game showing architectural patterns that you can apply in your own games.

Introducing the Simple Trivia Service

The Simple Trivia Service offers single- and multi-player trivia games with content created by players. Simple Trivia Service includes many features commonly found in games, such as user registration, chat, content creation, leaderboards, game play, and a marketplace.

Simple Trivia Service UI

Authenticated players can chat with other players, create and manage quizzes, and update their profile. They can play single- and multi-player quizzes, host quizzes, and buy and sell quizzes on the marketplace. The single- and multi-player game modes show how games with different connectivity and technical requirements can be delivered with serverless first architectures. The game modes and architecture solutions are covered in the Simple Trivia Service backend architecture section.

Simple Trivia Service front end

The Simple Trivia Service front end is a Vue.js single page application (SPA) that accesses backend services. The SPA app, accessed via a web browser, allows users to make requests to the game endpoints using HTTPS, secure WebSockets, and WebSockets over MQTT. These requests use integrations to access the serverless backend services.

Vue.js helps make this reference architecture more accessible. The front end uses AWS Amplify to build, deploy, and host the SPA without the need to provision and manage any resources.

Simple Trivia Service backend architecture

The backend architecture for Simple Trivia Service is defined in a set of AWS Serverless Application Model (AWS SAM) templates for portions of the game. A deployment guide is included in the README.md file in the GitHub repository. Here is a visual depiction of the backend architecture.

Reference architecture

Services used

Simple Trivia Service is built using AWS Lambda, Amazon API Gateway, AWS IoT, Amazon DynamoDB, Amazon Simple Notification Service (SNS), AWS Step Functions, Amazon Kinesis, Amazon S3, Amazon Athena, and Amazon Cognito:

  • Lambda enables serverless microservice features in Simple Trivia Service.
  • API Gateway provides serverless endpoints for HTTP/RESTful and WebSocket communication while IoT delivers a serverless endpoint for WebSockets over MQTT communication.
  • DynamoDB enables data storage and retrieval for internet-scale applications.
  • SNS provides microservice communications via publish/subscribe functionality.
  • Step Functions coordinates complex tasks to ensure appropriate outcomes.
  • Analytics for Simple Trivia Service are delivered via Kinesis and S3 with Athena providing a query/visualization capability.
  • Amazon Cognito provides secure, standards-based login and a user directory.

Two managed services that are not serverless, Amazon VPC NAT Gateway and Amazon ElastiCache for Redis, are also used. VPC NAT Gateway is required by VPC-enabled Lambda functions to reach services outside of the VPC, such as DynamoDB. ElastiCache provides an in-memory database suited for applications with submillisecond latency requirements.

User security and enabling communications to backend services

Players are required to register and log in before playing. Registration and login credentials are sent to Amazon Cognito using the Secure Remote Password protocol. Upon successfully logging in, Amazon Cognito returns a JSON Web Token (JWT) and an Amazon Cognito user session.

The JWT is included within requests to API Gateway, which validates the token before allowing the request to be forwarded to Lambda.

IoT requires additional security for users by using an AWS Identity and Access Management (IAM) policy. A policy attached to the Amazon Cognito user allows the player to connect, subscribe, and send messages to the IoT endpoint.

Game types and supporting architectures

Simple Trivia Service’s three game modes define how players interact with the backend services. These modes align to different architectures used within the game.

“Single Player” quiz architecture


Single player quizzes have simple rules, short play sessions, and appeal to wide audiences. Single player game communication is player-to-endpoint only. This is accomplished with API Gateway via an HTTP API.

Four Lambda functions (ActiveGamesList, GamePlay, GameAnswer, and LeaderboardGet) enable single player games. These functions are integrated with API Gateway and respond to specific requests from the client. API Gateway forwards the request, including URI, body, and query string, to the appropriate Lambda function.

When a player chooses “Play”, a request is sent to API Gateway, which invokes the ActiveGamesList function. This function queries the ActiveGames DynamoDB table and returns the list of active games to the user.
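A handler of this shape can be sketched as follows in Python. The table name and response format are assumptions for illustration, not the exact Simple Trivia Service implementation.

    import json
    import os

    import boto3

    # Placeholder table name; the real function reads its configuration from the stack.
    table = boto3.resource("dynamodb").Table(os.environ.get("ACTIVE_GAMES_TABLE", "ActiveGames"))

    def lambda_handler(event, context):
        # Return the currently active games to the client via the HTTP API.
        games = table.scan(Limit=50).get("Items", [])
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(games, default=str),
        }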

The player selects a game, resulting in another request triggering the GamePlay function. GamePlay retrieves the game’s questions from the GamesDetail DynamoDB table. The front end maintains the state for the user during the game.

When all questions are answered, the SPA sends the player’s responses to API Gateway, invoking the GameAnswer function. This function scores the player’s responses against the GameDetails table. The score and answers are sent to the user.

Additionally, this function sends the player score for the leaderboard and player experience to two SNS topics (LeaderboardTopic and PlayerProgressTopic). The ScorePut and PlayerProgressPut functions subscribe to these topics. These two functions write the details to the Leaderboard and Player Progress DynamoDB tables.

This architecture processes these two actions asynchronously, resulting in the player receiving their score and answers without having to wait. This also allows for increased security for player progress, as only the PlayerProgressPut function is allowed to write to this table.

Finally, the player can view the game’s leaderboard, which is returned to the player as the response from the LeaderboardGet function. The function retrieves the top 10 scores and the current player’s score from the Leaderboard table.

“Multi-player – Casual and Competitive” architecture


These game types require player-to-player and service-to-player communication. This is typically performed using TCP/UDP sockets or the WebSocket protocol. API Gateway WebSockets provides WebSocket communication and enables Lambda functions to send messages to and receive messages from game hosts and players.
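For illustration, the following Python (boto3) sketch shows how a function can push a message to the connections recorded for a game through the API Gateway Management API. The endpoint URL, table name, and attribute names are placeholders.

    import json
    import os

    import boto3

    apigw = boto3.client(
        "apigatewaymanagementapi",
        endpoint_url=os.environ.get(
            "WS_ENDPOINT", "https://abc123.execute-api.us-west-2.amazonaws.com/prod"),
    )
    connections = boto3.resource("dynamodb").Table("GameConnections")  # placeholder table

    def broadcast(game_id, message):
        # Send the message only to connections registered for this game.
        for item in connections.scan().get("Items", []):
            if item.get("gameId") == game_id:
                apigw.post_to_connection(
                    ConnectionId=item["connectionId"],
                    Data=json.dumps(message).encode(),
                )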

Game hosts start games via the “Host” button, messaging the LiveAdmin function via API Gateway. The function adds the game to the LiveGames table, which allows players to find and join the game. A list of questions for the game is sent to the game host from the LiveAdmin function at this time. Additionally, the game host is added to the GameConnections table, which keeps track of which connections are related to a game. Players, via the LivePlayer function, are also added to this table when they join a game.

The game host client manages the state of the game for all players and controls the flow of the game, sending questions, correct answers, and leaderboards to players via API Gateway and the LiveAdmin function. The function only sends game messages to the players in the GameConnections table. Player answers are sent to the host via the LivePlayer function.

To end the game, the game host sends a message with the final leaderboard to all players via the LiveAdmin function. This function also stores the leaderboard in the Leaderboard table, removes the game from the ActiveGames table, and sends player progression messages to the Player Progress topic.

“Multi-player – Live Scoreboard” architecture


This is an extension of other multi-player game types requiring similar communications. This uses IoT with WebSockets over MQTT as the transport. It enables the client to subscribe to a topic and act on messages it receives. IoT manages routing messages to clients based on their subscriptions.

This architecture moves the state management from the game host client to a data store on the backend. This change requires a database that can respond quickly to user actions. Simple Trivia Service uses ElastiCache for Redis for this database. Game questions, player responses, and the leaderboard are all stored and updated in Redis during the quiz. The ElastiCache instance is blocked from internet traffic by placing it in a VPC. A security group configures access for the Lambda functions in the same VPC.

Game hosts for this type of game start the game by hosting it, which sends a message to IoT, triggering the CacheGame function. This function adds the game to the ActiveGames table and caches the quiz details from DynamoDB into Redis. Players join the game by sending a message, which is delivered to the JoinGame function. This adds the user record to Redis and alerts the game host that a player has joined.

Game hosts can send questions to the players via a message that invokes the AskQuestion function. This function updates the current question number in Redis and sends the question to subscribed players via the AskQuestion function. The ReceiveAnswer function processes player responses. It validates the response, stores it in Redis, updates the scoreboard, and replies to all players with the updated scoreboard after the first correct answer. The game scoreboard is updated for players in real time.
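The following Python sketch illustrates the kind of Redis operations involved. The key layout, scoring increment, and endpoint are assumptions, not the actual implementation.

    import redis

    r = redis.Redis(host="example-cluster.cache.amazonaws.com", port=6379)  # placeholder endpoint

    def record_answer(game_id, player_id, question_id, answer, is_correct):
        # Keep the raw response for the game host.
        r.hset(f"game:{game_id}:answers:{question_id}", player_id, answer)
        if is_correct:
            # A sorted set keyed by player makes the leaderboard a single read.
            r.zincrby(f"game:{game_id}:scoreboard", 10, player_id)
        # Return the top 10 players, highest score first, for the live scoreboard.
        return r.zrevrange(f"game:{game_id}:scoreboard", 0, 9, withscores=True)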

When the game is over, the game host sends a message to the EndGame function via IoT. This function writes the game leaderboard to the Leaderboard table, sends player progress to the Player Progress SNS topic, deletes the game from cache, and removes the game from the ActiveGames table.

Conclusion

This post introduces the Simple Trivia Service, a single- and multi-player game built using a serverless-first architecture on AWS. I cover different solutions that you can use to enable connectivity from your game client to a serverless-first backend for both single- and multi-player games. I also include a walkthrough of the architecture for each of these solutions.

You can deploy the code for this solution to your own AWS account via instructions in the Simple Trivia Service GitHub repository.

For more serverless learning resources, visit Serverless Land.

Scaling Neuroscience Research on AWS

Post Syndicated from Konrad Rokicki original https://aws.amazon.com/blogs/architecture/scaling-neuroscience-research-on-aws/

HHMI’s Janelia Research Campus in Ashburn, Virginia has an integrated team of lab scientists and tool-builders who pursue a small number of scientific questions with potential for transformative impact. To drive science forward, we share our methods, results, and tools with the scientific community.

Introduction

Our neuroscience research application involves image searches that are computationally intensive but have unpredictable and sporadic usage patterns. The conventional on-premises approach is to purchase a powerful and expensive workstation, install and configure specialized software, and download the entire dataset to local storage. With 16 cores, a typical search of 50,000 images takes 30 seconds. A serverless architecture using AWS Lambda allows us to do this job in seconds for a few cents per search, and is capable of scaling to larger datasets.

Parallel Computation in Neuroscience Research

Basic research in neuroscience is often conducted on fruit flies. This is because their brains are small enough to study in a meaningful way with current tools, but complex enough to produce sophisticated behaviors. Conducting such research nonetheless requires an immense amount of data and computational power. Janelia Research Campus developed the NeuronBridge tool on AWS to accelerate scientific discovery by scaling computation in the cloud.

Color Depth Search Example fly brains

Figure 1: A “mask image” (on the left) is compared to many different fly brains (on the right) to find matching neurons. (Janelia Research Campus)

The fruit fly (Drosophila melanogaster) has about 100,000 neurons and its brain is highly stereotyped. This means that the brain of one fruit fly is similar to the next one. Using electron microscopy (EM), the FlyEM project has reconstructed a wiring diagram of a fruit fly brain. This connectome includes the structure of the neurons and the connectivity between them. But EM is only half of the picture. Once scientists know the structure and connectivity, they must perform experiments to find what purpose the neurons serve.

Flies can be genetically modified to reproducibly express a fluorescent protein in certain neurons, causing those neurons to glow under a light microscope (LM). By iterating through many modifications, the FlyLight project has created a vast genetic driver library. This allows scientists to target individual neurons for experiments. For example, blocking a particular neuron of a fly from functioning, and then observing its behavior, allows a scientist to understand the function of that neuron. Through the course of many such experiments, scientists are currently uncovering the function of entire neuronal circuits.

We developed NeuronBridge, a tool available for use by neuroscience researchers around the world, to bridge the gap between the EM and LM data. Scientists can start with EM structure and find matching fly lines in LM. Or they may start with a fly line and find the corresponding neuronal circuits in the EM connectome.

Both EM and LM produce petabytes of 3D images. Image processing and machine learning algorithms are then applied to discern neuron structure. We also developed a computational shortcut called color depth MIP to represent depth as color. This technique compresses large 3D image stacks into smaller 2D images that can be searched efficiently.

Image search is an embarrassingly parallel problem ideally suited to parallelization with simple functions. In a typical search, the scientist will create a “mask image,” which is a color depth image featuring only the neuron they want to find. The search algorithm must then compare this image to hundreds of thousands of other images. The paradigm of launching many short-lived cloud workers, termed burst-parallel compute, was originally suggested by a group at UCSD. To scale NeuronBridge, we decided to build a serverless AWS-native implementation of burst-parallel image search.

The Architecture

Our main reason for using a serverless approach was that our usage patterns are unpredictable and sporadic. The total number of researchers who are likely to use our tool is not large, and only a small fraction of them will need the tool at any given time. Furthermore, our tool could go unused for weeks at a time, only to get a flood of requests after a new dataset is published. A serverless architecture allows us to cope with this unpredictable load. We can keep costs low by only paying for the compute time we actually use.

One challenge of implementing a burst-parallel architecture is that each Lambda invocation requires a network call, with the ensuing network latency. Spawning several thousand functions from a single manager function can take many seconds. The trick to minimizing this latency is to parallelize these calls by recursively spawning concurrent managers in a tree structure. Each leaf in this tree spawns a set of worker functions to do the work of searching the imagery. Each worker reads a small batch of images from Amazon Simple Storage Service (Amazon S3). They are then compared to the mask image, and the intermediate results are written to Amazon DynamoDB, a serverless NoSQL database.
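A simplified Python sketch of this recursive fan-out is shown below. The function names, batch sizes, and payload shape are placeholders, not the NeuronBridge implementation.

    import json

    import boto3

    lambda_client = boto3.client("lambda")
    WORKER_BATCH = 100   # images handled by one worker (placeholder)
    FANOUT = 10          # children spawned per manager (placeholder)

    def lambda_handler(event, context):
        keys = event["keys"]
        if len(keys) <= WORKER_BATCH:
            # Small enough: hand the batch to a worker function.
            lambda_client.invoke(FunctionName="search-worker",
                                 InvocationType="Event",
                                 Payload=json.dumps({"keys": keys}))
            return
        # Otherwise split the batch and recursively invoke this manager in parallel.
        size = max(len(keys) // FANOUT, WORKER_BATCH)
        for i in range(0, len(keys), size):
            lambda_client.invoke(FunctionName=context.function_name,
                                 InvocationType="Event",
                                 Payload=json.dumps({"keys": keys[i:i + size]}))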

Serverless architecture for burst-parallel search

Figure 2: Serverless architecture for burst-parallel search

Search state is monitored by an AWS Step Functions state machine, which checks DynamoDB once per second. When all the results are ready, the Step Functions state machine runs another Lambda function to combine and sort the results. The state machine addresses error conditions and timeouts, and updates the browser when the asynchronous search is complete. We opted to use AWS AppSync to notify a React web client, providing an interactive user experience while remaining entirely serverless.

As we scaled to 3,000 concurrent Lambda functions reading from our data bucket, we reached Amazon S3’s limit of 5,500 GETs per second per prefix. The fix was to create numbered prefix folders and then randomize our key list. Each worker could then search a random list of images across many prefixes. This change distributed the load from our highly parallel functions across a number of S3 shards, and allowed us to run with much higher parallelism.
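The following Python sketch illustrates the idea; the prefix count and key format are placeholders.

    import random
    import zlib

    def prefixed_key(image_name, num_prefixes=100):
        # Objects are written under numbered prefixes so reads spread across shards.
        shard = zlib.crc32(image_name.encode()) % num_prefixes
        return f"{shard:02d}/{image_name}"

    def randomized_batches(keys, batch_size):
        # Each worker searches a shuffled batch, so GETs hit many prefixes at once.
        keys = list(keys)
        random.shuffle(keys)
        for i in range(0, len(keys), batch_size):
            yield keys[i:i + batch_size]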

We also addressed cold-start latency. Infrequently used Lambda functions take longer to start than recently used ones, and our unpredictable usage patterns meant that we were experiencing many cold starts. In our Java functions, we found that most of the cold-start time was attributed to JVM initialization and class loading. Although many mitigations for this exist, our worker logic was small enough that rewriting the code to use Node.js was the obvious choice. This immediately yielded a huge improvement, reducing cold starts from 8-10 seconds down to 200 ms.

With all of these insights, we developed a general-purpose parallel computation framework called burst-compute. This AWS-native framework runs as a serverless application to implement this architecture. It allows you to massively scale your own custom worker functions and combiner functions. We used this new framework to implement our image search.

Conclusion

The burst-parallel architecture is a powerful new computation paradigm for scientific computing. It takes advantage of the enormous scale and technical innovation of the AWS Cloud to provide near-interactive on-demand compute without expensive hardware maintenance costs. As scientific computing capability matures for the cloud, we expect this kind of large-scale parallel computation to continue becoming more accessible. In the future, the cloud could open doors to entirely new types of scientific applications, visualizations, and analysis tools.

We would like to express our thanks to AWS Solutions Architects Scott Glasser and Ray Chang, for their assistance with design and prototyping, and to Geoffrey Meissner for reviewing drafts of this write-up. 

Source Code

All of the application code described in this article is open source and licensed for reuse:

The data and imagery are shared publicly on the Registry of Open Data on AWS.

Building a real-time notification system with Amazon Kinesis Data Streams for Amazon DynamoDB and Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/building-a-real-time-notification-system-with-amazon-kinesis-data-streams-for-amazon-dynamodb-and-amazon-kinesis-data-analytics-for-apache-flink/

Amazon DynamoDB helps you capture high-velocity data such as clickstream data to form customized user profiles and Internet of Things (IoT) data so that you can develop insights on sensor activity across various industries, including smart spaces, connected factories, smart packing, fitness monitoring, and more. It’s important to store these data points in a centralized data lake in real time, where they can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

A popular use case in the wind energy sector is protecting wind turbines from damage caused by high wind speeds. As per National Wind Watch, every wind turbine has a range of wind speeds, typically 30–55 mph, in which it produces maximum capacity. When wind speed is greater than 70 mph, it’s important to start a shutdown to protect the turbine from a high wind storm. Customers often store high-velocity IoT data in DynamoDB and use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3). To facilitate this ingestion pipeline, you can deploy AWS Lambda functions or write custom code to build a bridge between DynamoDB Streams and Kinesis streaming.

Amazon Kinesis Data Streams for DynamoDB help you to publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan out to multiple concurrent stream readers. You also can integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service, Amazon Redshift, or Amazon S3.

In this post, you use Kinesis Data Analytics for Apache Flink (Data Analytics for Flink) and Amazon Simple Notification Service (Amazon SNS) to send a real-time notification when wind speed is greater than 60 mph so that the operator can take action to protect the turbine. You use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to other AWS services without having to use Lambda or write and maintain complex code. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, Lambda, and Data Analytics for Flink. In this post, we showcase Data Analytics for Flink, but this is just one of many available options.

Architecture

The following architecture diagram illustrates the wind turbine protection system.


In this architecture, high-velocity wind speed data comes from the wind turbine and is stored in DynamoDB. To send an instant notification, you need to query the data in real time and send a notification when the wind speed is greater than the established maximum. To achieve this goal, you enable Kinesis Data Streams for DynamoDB, and then use Data Analytics for Flink to query real-time data in a 60-second tumbling window. This aggregated data is stored in another data stream, which triggers an email notification via Amazon SNS using Lambda when the wind speed is greater than 60 mph. You will build this entire data pipeline in a serverless manner.
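For illustration, the notification step can be sketched as a Python Lambda function triggered by the aggregated output stream. The record fields and the topic ARN environment variable are assumptions; the deployed function may differ.

    import base64
    import json
    import os

    import boto3

    sns = boto3.client("sns")
    THRESHOLD_MPH = 60

    def lambda_handler(event, context):
        for record in event["Records"]:
            # Each record holds one aggregated (turbine, average speed) result.
            data = json.loads(base64.b64decode(record["kinesis"]["data"]))
            if data.get("averageSpeed", 0) > THRESHOLD_MPH:
                sns.publish(
                    TopicArn=os.environ["SNS_TOPIC_ARN"],
                    Subject="High wind speed alert",
                    Message=(f"Turbine {data.get('turbineId')} averaged "
                             f"{data['averageSpeed']} mph in the last minute."),
                )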

Deploying the wind turbine data simulator

To replicate a real-life scenario, you need a wind turbine data simulator. We use AWS Amplify in this post to deploy a user-friendly web application that can generate the required data and store it in DynamoDB. You must have a GitHub account, which is needed to fork the Amplify app code and deploy it in your AWS account automatically.

Complete the following steps to deploy the data simulator web application:

  1. Choose the following AWS Amplify link to launch the wind turbine data simulator web app.

  2. Choose Connect to GitHub and provide credentials, if required.


  3. In the Deploy App section, under Select service role, choose Create new role.
  4. Follow the instructions to create the role amplifyconsole-backend-role.
  5. When the role is created, choose it from the drop-down menu.
  6. Choose Save and deploy.


On the next page, the dynamodb-streaming app is ready to deploy.

  7. Choose Continue.


On the next page, you can see the app build and deployment progress, which might take as many as 10 minutes to complete.

  8. When the process is complete, choose the URL on the left to access the data generator user interface (UI).
  9. Make sure to save this URL because you will use it in later steps.


You also get an email during the build process related to your SSH key. This email indicates that the build process created an SSH key on your behalf to connect to the Amplify application with GitHub.

  10. On the sign-in page, choose Create account.


  11. Provide a user name, password, and valid email to which the app can send you a one-time passcode to access the UI.
  12. After you sign in, choose Generate data to generate wind speed data.
  13. Choose the Refresh icon to show the data in the graph.

You can generate a variety of data by changing the range of minimum and maximum speeds and the number of values.


To see the data in DynamoDB, choose the DynamoDB icon, note the table name that starts with windspeed-, and navigate to the table in the DynamoDB console.


Now that the wind speed data simulator is ready, let’s deploy the rest of the data pipeline.

Deploying the automated data pipeline by using AWS CloudFormation

You use AWS CloudFormation templates to create all the necessary resources for the data pipeline. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time. You can view the template and code in the GitHub repository.

  1. Choose Launch with CloudFormation Console:
  2. Choose the US West (Oregon) Region (us-west-2).
  3. For pEmail, enter a valid email to which the analytics pipeline can send notifications.
  4. Choose Next.


  5. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
  6. Choose Create stack.

This CloudFormation template creates the following resources in your AWS account:

  • An IAM role to provide a trust relationship between Kinesis and DynamoDB to replicate data from DynamoDB to the data stream
  • Two data streams:
    • An input stream to replicate data from DynamoDB
    • An output stream to store aggregated data from the Data Analytics for Flink app
  • A Lambda function
  • An SNS topic to send email notifications about high wind speeds
  1. When the stack is ready, on the Outputs tab, note the values of both data streams.

Check your email and confirm your subscription to receive notifications. Make sure to check your junk folder if you don’t see the email in your inbox.

Now you can use Kinesis Data Streams for DynamoDB, which allows you to have your data in both DynamoDB and Kinesis without having to use Lambda or write custom code.

Enabling Kinesis streaming for DynamoDB

AWS recently launched Kinesis Data Streams for DynamoDB so that you can send item-level change data from DynamoDB to a Kinesis data stream. You can use the AWS Command Line Interface (AWS CLI), the AWS SDKs, or the AWS Management Console to enable this feature.
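
For example, here is a minimal sketch (Node.js, aws-sdk v2) of enabling the destination programmatically with the EnableKinesisStreamingDestination operation; the table name and stream ARN are placeholders, and it assumes an SDK version recent enough to include this API.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-west-2' })
const dynamodb = new AWS.DynamoDB()

const enableStreaming = async () => {
  const response = await dynamodb.enableKinesisStreamingDestination({
    TableName: 'windspeed-REPLACE_ME',   // placeholder table name
    StreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/kds-ddb-blog-InputKinesisStream'   // placeholder ARN
  }).promise()
  console.log('Destination status: ', response.DestinationStatus)
}

enableStreaming()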

To enable this feature from the console, complete the following steps:

  1. In the DynamoDB console, choose the table that you created earlier (it begins with the prefix windspeed-).
  2. On the Overview tab, choose Manage streaming to Kinesis.

  1. Choose your input stream.

  1. Choose Enable.

  1. Choose Close.

Make sure that Stream enabled is set to Yes.

Building the Data Analytics for Flink app for real-time data queries

As part of the CloudFormation stack, the new Data Analytics for Flink application is deployed in the configured AWS Region. When the stack is up and running, you can find the application in the Kinesis Data Analytics console. Choose Run to start the app.

When your app is running, you should see the following application graph.

Review the Properties section of the app, which shows you the input and output streams that the app is using.

In the next section, we walk through the important code snippets of the Flink Java application, which explain how the application reads data from a data stream, aggregates it, and outputs it to another data stream.

Diving deep into the Flink Java application code

In the following code, createSourceFromStaticConfig provides all the wind turbine speed readings from the input stream in string format, which we pass to the WindTurbineInputMap map function. This function parses the string into the Tuple3 data type (for example, Tuple3<>(turbineID, speed, 1)). All Tuple3 messages are grouped by turbineID to further apply a one-minute tumbling window. The AverageReducer reduce function provides two things: the sum of all the speeds for the specific turbineId in the one-minute window, and a count of the messages for the specific turbineId in the one-minute window. The AverageMap map function takes the output of the AverageReducer reduce function and transforms it into Tuple2 (for example, Tuple2<>(turbineId, averageSpeed)). Then only the turbineIds with an average speed greater than 60 are kept and mapped to a JSON-formatted message, which we send to the output stream by using the createSinkFromStaticConfig sink function.

final StreamExecutionEnvironment env =
   StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> input = createSourceFromStaticConfig(env);

input.map(new WindTurbineInputMap())
   .filter(v -> v.f2 > 0)
   .keyBy(0)
   .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
   .reduce(new AverageReducer())
   .map(new AverageMap())
   .filter(v -> v.f1 > 60)
   .map(v -> "{ \"turbineID\": \"" + v.f0 + "\", \"avgSpeed\": "+ v.f1 +" }")
   .addSink(createSinkFromStaticConfig());

env.execute("Wind Turbine Data Aggregator");

The following code demonstrates how the createSourceFromStaticConfig and createSinkFromStaticConfig functions read the input and output stream names from the properties of the Data Analytics for Flink application and establish the source and sink of the streams.

private static DataStream<String> createSourceFromStaticConfig(
   StreamExecutionEnvironment env) throws IOException {
   Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
   Properties inputProperties = new Properties();
   inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));
   inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

   return env.addSource(new FlinkKinesisConsumer<>((String) applicationProperties.get("WindTurbineEnvironment").get("inputStreamName"),
      new SimpleStringSchema(), inputProperties));
}

private static FlinkKinesisProducer<String> createSinkFromStaticConfig() throws IOException {
   Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
   Properties outputProperties = new Properties();
   outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));

   FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new
      SimpleStringSchema(), outputProperties);
   sink.setDefaultStream((String) applicationProperties.get("WindTurbineEnvironment").get("outputStreamName"));
   sink.setDefaultPartition("0");
   return sink;
}

In the following code, the WindTurbineInputMap map function parses Tuple3 out of the string message. Additionally, the AverageMap map and AverageReducer reduce functions process messages to accumulate and transform data.

public static class WindTurbineInputMap implements MapFunction<String, Tuple3<String, Integer, Integer>> {
   @Override
   public Tuple3<String, Integer, Integer> map(String value) throws Exception {
      String eventName = JsonPath.read(value, "$.eventName");
      if(eventName.equals("REMOVE")) {
         return new Tuple3<>("", 0, 0);
      }
      String turbineID = JsonPath.read(value, "$.dynamodb.NewImage.deviceID.S");
      Integer speed = Integer.parseInt(JsonPath.read(value, "$.dynamodb.NewImage.value.N"));
      return new Tuple3<>(turbineID, speed, 1);
   }
}

public static class AverageMap implements MapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
   @Override
   public Tuple2<String, Integer> map(Tuple3<String, Integer, Integer> value) throws Exception {
      return new Tuple2<>(value.f0, (value.f1 / value.f2));
   }
}

public static class AverageReducer implements ReduceFunction<Tuple3<String, Integer, Integer>> {
   @Override
   public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> value1, Tuple3<String, Integer, Integer> value2) {
      return new Tuple3<>(value1.f0, value1.f1 + value2.f1, value1.f2 + 1);
   }
}

Receiving email notifications of high wind speed

The following screenshot shows an example of the notification email you will receive about high wind speeds.

To test the feature, in this section you generate high wind speed data from the simulator, which is stored in DynamoDB, and get an email notification when the average wind speed is greater than 60 mph for a one-minute period. You’ll observe wind data flowing through the data stream and Data Analytics for Flink.

To test this feature:

  1. Generate wind speed data in the simulator and confirm that it’s stored in DynamoDB.
  2. In the Kinesis Data Streams console, choose the input data stream, kds-ddb-blog-InputKinesisStream.
  3. On the Monitoring tab of the stream, you can observe the Get records – sum (Count) metrics, which show multiple records captured by the data stream automatically.
  4. In the Kinesis Data Analytics console, choose the Data Analytics for Flink application, kds-ddb-blog-windTurbineAggregator.
  5. On the Monitoring tab, you can see the Last Checkpoint metrics, which show multiple records captured by the Data Analytics for Flink app automatically.
  6. In the Kinesis Data Streams console, choose the output stream, kds-ddb-blog-OutputKinesisStream.
  7. On the Monitoring tab, you can see the Get records – sum (Count) metrics, which show multiple records output by the app.
  8. Finally, check your email for a notification.

If you don’t see a notification, change the data simulator value range between a minimum of 50 mph and maximum of 90 mph and wait a few minutes.

Conclusion

As you have learned in this post, you can build an end-to-end serverless analytics pipeline to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. This allows your team to focus on solving business problems by getting useful insights immediately. IoT and application development have a variety of use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

If this blog post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


About the Authors

Saurabh Shrivastava is a solutions architect leader and analytics/machine learning specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

 

 

Sameer Goel is a solutions architect in Seattle who drives customers’ success by building prototypes on cutting-edge initiatives. Prior to joining AWS, Sameer graduated with a Master’s degree with a Data Science concentration from NEU Boston. He enjoys building and experimenting with creative projects and applications.

 

 

Pratik Patel is a senior technical account manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions by using best practices, and proactively helps keep customers’ AWS environments operationally healthy.

ICYMI: Serverless Q4 2020

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2020/

Welcome to the 12th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

ICYMI Q4 calendar

In case you missed our last ICYMI, check out what happened last quarter here.

AWS re:Invent

re:Invent 2020 banner

re:Invent was entirely virtual in 2020 and free to all attendees. The conference had a record number of registrants and featured over 700 sessions. The serverless developer advocacy team presented a number of talks to help developers build their skills. These are now available on-demand:

AWS Lambda

There were three major Lambda announcements at re:Invent. Lambda duration billing changed granularity from 100 ms to 1 ms, which is shown in the December billing statement. All functions benefit from this change automatically, and it’s especially beneficial for sub-100ms Lambda functions.

Lambda has also increased the maximum memory available to 10 GB. Since memory also controls CPU allocation in Lambda, this means that functions now have up to 6 vCPU cores available for processing. Finally, Lambda now supports container images as a packaging format, enabling teams to use familiar container tooling, such as Docker CLI. Container images are stored in Amazon ECR.

There were three feature releases that make it easier for developers working on data processing workloads. Lambda now supports self-hosted Kafka as an event source, allowing you to source events from on-premises or instance-based Kafka clusters. You can also process streaming analytics with tumbling windows and use custom checkpoints for processing batches with failed messages.

We launched Lambda Extensions in preview, enabling you to more easily integrate monitoring, security, and governance tools into Lambda functions. You can also build your own extensions that run code during Lambda lifecycle events. See this example extensions repo for starting development.

You can now send logs from Lambda functions to custom destinations by using Lambda Extensions and the new Lambda Logs API. Previously, you could only forward logs after they were written to Amazon CloudWatch Logs. Now, logging tools can receive log streams directly from the Lambda execution environment. This makes it easier to use your preferred tools for log management and analysis, including Datadog, Lumigo, New Relic, Coralogix, Honeycomb, or Sumo Logic.

Lambda Logs API architecture

Lambda launched support for Amazon MQ as an event source. Amazon MQ is a managed broker service for Apache ActiveMQ that simplifies deploying and scaling queues. The event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service manages an internal poller to invoke the target Lambda function.

Lambda announced support for AWS PrivateLink. This allows you to invoke Lambda functions from a VPC without traversing the public internet. It provides private connectivity between your VPCs and AWS services. By using VPC endpoints to access the Lambda API from your VPC, this can replace the need for an Internet Gateway or NAT Gateway.

For developers building machine learning inferencing, media processing, high performance computing (HPC), scientific simulations, and financial modeling in Lambda, you can now use AVX2 support to help reduce duration and lower cost. In this blog post’s example, enabling AVX2 for an image-processing function increased performance by 32-43%.

Lambda now supports batch windows of up to 5 minutes when using SQS as an event source. This is useful for workloads that are not time-sensitive, allowing developers to reduce the number of Lambda invocations from queues. Additionally, the batch size has been increased from 10 to 10,000. This is now the same batch size as Kinesis as an event source, helping Lambda-based applications process more data per invocation.
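
As a rough illustration, the following sketch (Node.js, aws-sdk v2) creates an SQS event source mapping with the larger batch size and a 5-minute batch window; the queue ARN and function name are placeholders.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const lambda = new AWS.Lambda()

const configureSqsSource = async () => {
  const mapping = await lambda.createEventSourceMapping({
    EventSourceArn: 'arn:aws:sqs:us-east-1:123456789012:my-queue',   // placeholder queue
    FunctionName: 'my-processing-function',                          // placeholder function
    BatchSize: 10000,                       // larger batches per invocation
    MaximumBatchingWindowInSeconds: 300     // wait up to 5 minutes to fill a batch
  }).promise()
  console.log('Mapping UUID: ', mapping.UUID)
}

configureSqsSource()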

Code signing is now available for Lambda, using AWS Signer. This allows account administrators to ensure that Lambda functions only accept signed code for deployment. You can learn more about using this new feature in the developer documentation.

AWS Step Functions

Synchronous Express Workflows have been launched for AWS Step Functions, providing a new way to run high-throughput Express Workflows. This feature allows developers to receive workflow responses without needing to poll services or build custom solutions. This is useful for high-volume microservice orchestration and fast compute tasks communicating via HTTPS.

The Step Functions service recently added support for other AWS services in workflows. You can now integrate API Gateway REST and HTTP APIs. This enables you to call API Gateway directly from a state machine as an asynchronous service integration.

Step Functions now also supports Amazon EKS service integration. This allows you to build workflows with steps that synchronously launch tasks in EKS and wait for a response. The service also announced support for Amazon Athena, so workflows can now query data in your S3 data lakes.

Amazon API Gateway

API Gateway now supports mutual TLS authentication, which is commonly used for business-to-business applications and standards such as Open Banking. This is provided at no additional cost. You can now also disable the default REST API endpoint when deploying APIs using custom domain names.

HTTP APIs now supports service integrations with Step Functions Synchronous Express Workflows. This is a result of the service team’s work to add the most popular features of REST APIs to HTTP APIs.

AWS X-Ray

X-Ray now integrates with Amazon S3 to trace upstream requests. If a Lambda function uses the X-Ray SDK, S3 sends tracing headers to downstream event subscribers. This allows you to use the X-Ray service map to view connections between S3 and other services used to process an application request.

X-Ray announced support for end-to-end tracing in Step Functions to make it easier to trace requests across multiple AWS services. It also launched X-Ray Insights in preview, which generates actionable insights based on anomalies detected in an application. For Java developers, the services released an auto-instrumentation agent, for collecting instrumentation without modifying existing code.

Additionally, the AWS Distro for Open Telemetry is now in preview. OpenTelemetry is a collaborative effort by tracing solution providers to create common approaches to instrumentation.

Amazon EventBridge

You can now use event replay to archive and replay events with Amazon EventBridge. After configuring an archive, EventBridge automatically stores all events or filtered events, based upon event pattern matching logic. Event replay can help with testing new features or changes in your code, or hydrating development or test environments.

EventBridge archive and replay

EventBridge also launched resource policies that simplify managing access to events across multiple AWS accounts. Resource policies provide a powerful mechanism for modeling event buses across multiple accounts and providing fine-grained access control to EventBridge API actions.

EventBridge resource policies

EventBridge announced support for Server-Side Encryption (SSE). Events are encrypted using AES-256 at no additional cost for customers. EventBridge also increased PutEvent quotas to 10,000 transactions per second in US East (N. Virginia), US West (Oregon), and Europe (Ireland). This helps support workloads with high throughput.

Developer tools

The AWS SDK for JavaScript v3 was launched and includes first-class TypeScript support and a modular architecture. This makes it easier to import only the services needed to minimize deployment package sizes.
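
For example, a minimal sketch of the modular imports: only the DynamoDB client package is bundled rather than the entire SDK.

const { DynamoDBClient, ListTablesCommand } = require('@aws-sdk/client-dynamodb')

const client = new DynamoDBClient({ region: 'us-east-1' })

const listTables = async () => {
  // The v3 SDK uses a command pattern instead of per-operation methods
  const response = await client.send(new ListTablesCommand({}))
  console.log('Tables: ', response.TableNames)
}

listTables()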

The AWS Serverless Application Model (AWS SAM) is an AWS CloudFormation extension that makes it easier to build, manage, and maintain serverless applications. The latest versions include support for cached and parallel builds, together with container image support for Lambda functions.

You can use AWS SAM in the new AWS CloudShell, which provides a browser-based shell in the AWS Management Console. This can help run a subset of AWS SAM CLI commands as an alternative to using a dedicated instance or AWS Cloud9 terminal.

AWS CloudShell

Amazon SNS

Amazon SNS announced support for First-In-First-Out (FIFO) topics. These are used with SQS FIFO queues for applications that require strict message ordering with exactly once processing and message deduplication.
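
As a rough sketch (Node.js, aws-sdk v2), publishing to a FIFO topic adds a message group ID for ordering and a deduplication ID; the topic ARN is a placeholder.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const sns = new AWS.SNS()

const publishOrderEvent = async () => {
  await sns.publish({
    TopicArn: 'arn:aws:sns:us-east-1:123456789012:orders.fifo',   // placeholder FIFO topic
    Message: JSON.stringify({ orderId: '1234', status: 'CREATED' }),
    MessageGroupId: 'order-1234',                  // messages in a group are delivered in order
    MessageDeduplicationId: 'order-1234-created'   // not needed if content-based deduplication is enabled
  }).promise()
}

publishOrderEvent()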

Amazon DynamoDB

Developers can now use PartiQL, an SQL-compatible query language, with DynamoDB tables, bringing familiar SQL syntax to NoSQL data. You can also choose to use Kinesis Data Streams to capture changes to tables.
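
For example, a hedged sketch (Node.js, aws-sdk v2) of a PartiQL statement against a DynamoDB table; the table and attribute names are placeholders, and it assumes an SDK version that includes the ExecuteStatement operation.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const dynamodb = new AWS.DynamoDB()

const queryWithPartiQL = async () => {
  const result = await dynamodb.executeStatement({
    Statement: 'SELECT * FROM "orders" WHERE "customerId" = ?',   // placeholder table and attribute
    Parameters: [{ S: 'customer-1234' }]
  }).promise()
  console.log('Items: ', JSON.stringify(result.Items, null, 2))
}

queryWithPartiQL()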

For customers using DynamoDB global tables, you can now use your own encryption keys. While all data in DynamoDB is encrypted by default, this feature enables you to use customer managed keys (CMKs). DynamoDB also announced the ability to export table data to data lakes in Amazon S3. This enables you to use services like Amazon Athena and AWS Lake Formation to analyze DynamoDB data with no custom code required.

AWS Amplify and AWS AppSync

You can now use existing Amazon Cognito user pools and identity pools for Amplify projects, making it easier to build new applications for an existing user base. With the new AWS Amplify Admin UI, you can configure application backends without using the AWS Management Console.

AWS AppSync enabled AWS WAF integration, making it easier to protect GraphQL APIs against common web exploits. You can also implement rate-based rules to help slow down brute force attacks. Using AWS Managed Rules for AWS WAF provides a faster way to configure application protection without creating the rules directly.

Serverless Posts

October

November

December

Tech Talks & Events

We hold AWS Online Tech Talks covering serverless topics throughout the year. These are listed in the Serverless section of the AWS Online Tech Talks page. We also regularly deliver talks at conferences and events around the world, speak on podcasts, and record videos you can find to learn in bite-sized chunks.

Here are some from Q4:

Videos

October:

November:

December:

There are also other helpful videos covering Serverless available on the Serverless Land YouTube channel.

The Serverless Land website

Serverless Land website

To help developers find serverless learning resources, we have curated a list of serverless blogs, videos, events, and training programs at a new site, Serverless Land. This is regularly updated with new information – you can subscribe to the RSS feed for automatic updates or follow the LinkedIn page.

Still looking for more?

The Serverless landing page has lots of information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow all of us on Twitter to see latest news, follow conversations, and interact with the team.

Optimizing batch processing with custom checkpoints in AWS Lambda

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/

AWS Lambda can process batches of messages from sources like Amazon Kinesis Data Streams or Amazon DynamoDB Streams. In normal operation, the processing function moves from one batch to the next to consume messages from the stream.

However, when an error occurs in one of the items in the batch, this can result in reprocessing some of the same messages in that batch. With the new custom checkpoint feature, there is now much greater control over how you choose to process batches containing failed messages.

This blog post explains the default behavior of batch failures and options available to developers to handle this error state. I also cover how to use this new checkpoint capability and show the benefits of using this feature in your stream processing functions.

Overview

When using a Lambda function to consume messages from a stream, the batch size property controls the maximum number of messages passed in each event.

The stream manages two internal pointers: a checkpoint and a current iterator. The checkpoint is the last known item position that was successfully processed. The current iterator is the position in the stream for the next read operation. In a successful operation, here are two batches processed from a stream with a batch size of 10:

Checkpoints and current iterators

  1. The first batch delivered to the Lambda function contains items 1–10. The function processes these items without error.
  2. The checkpoint moves to item 11. The next batch delivered to the Lambda function contains items 11–20.

In default operation, the processing of the entire batch must succeed or fail. If a single item fails processing and the function returns an error, the batch fails. The entire batch is then retried until the maximum retries is reached. This can result in the same failure occurring multiple times and unnecessary processing of individual messages.

You can also enable the BisectBatchOnFunctionError property in the event source mapping. If there is a batch failure, the calling service splits the failed batch into two and retries the half-batches separately. The process continues recursively until there is a single item in a batch or messages are processed successfully. For example, in a batch of 10 messages, where item number 5 is failing, the processing occurs as follows:

Bisect batch on error processing

  1. Batch 1 fails. It’s split into batches 2 and 3.
  2. Batch 2 fails, and batch 3 succeeds. Batch 2 is split into batches 4 and 5.
  3. Batch 4 fails and batch 5 succeeds. Batch 4 is split into batches 6 and 7.
  4. Batch 6 fails and batch 7 succeeds.

While this provides a way to process messages in a batch with one failing message, it results in multiple invocations of the function. In this example, message number 4 is processed four times before succeeding.

With the new custom checkpoint feature, you can return the sequence identifier for the failed messages. This provides more precise control over how to choose to continue processing the stream. For example, in a batch of 10 messages where the sixth message fails:

Custom checkpoint behavior

  1. Lambda processes the batch of messages, items 1–10. The sixth message fails and the function returns the failed sequence identifier.
  2. The checkpoint in the stream is moved to the position of the failed message. The batch is retried for only messages 6–10.

Existing stream processing behaviors

In the following examples, I use a DynamoDB table with a Lambda function that is invoked by the stream for the table. You can also use a Kinesis data stream if preferred, as the behavior is the same. The event source mapping is set to a batch size of 10 items so all the stream messages are passed in the event to a single Lambda invocation.

Architecture diagram
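
If you want to create a similar event source mapping programmatically rather than in the console, here is a hedged sketch (Node.js, aws-sdk v2); the stream ARN and function name are placeholders.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const lambda = new AWS.Lambda()

const createMapping = async () => {
  const mapping = await lambda.createEventSourceMapping({
    EventSourceArn: 'arn:aws:dynamodb:us-east-1:123456789012:table/ddbTableName/stream/2020-12-01T00:00:00.000',   // placeholder stream ARN
    FunctionName: 'stream-processing-function',   // placeholder function
    StartingPosition: 'TRIM_HORIZON',
    BatchSize: 10                                 // pass up to 10 stream records per invocation
  }).promise()
  console.log('Mapping UUID: ', mapping.UUID)
}

createMapping()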

I use the following Node.js script to generate batches of 10 items in the table.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const docClient = new AWS.DynamoDB.DocumentClient()

const ddbTable = 'ddbTableName'
const BATCH_SIZE = 10

const createRecords = async () => {
  // Create envelope
  const params = {
    RequestItems: {}
  }
  params.RequestItems[ddbTable] = []

  // Add items to batch and write to DDB
  for (let i = 0; i < BATCH_SIZE; i++) {
    params.RequestItems[ddbTable].push({
      PutRequest: {
        Item: {
          ID: Date.now() + i
        }
      }
    })
  }
  await docClient.batchWrite(params).promise()
}

const main = async() => await createRecords()
main()

After running this script, there are 10 items in the DynamoDB table, which are then put into the DynamoDB stream for processing.

10 items in DynamoDB table

The processing Lambda function uses the following code. This contains a constant called FAILED_MESSAGE_NUM to force an error on the message with the corresponding index in the event batch:

exports.handler = async (event) => {
  console.log(JSON.stringify(event, null, 2))
  console.log('Records: ', event.Records.length)
  const FAILED_MESSAGE_NUM = 6
  
  let recordNum = 1
  let batchItemFailures = []

  event.Records.map((record) => {
    const sequenceNumber = record.dynamodb.SequenceNumber
    
    if ( recordNum === FAILED_MESSAGE_NUM ) {
      console.log('Error! ', sequenceNumber)
      throw new Error('kaboom')
    }
    console.log('Success: ', sequenceNumber)
    recordNum++
  })
}

The code uses the DynamoDB item’s sequence number, which is provided in each record of the stream event:

Item sequence number in event

In the default configuration of the event source mapping, the failure of message 6 causes the whole batch to fail. The entire batch is then retried multiple times. This appears in the CloudWatch Logs for the function:

Logs with retried batches

Next, I enable the bisect-on-error feature in the function’s event trigger. The first invocation fails as before but this causes two subsequent invocations with batches of five messages. The original batch is bisected. These batches complete processing successfully.

Logs with bisected batches

Configuring a custom checkpoint

Finally, I enable the custom checkpoint feature. This is configured in the Lambda function console by selecting the “Report batch item failures” check box in the DynamoDB trigger:

Add trigger settings
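
The same setting can also be applied to an existing event source mapping programmatically; a hedged sketch (Node.js, aws-sdk v2) follows, where the mapping UUID is a placeholder.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const lambda = new AWS.Lambda()

const enableBatchItemFailures = async () => {
  await lambda.updateEventSourceMapping({
    UUID: '00000000-0000-0000-0000-000000000000',         // placeholder mapping ID
    FunctionResponseTypes: ['ReportBatchItemFailures']    // report failed items instead of failing the whole batch
  }).promise()
}

enableBatchItemFailures()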

I update the processing Lambda function with the following code:

exports.handler = async (event) => {
  console.log(JSON.stringify(event, null, 2))
  console.log('Records: ', event.Records.length)
  const FAILED_MESSAGE_NUM = 4
  
  let recordNum = 1
  let sequenceNumber = 0
    
  try {
    event.Records.map((record) => {
      sequenceNumber = record.dynamodb.SequenceNumber
  
      if ( recordNum === FAILED_MESSAGE_NUM ) {
        throw new Error('kaboom')
      }
      console.log('Success: ', sequenceNumber)
      recordNum++
    })
  } catch (err) {
    // Return failed sequence number to the caller
    console.log('Failure: ', sequenceNumber)
    return { "batchItemFailures": [ {"itemIdentifier": sequenceNumber} ]  }
  }
}

In this version of the code, the processing of each message is wrapped in a try…catch block. When processing fails, the function stops processing any remaining messages. It returns the sequence number of the failed message in a JSON object:

{ 
  "batchItemFailures": [ 
    {
      "itemIdentifier": sequenceNumber
    }
  ]
}

The calling service then updates the checkpoint value with the sequence number provided. If the batchItemFailures array is empty, the caller assumes all messages have been processed correctly. If the batchItemFailures array contains multiple items, the lowest sequence number is used as the checkpoint.

In this example, I also modify the FAILED_MESSAGE_NUM constant to 4 in the Lambda function. This causes the fourth message in every batch to throw an error. After adding 10 items to the DynamoDB table, the CloudWatch log for the processing function shows:

Lambda function logs

This is how the stream of 10 messages has been processed using the custom checkpoint:

Custom checkpointing walkthrough

  1. In the first invocation, all 10 messages are in the batch. The fourth message throws an error. The function returns this position as the checkpoint.
  2. In the second invocation, messages 4–10 are in the batch. Message 7 throws an error and its sequence number is returned as the checkpoint.
  3. In the third invocation, the batch contains messages 7–10. Message 10 throws an error and its sequence number is now the returned checkpoint.
  4. The final invocation contains only message 10, which is successfully processed.

Using this approach, subsequent invocations do not receive messages that have been successfully processed previously.

Conclusion

The default behavior for stream processing in Lambda functions enables entire batches of messages to succeed or fail. You can also use batch bisecting functionality to retry batches iteratively if a single message fails. Now with custom checkpoints, you have more control over handling failed messages.

This post explains the three different processing modes and shows example code for handling failed messages. Depending upon your use-case, you can choose the appropriate mode for your workload. This can help reduce unnecessary Lambda invocations and prevent reprocessing of the same messages in batches containing failures.

To learn more about how to use this feature, read the developer documentation. To learn more about building with serverless technology, visit Serverless Land.

Using AWS Lambda for streaming analytics

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-aws-lambda-for-streaming-analytics/

AWS Lambda now supports streaming analytics calculations for Amazon Kinesis and Amazon DynamoDB. This allows developers to calculate aggregates in near-real time and pass state across multiple Lambda invocations. This feature provides an alternative way to build analytics in addition to services like Amazon Kinesis Data Analytics.

In this blog post, I explain how this feature works with Kinesis Data Streams and DynamoDB Streams, together with example use-cases.

Overview

For workloads using streaming data, data arrives continuously, often from different sources, and is processed incrementally. Discrete data processing tasks, such as operating on files, have a known beginning and end boundary for the data. For applications with streaming data, the processing function does not know when the data stream starts or ends. Consequently, this type of data is commonly processed in batches or windows.

Before this feature, Lambda-based stream processing was limited to working on the incoming batch of data. For example, in Amazon Kinesis Data Firehose, a Lambda function transforms the current batch of records with no information or state from previous batches. This is also the same for processing DynamoDB streams using Lambda functions. This existing approach works well for MapReduce or tasks focused exclusively on the data in the current batch.

Comparing DynamoDB and Kinesis streams

  1. DynamoDB streams invoke a processing Lambda function asynchronously. After processing, the function may then store the results in a downstream service, such as Amazon S3.
  2. Kinesis Data Firehose invokes a transformation Lambda function synchronously, which returns the transformed data back to the service.

This new feature introduces the concept of a tumbling window, which is a fixed-size, non-overlapping time interval of up to 15 minutes. To use this, you specify a tumbling window duration in the event-source mapping between the stream and the Lambda function. When you apply a tumbling window to a stream, items in the stream are grouped by window and sent to the processing Lambda function. The function returns a state value that is passed to the next tumbling window.

You can use this to calculate aggregates over multiple windows. For example, you can calculate the total value of a data item in a stream using 30-second tumbling windows:

Tumbling windows

  1. Integer data arrives in the stream at irregular time intervals.
  2. The first tumbling window consists of data in the 0–30 second range, passed to the Lambda function. It adds the items and returns the total of 6 as a state value.
  3. The second tumbling window invokes the Lambda function with the state value of 6 and the 30–60 second batch of stream data. This adds the items to the existing total, returning 18.
  4. The third tumbling window invokes the Lambda function with a state value of 18 and the next window of values. The running total is now 28 and returned as the state value.
  5. The fourth tumbling window invokes the Lambda function with a state value of 28 and the 90–120 second batch of data. The final total is 32.

This feature is useful in workloads where you need to calculate aggregates continuously. For example, for a retailer streaming order information from point-of-sale systems, it can generate near-live sales data for downstream reporting. Using Lambda to generate aggregates only requires minimal code, and the function can access other AWS services as needed.

Using tumbling windows with Lambda functions

When you configure an event source mapping between Kinesis or DynamoDB and a Lambda function, use the new setting, Tumbling window duration. This appears in the trigger configuration in the Lambda console:

Trigger configuration
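
The same setting is available programmatically; a hedged sketch (Node.js, aws-sdk v2) of creating a mapping with a 30-second tumbling window follows, where the stream ARN and function name are placeholders.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const lambda = new AWS.Lambda()

const createWindowedMapping = async () => {
  await lambda.createEventSourceMapping({
    EventSourceArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/testStream',   // placeholder stream ARN
    FunctionName: 'aggregation-function',                                         // placeholder function
    StartingPosition: 'TRIM_HORIZON',
    BatchSize: 100,
    TumblingWindowInSeconds: 30   // group records into 30-second windows and pass state between invocations
  }).promise()
}

createWindowedMapping()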

You can also set this value in AWS CloudFormation and AWS SAM templates. After the event source mapping is created, events delivered to the Lambda function have several new attributes:

New attributes in events

These include:

  • Window start and end: the beginning and ending timestamps for the current tumbling window.
  • State: an object containing the state returned from the previous window, which is initially empty. The state object can contain up to 1 MB of data.
  • isFinalInvokeForWindow: indicates if this is the last invocation for the tumbling window. This only occurs once per window period.
  • isWindowTerminatedEarly: a window ends early only if the state exceeds the maximum allowed size of 1 MB.

In any tumbling window, there is a series of Lambda invocations following this pattern:

Tumbling window process in Lambda

  1. The first invocation contains an empty state object in the event. The function returns a state object containing custom attributes that are specific to the custom logic in the aggregation.
  2. The second invocation contains the state object provided by the first Lambda invocation. This function returns an updated state object with new aggregated values. Subsequent invocations follow this same sequence.
  3. The final invocation in the tumbling window has the isFinalInvokeForWindow flag set to true. This contains the state returned by the most recent Lambda invocation. This invocation is responsible for storing the result in S3 or in another data store, such as a DynamoDB table. There is no state returned in this final invocation.

Using tumbling windows with DynamoDB

DynamoDB streams can invoke a Lambda function using tumbling windows, enabling you to generate aggregates per shard. In this example, an ecommerce workload saves orders in a DynamoDB table and uses a tumbling window to calculate the near-real time sales total.

First, I create a DynamoDB table to capture the order data and a second DynamoDB table to store the aggregate calculation. I create a Lambda function with a trigger from the first orders table. The event source mapping is created with a Tumbling window duration of 30 seconds:

DynamoDB trigger configuration

I use the following code in the Lambda function:

const AWS = require('aws-sdk')
AWS.config.update({ region: process.env.AWS_REGION })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindowsAggregation'

function isEmpty(obj) { return Object.keys(obj).length === 0 }

exports.handler = async (event) => {
    // Save aggregation result in the final invocation
    if (event.isFinalInvokeForWindow) {
        console.log('Final: ', event)
        
        const params = {
          TableName,
          Item: {
            windowEnd: event.window.end,
            windowStart: event.window.start,
            sales: event.state.sales,
            shardId: event.shardId
          }
        }
        return await docClient.put(params).promise()
    }
    console.log(event)
    
    // Create the state object on first invocation or use state passed in
    let state = event.state

    if (isEmpty (state)) {
        state = {
            sales: 0
        }
    }
    console.log('Existing: ', state)

    // Process records with custom aggregation logic

    event.Records.map((item) => {
        // Only processing INSERTs
        if (item.eventName != "INSERT") return
        
        // Add sales to total
        let value = parseFloat(item.dynamodb.NewImage.sales.N)
        console.log('Adding: ', value)
        state.sales += value
    })

    // Return the state for the next invocation
    console.log('Returning state: ', state)
    return { state: state }
}

This function code processes the incoming event to aggregate a sales attribute, and return this aggregated result in a state object. In the final invocation, it stores the aggregated value in another DynamoDB table.

I then use this Node.js script to generate random sample order data:

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const docClient = new AWS.DynamoDB.DocumentClient()

const TableName = 'tumblingWindows'
const ITERATIONS = 100
const SLEEP_MS = 100

let totalSales = 0

function sleep(ms) { 
  return new Promise(resolve => setTimeout(resolve, ms));
}

const createSales = async () => {
  for (let i = 0; i < ITERATIONS; i++) {

    let sales = Math.round (parseFloat(100 * Math.random()))
    totalSales += sales
    console.log ({i, sales, totalSales})

    await docClient.put ({
      TableName,
      Item: {
        ID: Date.now().toString(),
        sales,
        timeStamp: new Date().toString()
      }
    }).promise()
    await sleep(SLEEP_MS)
  }
}

const main = async() => {
  await createSales()
  console.log('Total Sales: ', totalSales)
}

main()

Once the script is complete, the console shows the individual order transactions and the total sales:

Script output

After the tumbling window duration is finished, the second DynamoDB table shows the aggregate values calculated and stored by the Lambda function:

Aggregate values in second DynamoDB table

Since aggregation for each shard is independent, the totals are stored by shardId. If I continue to run the test data script, the aggregation function continues to calculate and store more totals per tumbling window period.

Using tumbling windows with Kinesis

Kinesis data streams can also invoke a Lambda function using a tumbling window in a similar way. The biggest difference is that you control how many shards are used in the data stream. Since aggregation occurs per shard, this controls the total number of aggregate results per tumbling window.

Using the same sales example, first I create a Kinesis data stream with one shard. I use the same DynamoDB aggregation table from the previous example, then create a Lambda function with a trigger from the Kinesis data stream. The event source mapping is created with a Tumbling window duration of 30 seconds:

Kinesis trigger configuration

I use the following code in the Lambda function, modified to process the incoming Kinesis data event:

const AWS = require('aws-sdk')
AWS.config.update({ region: process.env.AWS_REGION })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindowsAggregation'

function isEmpty(obj) {
    return Object.keys(obj).length === 0
}

exports.handler = async (event) => {

    // Save aggregation result in the final invocation
    if (event.isFinalInvokeForWindow) {
        console.log('Final: ', event)
        
        const params = {
          TableName,
          Item: {
            windowEnd: event.window.end,
            windowStart: event.window.start,
            sales: event.state.sales,
            shardId: event.shardId
          }
        }
        console.log({ params })
        await docClient.put(params).promise()

    }
    console.log(JSON.stringify(event, null, 2))
    
    // Create the state object on first invocation or use state passed in
    let state = event.state

    if (isEmpty (state)) {
        state = {
            sales: 0
        }
    }
    console.log('Existing: ', state)

    // Process records with custom aggregation logic

    event.Records.map((record) => {
        const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii')
        const item = JSON.parse(payload).Item

        // Add sales to total
        let value = parseFloat(item.sales)
        console.log('Adding: ', value)
        state.sales += value
    })

    // Return the state for the next invocation
    console.log('Returning state: ', state)
    return { state: state }
}

This function code processes the incoming event in the same way as the previous example. I then use this Node.js script to generate random sample order data, modified to put the data on the Kinesis stream:

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const kinesis = new AWS.Kinesis()

const StreamName = 'testStream'
const ITERATIONS = 100
const SLEEP_MS = 10

let totalSales = 0

function sleep(ms) { 
  return new Promise(resolve => setTimeout(resolve, ms));
}

const createSales = async() => {

  for (let i = 0; i < ITERATIONS; i++) {

    let sales = Math.round (parseFloat(100 * Math.random()))
    totalSales += sales
    console.log ({i, sales, totalSales})

    const data = {
      Item: {
        ID: Date.now().toString(),
        sales,
        timeStamp: new Date().toString()
      }
    }

    await kinesis.putRecord({
      Data: Buffer.from(JSON.stringify(data)),
      PartitionKey: 'PK1',
      StreamName
    }).promise()
    await sleep(SLEEP_MS)
  }
}

const main = async() => {
  await createSales()
}

main()

Once the script is complete, the console shows the individual order transactions and the total sales:

Console output

After the tumbling window duration is finished, the second DynamoDB table shows the aggregate values calculated and stored by the Lambda function:

Aggregate values in second DynamoDB table

As there is only one shard in this Kinesis stream, there is only one aggregation value for all the data items in the test.

Conclusion

With tumbling windows, you can calculate aggregate values in near-real time for Kinesis data streams and DynamoDB streams. Unlike existing stream-based invocations, state can be passed forward by Lambda invocations. This makes it easier to calculate sums, averages, and counts on values across multiple batches of data.

In this post, I walk through an example that aggregates sales data stored in Kinesis and DynamoDB. In each case, I create an aggregation function with an event source mapping that uses the new tumbling window duration attribute. I show how state is passed between invocations and how to persist the aggregated value at the end of the tumbling window.

To learn more about how to use this feature, read the developer documentation. To learn more about building with serverless technology, visit Serverless Land.

Detecting sensitive data in DynamoDB with Macie

Post Syndicated from Sheldon Sides original https://aws.amazon.com/blogs/security/detecting-sensitive-data-in-dynamodb-with-macie/

Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to discover and protect your sensitive data in Amazon Web Services (AWS). It gives you the ability to automatically scan for sensitive data and get an inventory of your Amazon Simple Storage Service (Amazon S3) buckets. Macie also gives you the added ability to detect which buckets are public, unencrypted, and accessible from other AWS accounts.

In this post, we’ll walk through how to use Macie to detect sensitive data in Amazon DynamoDB tables by exporting the data to Amazon S3 so that Macie can scan it. You might deploy a solution like this if you have potentially sensitive data stored in DynamoDB tables. When we’re finished, you’ll have a solution that can set up on-demand or scheduled Macie discovery jobs to detect sensitive data exported from DynamoDB to S3.

Architecture

In figure 1, you can see an architectural diagram explaining the flow of the solution that you’ll be deploying.

Figure 1: Solution architecture

Here’s a brief overview of the steps that you’ll take to deploy the solution. Some steps you will do manually, while others will be handled by the provided AWS CloudFormation template. The following outline describes the steps taken to extract the data from DynamoDB and store it in S3, which allows Macie to run a discovery job against the data.

  1. Enable Amazon Macie, if it isn’t already enabled.
  2. Deploy a test DynamoDB dataset.
  3. Create an S3 bucket to export DynamoDB data to.
  4. Configure an AWS Identity and Access Management (IAM) policy and role. (These are used by the Lambda function to access the S3 bucket and DynamoDB tables.)
  5. Deploy an AWS Lambda function to export DynamoDB data to S3.
  6. Set up an Amazon EventBridge rule to schedule export of the DynamoDB data.
  7. Create a Macie discovery job to discover sensitive data from the DynamoDB data export.
  8. View the results of the Macie discovery job.

The goal is that when you finish, you have a solution that you can use to set up either on-demand or scheduled Macie discovery jobs to detect sensitive data that was exported from DynamoDB to S3.

Prerequisite: Enable Macie

If Macie hasn’t been enabled in your account, complete Step 1 in Getting started with Amazon Macie to enable Macie. Once you’ve enabled Macie, you can proceed with the deployment of the CloudFormation template.

Deploy the CloudFormation template

In this section, you start by deploying the CloudFormation template that will deploy all the resources needed for the solution. You can then review the output of the resources that have been deployed.

To deploy the CloudFormation template

  1. Download the CloudFormation template: https://github.com/aws-samples/macie-dynamodb-blog/blob/main/src/cft.yaml
  2. Sign in to the AWS Management Console and navigate to the CloudFormation console.
  3. Choose Upload a template file, and then select the CloudFormation template that you downloaded in the previous step. Choose Next.

    Figure 2 - Uploading the CloudFormation template to be deployed

  4. For Stack Name, name your stack macie-blog, and then choose Next.

    Figure 3: Naming your CloudFormation stack

  5. For Configure stack options, keep the default values and choose Next.
  6. At the bottom of the Review screen, select the I acknowledge that AWS CloudFormation might create IAM resources check box, and then choose Create stack.

    Figure 4: Acknowledging that this CloudFormation template will create IAM roles

You should then see the following screen. It may take several minutes for the CloudFormation template to finish deploying.

Figure 5: CloudFormation stack creation in progress

View CloudFormation output

Once the CloudFormation template has been completely deployed, choose the Outputs tab, and you will see the following screen. Here you’ll find the names and URLs for all the AWS resources that are needed to complete the remainder of the solution.

Figure 6: Completed CloudFormation stack output

For easier reference, open a new browser tab to your AWS Management Console and leave this tab open. This will make it easier to quickly copy and paste the resource URLs as you navigate to different resources during this walkthrough.

Import DynamoDB data

In this section, we walk through importing the test dataset to DynamoDB. You first start by downloading the test CSV datasets, then upload those datasets to S3 and run the Lambda function that imports the data to DynamoDB. Finally, you review the data that was imported into DynamoDB.

Test datasets

Download the following test datasets:

  1. Accounts Info test dataset (accounts.csv): https://github.com/aws-samples/macie-dynamodb-blog/blob/main/datasets/accounts.csv
  2. People test dataset (people.csv): https://github.com/aws-samples/macie-dynamodb-blog/blob/main/datasets/people.csv

Upload data to the S3 import bucket

Now that you’ve downloaded the test datasets, you’ll need to navigate to the data import S3 bucket and upload the data.

To upload the datasets to the S3 import bucket

  1. Navigate to the CloudFormation Outputs tab, where you’ll find the bucket information.

    Figure 7: S3 bucket output values for the CloudFormation stack

  2. Copy the ImportS3BucketURL link and navigate to the URL.
  3. Upload the two test CSV datasets, people.csv and accounts.csv, to your S3 bucket.
  4. After the upload is complete, you should see the two CSV files in the S3 bucket. You’ll use these files as your test DynamoDB data.

    Figure 8: Test S3 datasets in the S3 bucket

View the data import Lambda function

Now that you have your test data staged for loading, you’ll import it into DynamoDB by using a Lambda function that was deployed with the CloudFormation template. To start, navigate to the CloudFormation console and get the URL to the Lambda function that will handle the data import to DynamoDB, as shown in figure 9.

Figure 9: CloudFormation output information for the People DynamoDB table

To run the data import Lambda function

  1. Copy the LambdaImportS3DataToDynamoURL link and navigate to the URL. You will see the Import-Data-To-DynamoDB Lambda function, as shown in figure 10.

    Figure 10: The Lambda function that imports data to DynamoDB

  2. Choose the Test button in the upper right-hand corner. In the dialog box, for Event name, enter Test, and replace the event body with an empty JSON object ({}).
  3. Your screen should now look as shown in figure 11. Choose Create.

    Figure 11: Configuring a test event to manually run the Lambda function

  4. Choose the Test button again in the upper right-hand corner. You should now see the Lambda function running, as shown in figure 12.

    Figure 12: View of the Lambda function running

  5. Once the Lambda function is finished running, you can expand the Details section. You should see a screen similar to the one in figure 13. When you see this screen, the test datasets have successfully been imported into the DynamoDB tables.

    Figure 13: View of the data import Lambda function after it runs successfully

View the DynamoDB test dataset

Now that you have the datasets imported, you can look at the data in the console.

To view the test dataset

  1. Navigate to the two DynamoDB tables. You can do this by getting the URL values from the CloudFormation Outputs tab. Figure 14 shows the URL for the accounts tables.
    Figure 14: Output values for CloudFormation stack DynamoDB account tables

    Figure 15 shows the URL for the people tables.

    Figure 15: Output values for CloudFormation stack DynamoDB people tables

  2. Copy the AccountsDynamoDBTableURL link value and navigate to it in the browser. Then choose the Items tab.
    Figure 16: View of DynamoDB account-info-macie table data

    You should now see a screen showing data similar to the screen in figure 16. This DynamoDB table stores the test account data that you will use to run a Macie discovery job against after the data has been exported to S3.

  3. Navigate to the PeopleDynamoDBTableURL link that is in the CloudFormation output. Then choose the Items tab.

    Figure 17: View of DynamoDB people table data

You should now see a screen showing data similar to the screen in figure 17. This DynamoDB table stores the test people data that you will use to run a Macie discovery job against after the data has been exported to S3.

Export DynamoDB data to S3

In the previous section, you set everything up and staged the data to DynamoDB. In this section, you will export data from DynamoDB to S3.

View the EventBridge rule

The EventBridge rule that was deployed earlier allows you to automatically schedule the export of DynamoDB data to S3. You can set the export interval in minutes, hours, or days. The purpose of the EventBridge rule is to set up an automated data pipeline from DynamoDB to S3. For demonstration purposes, you’ll run the Lambda function that the EventBridge rule uses manually, so that you can see the data exported to S3 without having to wait.

To view the EventBridge rule

  1. Navigate to the CloudFormation Outputs tab for the CloudFormation stack you deployed earlier.

    Figure 18: CloudFormation output information for the EventBridge rule

  2. Navigate to the EventBridgeRule link. You should see the following screen.

    Figure 19: EventBridge rule configuration details page

On this screen, you can see that we’ve set the event schedule to run every hour. The interval can be changed to fit your business needs. We have set it for 1 hour for demonstration purposes only. To make changes to the interval, you can choose the Edit button to make changes and then save the rule.

In the Target(s) section, we’ve configured a Lambda function named Export-DynamoDB-Data-To-S3 to handle the process of exporting data to the S3 bucket the Macie discovery job will run against. We will cover the Lambda function that handles the export of the data from DynamoDB next.
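
If you want to recreate a similar schedule outside of the CloudFormation template, the following hedged sketch (Node.js, aws-sdk v2) creates a rule with a rate expression and points it at the export function. The rule name and function ARN are placeholders, and the function also needs a resource-based permission that allows events.amazonaws.com to invoke it.

const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const eventbridge = new AWS.EventBridge()

const scheduleExport = async () => {
  // Create (or update) a scheduled rule that fires every hour
  await eventbridge.putRule({
    Name: 'export-dynamodb-data-hourly',
    ScheduleExpression: 'rate(1 hour)'   // adjust to minutes, hours, or days as needed
  }).promise()

  // Point the rule at the export Lambda function
  await eventbridge.putTargets({
    Rule: 'export-dynamodb-data-hourly',
    Targets: [{
      Id: 'export-lambda',
      Arn: 'arn:aws:lambda:us-east-1:123456789012:function:Export-DynamoDB-Data-To-S3'   // placeholder ARN
    }]
  }).promise()
}

scheduleExport()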

View the data export Lambda function

In this section, you’ll take a look at the Lambda function that handles the exporting of DynamoDB data to the S3 bucket that Macie will run its discovery job against.

To view the Lambda function

  1. Navigate to the CloudFormation Outputs tab for the CloudFormation stack you deployed earlier.

    Figure 20: CloudFormation output information for the Lambda function that exports DynamoDB data to S3

  2. Copy the link value for LambdaExportDynamoDBDataToS3URL and navigate to the URL in your browser. You should see the Python code that will handle the exporting of data to S3. The code has been commented so that you can easily follow it and refactor it for your needs.
  3. Scroll to the Environment variables section.
    Figure 21: Environment variables used by the Lambda function

    You will see two environment variables:

  • bucket_to_export_to – This environment variable tells the function which S3 bucket to save the DynamoDB data to. This is the bucket that the Macie discovery job will run against.
  • dynamo_db_tables – This environment variable is a comma-delimited list of DynamoDB tables that will be read and have their data exported to S3. If there were another table that you wanted to export data from, you would simply add it to the comma-delimited list and it would be part of the export (a minimal sketch of such an export function follows this list).
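The deployed function’s code is what you saw at the LambdaExportDynamoDBDataToS3URL link. As a rough, hypothetical sketch of how these two variables could drive an export (the scan-and-put approach, the object key format, and the return value below are assumptions, not the stack’s actual code), a Python handler might look like this:

import json
import os
from datetime import datetime, timezone

import boto3

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")


def handler(event, context):
    bucket = os.environ["bucket_to_export_to"]
    table_names = [t.strip() for t in os.environ["dynamo_db_tables"].split(",")]

    for table_name in table_names:
        table = dynamodb.Table(table_name)

        # Scan the whole table, following pagination for result sets over 1 MB.
        items = []
        response = table.scan()
        items.extend(response["Items"])
        while "LastEvaluatedKey" in response:
            response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
            items.extend(response["Items"])

        # Hypothetical object key loosely following the post's naming convention.
        key = "{}-{}-{}-{}.json".format(
            "dynamodb-export",
            table_name,
            os.environ.get("AWS_REGION", "us-east-1"),
            datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S"),
        )
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(items, default=str))

    return {"exported_tables": table_names}

Because this sketch scans each table in full, it suits the small test datasets in this post; for large tables you would typically use the native DynamoDB export-to-S3 feature instead.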

Export DynamoDB data

In this section, you will manually run the Lambda function to export the DynamoDB tables data to S3. As stated previously, you would normally allow the EventBridge rule to handle the automated export of the data to S3. In order to see the export in action, you’re going to manually run the function.

To run the export Lambda function

  1. In the console, scroll back to the top of the screen and choose the Test button.
  2. Name the test dynamoDBExportTest, and for the test data create an empty JSON object “{}” as shown in figure 22.

    Figure 22: Configuring a test event to manually test the data export Lambda function

  3. Choose Create.
  4. Choose the Test button again to run the Lambda function to export the DynamoDB data to S3.

    Figure 23: View of the screen where you run the Lambda function to export data

  5. It could take about one minute to export the data from DynamoDB to S3. Once the Lambda function exports the data, you should see a screen similar to the following one.

    Figure 24: The result after you successfully run the data export Lambda function

View the exported DynamoDB data

Now that the DynamoDB data has been exported for Macie to run discovery jobs against, you can navigate to S3 to verify that the files were exported to the bucket.

To view the data, navigate to the CloudFormation stack Outputs tab. Find the ExportS3BucketURL, shown in figure 25, and navigate to the link.

Figure 25: CloudFormation output information for the S3 buckets that the DynamoDB data was exported to

You should then see two different JSON files for the two DynamoDB tables that data was exported from, as shown in figure 26.

Figure 26: View of S3 objects that were exported to S3

The exported files use the following naming convention:

<Service-name>-<DynamoDB-Table-Name>-<AWS-Region>-<DateAndTime>.json

Next, you’ll create a Macie discovery job to run against the files in this S3 bucket to discover sensitive data.

Create the Macie discovery job

In this section, you’ll create a Macie discovery job and view the results after the job has finished running.

To create the discovery job

  1. In the AWS Management Console, navigate to Macie. In the left-hand menu, choose Jobs.

    Figure 27: Navigation menu to Macie discovery jobs

  2. Choose the Create job button.

    Figure 28: Macie discovery job list screen

  3. Using the Bucket Name filter, search for the S3 bucket that the DynamoDB data was exported to. This can be found in the CloudFormation stack output, as shown in figure 29.

    Figure 29: CloudFormation stack output

  4. Select the value you see for ExportS3BucketName, as shown in figure 30.

    Note: The value you see for your bucket name will be slightly different, based on the random characters added to the end of the bucket name generated by CloudFormation.

    Figure 30: Selecting the S3 bucket to include in the Macie discovery job

  5. Once you’ve found the S3 bucket, select the check box next to it, and then choose Next.
  6. On the Review S3 Buckets screen, if you’re satisfied with the selected buckets, choose Next.

Following are some important options when setting up Macie data discovery jobs.

Scheduling
You have the following scheduling options for the data discovery job:

  • Daily
  • Weekly
  • Monthly

Data Sampling
This allows you to randomly sample a percentage of the data that the Macie discovery job will run against.

Object criteria
This enables you to target objects based on certain metadata values. The values are:

  • Tags – Target objects with certain tags.
  • Last modified – Target objects based on when they were last modified.
  • File extensions – Target objects based on file extensions.
  • Object size – Target objects based on the file size.

You can include or exclude objects based on these object criteria filters.
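You can also create an equivalent job programmatically. The following boto3 sketch mirrors the console choices made in this walkthrough; the account ID, bucket name, and custom data identifier ID are placeholders to take from your own CloudFormation outputs:

import boto3

macie = boto3.client("macie2")

# Placeholder values - substitute your account ID, export bucket name,
# and custom data identifier ID from the CloudFormation outputs.
response = macie.create_classification_job(
    jobType="ONE_TIME",            # or "SCHEDULED" plus a scheduleFrequency
    name="dynamodb-macie-discovery-job",
    description="Discovery job to detect sensitive data exported from DynamoDB",
    samplingPercentage=100,
    customDataIdentifierIds=["<custom-data-identifier-id>"],
    s3JobDefinition={
        "bucketDefinitions": [
            {"accountId": "<account-id>", "buckets": ["<export-bucket-name>"]}
        ]
    },
)
print(response["jobId"])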

Set the discovery job scope

For demonstration purposes, this will be a one-time discovery job.

To set the discovery job scope

  1. On the Scope page that appears after you create the job, set the following options for the job scope:
    1. Select the One-time job option.
    2. Leave Sampling depth set to 100%, and choose Next.

      Figure 31: Selecting the objects that should be in scope for this discovery job

  2. On the Custom data identifiers screen, select account_number, and then choose Next. With the custom identifier, you can create custom business logic to look for certain patterns in files stored in S3 (a sketch of creating such an identifier programmatically follows these steps). In this example, the job generates a finding for any file that contains data with the following format:

    Account Number Format: Starts with “XYZ-” followed by 11 numbers

    The logic to create a custom data identifier can be found in the CloudFormation template.

    Figure 32: Custom data identifiers

  3. Give your discovery job the name dynamodb-macie-discovery-job. For Description, enter Discovery job to detect sensitive data exported from DynamoDB, and choose Next.
    Figure 33: Giving the Macie discovery job a name and description

    You will then see the Review and create screen, as shown in figure 34.

    Figure 34: The Macie discovery job review screen

    Note: Macie must have proper permissions to decrypt objects that are part of the Macie discovery job. The CloudFormation template that you deployed during the initial setup has already deployed an AWS Key Management Service (AWS KMS) key with the proper permissions.

    For this proof of concept you won’t store the results, so you can select the check box next to Override this requirement. If you wanted to store detailed results of the discovery job long term, you would configure a repository for data discovery results. To view detailed steps for setting this up, see Storing and retaining discovery results with Amazon Macie.
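For reference, the account_number identifier that the CloudFormation template creates could be approximated with a boto3 call along these lines; this is an illustrative sketch based on the format described in step 2, and the exact regex and description used by the template may differ:

import boto3

macie = boto3.client("macie2")

# Regex inferred from the format above: "XYZ-" followed by 11 digits.
response = macie.create_custom_data_identifier(
    name="account_number",
    description="Matches account numbers in the form XYZ- followed by 11 digits",
    regex=r"XYZ-\d{11}",
)
print(response["customDataIdentifierId"])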

Submit the discovery job

Next, you can submit the discovery job. On the Review and create screen, choose the Submit button to start the discovery job. You should see a screen similar to the following.

Figure 35: A Macie discovery job run that is in progress

The amount of data that is being scanned dictates how long the job will take to run. You can choose the Refresh button at the top of the screen to see the updated status of the job. This job, based on the size of the test dataset, will take about seven minutes to complete.

Review the job results

Now that the Macie discovery job has run, you can review the results to see what sensitive data was discovered in the data exported from DynamoDB.

You should see the following screen once the job has successfully run.

Figure 36: View of the completed Macie discovery job

On the right, you should see another pane with more information related to the discovery job. The pane should look like the following screen.

Figure 37: Summary showing which S3 bucket the discovery job ran against and start and complete time

Note: If you don’t see this pane, choose the discovery job to have this information displayed.

To review the job results

  1. On the page for the discovery job, in the Show Results list, select Show findings.

    Figure 38: Option to view discovery job findings

  2. The Findings screen appears, as follows.
    Figure 39: Viewing the list of findings generated by the Macie discovery job

    The discovery job that you ran has two different “High Severity” finding types:

    SensitiveData:S3Object/Personal – The object contains personal information, such as full names or identification numbers.

    SensitiveData:S3Object/Multiple – The object contains more than one type of sensitive data.

    Learn more about Macie finding types.

  3. Choose the SensitiveData:S3Object/Personal finding type, and you will see an information pane appear to the right, as shown in figure 40. Some of the key information that you can find here:

    Severity – The severity of the finding: Low, Medium, or High.
    Resource – The S3 bucket where the S3 object exists that caused the finding to be generated.
    Region – The Region where the S3 bucket exists.

    Figure 40: Viewing the severity of the discovery job finding

    Since the finding is based on the detection of personal information in the S3 object, you get the number of times and type of personal data that was discovered, as shown in figure 41.

    Figure 41: Viewing the number of social security numbers that were discovered in the finding

    Here you can see that 10 names were detected in the data that you exported from the DynamoDB table. Occurrences of name shows 10 line ranges, which tells you that the names were found on 10 different lines in the file. If you choose the 10 line ranges link, you are given the starting line and column in the document where the name was discovered.

    The S3 object that triggered the finding is displayed in the Resource affected section, as shown in figure 42.

    Figure 42: The S3 object that generated the Macie finding

Now that you know which S3 object contains the sensitive data, you can investigate further to take appropriate action to protect the data.

View the Macie finding details

In this section, you will walk through how to read and download the objects related to the Macie discovery job.

To download and view the S3 object that contains the finding

  1. In the Overview section of the finding details, select the value for the Resource link. You will then be taken to the object in the S3 bucket.

    Figure 43: Viewing the S3 bucket where the object is located that generated the Macie finding

  2. You can then download the S3 object from the S3 bucket to view its contents and investigate further for sensitive data. Select the check box next to the S3 object, and choose the Download button at the top of the screen. Next, we will look at the SensitiveData:S3Object/Multiple finding type that was generated. This finding type lets us know that an object stored in S3 contains more than one type of potentially sensitive data.
  3. In the left navigation menu, navigate back to the Jobs menu.
  4. Choose the job that you created in the previous steps. In the Show Results list, select Show Findings.
  5. Select the SensitiveData:S3Object/Multiple finding type. An information pane appears to the right. As with the previous finding, you will see the severity, Region, S3 bucket location, and other relevant information about the finding. For this finding, we will focus on the Custom data identifiers and Personal info sections.
    Figure 44: Details about the sensitive data that was discovered by the Macie discovery job

    Here you can see that the discovery job found 10 names on 10 different lines in the file. Also, you can see that 10 account numbers were discovered on 10 different lines in the file, based on the custom identifier that was included as part of the discovery job.

    This finding demonstrates how you can use the built-in Macie identifiers, such as names, and also include custom business logic based on your organization’s needs by using Macie custom data identifiers.

    To view the data and investigate further, follow the same steps as in the previous finding you investigated.

  6. Navigate to the top of the screen and in the Overview section, locate the Resource.

    Figure 45: Viewing the S3 bucket where the object is located that generated the Macie finding

  7. Choose Resource, which will take you to the S3 object to download. You can now view the contents of the file and investigate further.

You’ve now created a Macie discovery job to scan for sensitive data stored in an S3 bucket that originated in DynamoDB. You can automate this solution further by using EventBridge rules to detect Macie findings and take action on the objects that contain sensitive data.
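As a sketch of that automation, you could create such a rule with boto3 as shown below; the rule name and target ARN are placeholders, and you should confirm the exact Macie event pattern in the EventBridge documentation:

import json

import boto3

events = boto3.client("events")

# Assumed rule name and event pattern for Macie findings.
events.put_rule(
    Name="macie-finding-remediation-rule",
    EventPattern=json.dumps(
        {"source": ["aws.macie"], "detail-type": ["Macie Finding"]}
    ),
    State="ENABLED",
)

# Assumed target: a Lambda function that quarantines or re-tags the object.
# (You would also need to grant EventBridge permission to invoke the function.)
events.put_targets(
    Rule="macie-finding-remediation-rule",
    Targets=[{"Id": "remediation-lambda", "Arn": "<remediation-lambda-arn>"}],
)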

Solution cleanup

To clean up the solution that you just deployed, complete the following steps. Note that you need to do this to stop data from being exported from DynamoDB to S3 every hour.

To perform cleanup

  1. Navigate to the S3 buckets used to import and export data. You can find the bucket names in the CloudFormation Outputs tab in the console, as shown in figure 7 and figure 25.
  2. After you’ve navigated to each of the buckets, delete all objects from the bucket.
  3. Navigate to the CloudFormation console, and then delete the CloudFormation stack named macie-blog. After the stack is deleted, the solution will no longer be deployed in your AWS account.

Summary

After deploying the solution, we hope you have a better understanding of how you can use Macie to detect sensitive data from other data sources, such as DynamoDB, as outlined in this post. The following are links to resources that you can use to further expand your knowledge of Amazon Macie capabilities and features.

Additional resources

If you have feedback about this post, submit comments in the Comments section below.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Sheldon Sides

Sheldon is a Senior Solutions Architect, focused on helping customers implement native AWS security services. He enjoys using his experience as a consultant and running a cloud security startup to help customers build secure AWS Cloud solutions. His interests include working out, software development, and learning about the latest technologies.

ICYMI: Serverless pre:Invent 2020

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-preinvent-2020/

During the last few weeks, the AWS serverless team has been releasing a wave of new features in the build-up to AWS re:Invent 2020. This post recaps some of the most important releases for serverless developers.

re:Invent is virtual and free to all attendees in 2020 – register here. See the complete list of serverless sessions planned and join the serverless DA team live on Twitch. Also, follow your DAs on Twitter for live recaps and Q&A during the event.

AWS re:Invent 2020

AWS Lambda

We launched Lambda Extensions in preview, enabling you to more easily integrate monitoring, security, and governance tools into Lambda functions. You can also build your own extensions that run code during Lambda lifecycle events, and there is an example extensions repo for starting development.

You can now send logs from Lambda functions to custom destinations by using Lambda Extensions and the new Lambda Logs API. Previously, you could only forward logs after they were written to Amazon CloudWatch Logs. Now, logging tools can receive log streams directly from the Lambda execution environment. This makes it easier to use your preferred tools for log management and analysis, including Datadog, Lumigo, New Relic, Coralogix, Honeycomb, or Sumo Logic.

Lambda Extensions API

Lambda launched support for Amazon MQ as an event source. Amazon MQ is a managed broker service for Apache ActiveMQ that simplifies deploying and scaling queues. This integration increases the range of messaging services that customers can use to build serverless applications. The event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service manages an internal poller to invoke the target Lambda function.

We also released a new layer to make it simpler to integrate Amazon CodeGuru Profiler. This service helps identify the most expensive lines of code in a function and provides recommendations to help reduce cost. With this update, you can enable the profiler by adding the new layer and setting environment variables. There are no changes needed to the custom code in the Lambda function.

Lambda announced support for AWS PrivateLink. This allows you to invoke Lambda functions from a VPC without traversing the public internet. It provides private connectivity between your VPCs and AWS services. By using VPC endpoints to access the Lambda API from your VPC, this can replace the need for an Internet Gateway or NAT Gateway.

For developers building machine learning inferencing, media processing, high performance computing (HPC), scientific simulations, and financial modeling in Lambda, you can now use AVX2 support to help reduce duration and lower cost. By using packages compiled for AVX2 or compiling libraries with the appropriate flags, your code can then benefit from using AVX2 instructions to accelerate computation. In the blog post’s example, enabling AVX2 for an image-processing function increased performance by 32-43%.

Lambda now supports batch windows of up to 5 minutes when using SQS as an event source. This is useful for workloads that are not time-sensitive, allowing developers to reduce the number of Lambda invocations from queues. Additionally, the batch size has been increased from 10 to 10,000. This is now the same as the batch size for Kinesis as an event source, helping Lambda-based applications process more data per invocation.
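As a brief illustration, both settings are applied on the event source mapping; the queue ARN and function name below are placeholders:

import boto3

lambda_client = boto3.client("lambda")

# Placeholder queue ARN and function name.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:orders-queue",
    FunctionName="process-orders",
    BatchSize=10000,                       # increased limit for SQS
    MaximumBatchingWindowInSeconds=300,    # batch window of up to 5 minutes
)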

Code signing is now available for Lambda, using AWS Signer. This allows account administrators to ensure that Lambda functions only accept signed code for deployment. Using signing profiles for functions, this provides granular control over code execution within the Lambda service. You can learn more about using this new feature in the developer documentation.

Amazon EventBridge

You can now use event replay to archive and replay events with Amazon EventBridge. After configuring an archive, EventBridge automatically stores all events or filtered events, based upon event pattern matching logic. You can configure a retention policy for archives to delete events automatically after a specified number of days. Event replay can help with testing new features or changes in your code, or hydrating development or test environments.

EventBridge archived events
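For example, an archive on the default event bus with a 30-day retention period could be created with boto3 along these lines (the names and ARN are placeholders):

import boto3

events = boto3.client("events")

# Placeholder event bus ARN; omit EventPattern to archive all events.
events.create_archive(
    ArchiveName="orders-archive",
    EventSourceArn="arn:aws:events:us-east-1:123456789012:event-bus/default",
    RetentionDays=30,
)

# Later, events.start_replay can replay a time window of archived events
# back onto the bus, given the archive ARN plus start and end times.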

EventBridge also launched resource policies that simplify managing access to events across multiple AWS accounts. This expands the use of a policy associated with event buses to authorize API calls. Resource policies provide a powerful mechanism for modeling event buses across multiple accounts and provide fine-grained access control to EventBridge API actions.

EventBridge resource policies

EventBridge announced support for Server-Side Encryption (SSE). Events are encrypted using AES-256 at no additional cost for customers. EventBridge also increased PutEvent quotas to 10,000 transactions per second in US East (N. Virginia), US West (Oregon), and Europe (Ireland). This helps support workloads with high throughput.

AWS Step Functions

Synchronous Express Workflows have been launched for AWS Step Functions, providing a new way to run high-throughput Express Workflows. This feature allows developers to receive workflow responses without needing to poll services or build custom solutions. This is useful for high-volume microservice orchestration and fast compute tasks communicating via HTTPS.

The Step Functions service recently added support for other AWS services in workflows. You can now integrate API Gateway REST and HTTP APIs. This enables you to call API Gateway directly from a state machine as an asynchronous service integration.

Step Functions now also supports Amazon EKS service integration. This allows you to build workflows with steps that synchronously launch tasks in EKS and wait for a response. In October, the service also announced support for Amazon Athena, so workflows can now query data in your S3 data lakes.

These new integrations help minimize custom code and provide built-in error handling, parameter passing, and applying recommended security settings.

AWS SAM CLI

The AWS Serverless Application Model (AWS SAM) is an AWS CloudFormation extension that makes it easier to build, manage, and maintain serverless applications. On November 10, the AWS SAM CLI tool released version 1.9.0 with support for cached and parallel builds.

By using sam build --cached, AWS SAM no longer rebuilds functions and layers that have not changed since the last build. Additionally, you can use sam build --parallel to build functions in parallel, instead of sequentially. Both of these new features can substantially reduce the build time of larger applications defined with AWS SAM.

Amazon SNS

Amazon SNS announced support for First-In-First-Out (FIFO) topics. These are used with SQS FIFO queues for applications that require strict message ordering with exactly once processing and message deduplication. This is designed for workloads that perform tasks like bank transaction logging or inventory management. You can also use message filtering in FIFO topics to publish updates selectively.

SNS FIFO

AWS X-Ray

X-Ray now integrates with Amazon S3 to trace upstream requests. If a Lambda function uses the X-Ray SDK, S3 sends tracing headers to downstream event subscribers. With this, you can use the X-Ray service map to view connections between S3 and other services used to process an application request.

AWS CloudFormation

AWS CloudFormation announced support for nested stacks in change sets. This allows you to preview changes in your application and infrastructure across the entire nested stack hierarchy. You can then review those changes before confirming a deployment. This is available in all Regions supporting CloudFormation at no extra charge.

The new CloudFormation modules feature was released on November 24. This helps you develop building blocks with embedded best practices and common patterns that you can reuse in CloudFormation templates. Modules are available in the CloudFormation registry and can be used in the same way as any native resource.

Amazon DynamoDB

For customers using DynamoDB global tables, you can now use your own encryption keys. While all data in DynamoDB is encrypted by default, this feature enables you to use customer managed keys (CMKs). DynamoDB also announced support for global tables in the Europe (Milan) and Europe (Stockholm) Regions. This feature enables you to scale global applications for local access in workloads running in different Regions and replicate tables for higher availability and disaster recovery (DR).

The DynamoDB service announced the ability to export table data to data lakes in Amazon S3. This enables you to use services like Amazon Athena and AWS Lake Formation to analyze DynamoDB data with no custom code required. This feature does not consume table capacity and does not impact performance and availability. To learn how to use this feature, see this documentation.
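You can also start an export from code. A minimal boto3 sketch follows; the table ARN and bucket name are placeholders, and the table must have point-in-time recovery enabled:

import boto3

dynamodb = boto3.client("dynamodb")

# Placeholder table ARN and bucket name.
response = dynamodb.export_table_to_point_in_time(
    TableArn="arn:aws:dynamodb:us-east-1:123456789012:table/orders",
    S3Bucket="my-data-lake-bucket",
    S3Prefix="dynamodb-exports/orders/",
    ExportFormat="DYNAMODB_JSON",    # or "ION"
)
print(response["ExportDescription"]["ExportStatus"])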

AWS Amplify and AWS AppSync

You can now use existing Amazon Cognito user pools and identity pools for Amplify projects, making it easier to build new applications for an existing user base. AWS Amplify Console, which provides a fully managed static web hosting service, is now available in the Europe (Milan), Middle East (Bahrain), and Asia Pacific (Hong Kong) Regions. This service makes it simpler to bring automation to deploying and hosting single-page applications and static sites.

AWS AppSync enabled AWS WAF integration, making it easier to protect GraphQL APIs against common web exploits. You can also implement rate-based rules to help slow down brute force attacks. Using AWS Managed Rules for AWS WAF provides a faster way to configure application protection without creating the rules directly. AWS AppSync also recently expanded service availability to the Asia Pacific (Hong Kong), Middle East (Bahrain), and China (Ningxia) Regions, making the service now available in 21 Regions globally.

Still looking for more?

Join the AWS Serverless Developer Advocates on Twitch throughout re:Invent for live Q&A, session recaps, and more! See this page for the full schedule.

For more serverless learning resources, visit Serverless Land.

Detect change points in your event data stream using Amazon Kinesis Data Streams, Amazon DynamoDB and AWS Lambda

Post Syndicated from Marco Guerriero original https://aws.amazon.com/blogs/big-data/detect-change-points-in-your-event-data-stream-using-amazon-kinesis-data-streams-amazon-dynamodb-and-aws-lambda/

The success of many modern streaming applications depends on the ability to sequentially detect each change as soon as possible after it occurs, while continuing to monitor the data stream as it evolves. Applications of change point detection range across genomics, marketing, and finance, to name a few. In genomics, change point detection can help identify genes that are damaged. In marketing, we can identify things like customer churns in real time or when customer engagement changes over time. This is very useful in areas like online retail where, if change detection is implemented, we can adapt much more quickly to customer behavior. In finance, we can detect moments in time when stock prices have significantly changed. As opposed to offline batch methodologies that take a batch of historical data and look for the change points in that data, we use a streaming procedure where we detect change points as fast as possible in real time as new data comes in.

In this post, we demonstrate automated event detection (AED), a fully automated, non-parametric, multiple change point detection algorithm that can operate on massive streaming data with low computational complexity, quickly reacting to the changes. Quick change detection [1] can help a system raise a timely alarm. Quickly reacting to a sudden fault arising in an industrial process or a production process could lead to significant savings in unplanned downtime. Detecting the onset of the outbreak of a disease, or the effect of a bio-terrorist attack, is critical, both for effective initiation of public health intervention measures and to timely alert government agencies and the general public.

In AED, no statistical assumption (such as Gaussian) is made on the generative process of the time series or data streams being processed. We rely on a class of tests called non-parametric or distribution-free tests [2, 3, 4, 5]. These tests are the natural choice for performing change point detection on data streams with unknown statistical distribution, which represents a common scenario that applies to a wide variety of real-world processes. We demonstrate a Python implementation of AED embedded within AWS Lambda over a data stream processed and managed by Amazon Kinesis Data Streams.

Understanding event detection

Let’s talk for a moment about anomaly detection, data drift, and change point detection in time series.

In machine learning (ML), we hear a lot about anomaly detection. In the context of a time series, that often means a data point that is outside the expected range. That range may have a static definition, such as “we never expect the temperature to exceed 130F,” or a dynamic definition, such as “it should not vary from recent averages by more than three standard deviations.” An example of a dynamic anomaly detection algorithm is Random Cut Forest.

In contrast, data drift describes a slower shift in the stream’s data distribution. It may mean a shift in the mean of a variable, such as a shift in temperatures between summer and winter, or a change in behavior of visitors to a website as certain product categories fall out of favor and others become fashionable. A frequently recommended approach for dealing with data drift is frequent retraining of an ML model, to keep recommendations fresh and relevant based on recent data.

The events AED focuses on are in a different category altogether: sudden shifts in the data stream to a new level, sometimes called regime change. These are occasions where human intervention may be desired to see whether automated responses are appropriate. For example, a slow shift in readings from a temperature sensor may be normal, whereas a sudden large change may be a sign of a major malfunction in progress. Other data streams that constantly fluctuate but where a sudden large change may require oversight include equipment monitoring and malfunctions, cyber attacks, stock market movements, power grids and outages, and operational variations in manufacturing processes, to name a few [1].

Our goal is to automatically detect multiple events when the time series or the data streams change their behavior. In this version, we focus on sudden changes in mean or offset in time series. AED is an unsupervised, automatic, statistically rigorous, real-time procedure for detecting relevant events, which doesn’t require any training data. It can also be used as an automatic process for feature extraction (such as time series segmentation) that can be used in downstream ML systems.

Automated event detection

AED operates sequentially as data comes in, continuously performing a novel statistical test to identify a change event—a significant shift in the data from the data that preceded that point in time. If an event is detected at time ti, all the previous data prior to ti is discarded. AED is then reset to process new data points starting with the (ti +1)th data point, looking for a new change event. This procedure is repeated sequentially until no more data points are available.

The core of AED is represented by the statistical test called the Modified Practical Pettitt (MPP) test. The MPP test is a variant of the Pettitt test [6], which is a very powerful and robust non-parametric statistical procedure to detect a single change point in data streams where the distribution is completely unknown. Pettitt’s test is based on Null Hypothesis Significance Testing (NHST), which comes with some constraints and fallacies. In fact, a very important problem with NHST is that the result of the test doesn’t provide information about the magnitude of the effect of the event—which is key information we’re interested in. A consequence of this limit is that an event (change point) can be detected despite a very small effect. But is an increase of 0.001 Celsius degrees in temperature data streams coming from some IoT devices or a decrease of $1 in a particular stock price a noteworthy event? Statistical significance doesn’t tell us anything about practical relevance. The MPP test solves this problem by incorporating a practical significance threshold into the definition of the decision test statistic. In this way, only events that are both statistically and practically significant are detected and reported by AED.

The decision statistic of the MPP test can be computed recursively, resulting in a computational complexity that is linear in the number of data points processed by AED up to the reset time.
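The MPP test itself isn’t published in this post, but you can get a feel for the overall procedure from the classical Pettitt statistic combined with a simple practical-significance check. The sketch below is an illustrative stand-in only: it uses the O(T²) textbook form of the statistic rather than AED’s linear recursive computation, and its practical-significance rule is a simplification of the MPP definition.

import numpy as np


def pettitt_change_point(x, alpha=0.01, practical_threshold=0.0):
    """Classical Pettitt test with a basic practical-significance check.

    Returns a dict with the change index, approximate p-value, and offset
    magnitude when a change is both statistically and practically
    significant, otherwise None.
    """
    x = np.asarray(x, dtype=float)
    T = len(x)
    if T < 3:
        return None

    # U[t] = sum over i <= t, j > t of sign(x_j - x_i), for every split t.
    s = np.sign(x[None, :] - x[:, None])
    U = np.array([s[: t + 1, t + 1 :].sum() for t in range(T - 1)])

    K = np.abs(U).max()
    t_hat = int(np.abs(U).argmax())
    p_value = 2.0 * np.exp(-6.0 * K**2 / (T**3 + T**2))

    # Practical significance: magnitude of the mean shift across the split.
    shift = x[t_hat + 1 :].mean() - x[: t_hat + 1].mean()

    if p_value <= alpha and abs(shift) >= practical_threshold:
        return {"index": t_hat, "p_value": p_value, "offset": shift}
    return None


def detect_events(stream, **kwargs):
    """Sequential wrapper mirroring AED's discard-and-reset behavior."""
    buffer, events, start = [], [], 0
    for value in stream:
        buffer.append(value)
        hit = pettitt_change_point(buffer, **kwargs)
        if hit:
            events.append((start + hit["index"], hit["p_value"], hit["offset"]))
            # Discard data up to the change point and restart from the next point.
            start += hit["index"] + 1
            buffer = buffer[hit["index"] + 1 :]
    return events

Feeding detect_events a synthetic series with a level shift should report the shift only when its magnitude exceeds practical_threshold, which mirrors the behavior described above.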

Prerequisites

To get started, we need the following:

  • A continuous or discrete time series (data stream). No knowledge of the distribution of the data is needed.
  • Some guidance for the algorithm in terms of when to alert us:
    • The statistical confidence level we require that an event has occurred (p value).
    • The practical significance level we require (our practical significance threshold).

The output of AED is a list of change points specified by their time of occurrence, their p-value, and the magnitude of the offset.

Solution overview

The following diagram illustrates the AWS services used to implement the solution.

The steps in the process are as follows:

  1. One or more programs, devices, or sensors generate events and place them into a Kinesis data stream. These events make up a time series. In our case, we provide a sample generator function.
  2. The event recorder Lambda function consumes records from the data stream.
  3. The Lambda function stores them in an Amazon DynamoDB events table (a minimal sketch of this recorder function follows the list).
  4. The DynamoDB table streams the inserted events to the event detection Lambda function.
  5. The Lambda function checks each event to see whether this is a change point. To do so, it performs the following actions:
    • Reads the last change point recorded from the DynamoDB change points table (or creates one if this is the first data point for this device).
    • Reads the prior events since the last change point from the events table, as a time series.
    • Runs the event-detection algorithm over the retrieved time series.
  6. If a new change point is detected, the function does the following:
    • Writes the change point into the DynamoDB change points table, for later use.
    • Sends an Amazon Simple Notification Service (Amazon SNS) message to a topic with the change point details.
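As referenced in step 3, the event recorder can be very small. The sketch below assumes a "value|timestamp" payload, an EVENTS_TABLE environment variable, and deviceId/timestamp attribute names; the function deployed by the CloudFormation stack may differ in these details.

import base64
import os

import boto3

# Table name is assumed to arrive via an environment variable.
table = boto3.resource("dynamodb").Table(os.environ.get("EVENTS_TABLE", "aed_stream_data"))


def handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers the payload base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        value, timestamp = payload.split("|", 1)
        table.put_item(
            Item={
                "deviceId": record["kinesis"]["partitionKey"],   # assumed partition key
                "timestamp": timestamp,                          # assumed sort key
                "value": value,
            }
        )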

Setting up the solution infrastructure

To set up the infrastructure components and configure the connections, launch a predefined AWS CloudFormation stack. The stack sets up the following resources:

  • A Kinesis data stream.
  • Two Lambda functions: the event recorder, and the event detection. Each function has an associated AWS Identity and Access Management (IAM) role.
  • Two DynamoDB tables: one to hold events, and one for detected change points.
  • An SNS topic and a subscription, for notifying that a change point has been detected.

To see this solution in operation in the US East (N. Virginia) Region, launch the provided CloudFormation stack. The total solution costs approximately $0.02 per hour to run, depending on the volume of data sent to the components. Remember to delete the CloudFormation stack when you’re finished with the solution to avoid additional charges.

To run the stack, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. Update the email address for notifications to be a valid email address of your choice.
  4. Update the following parameters for your environment if necessary, or, for now, leave the defaults:
    1. Environment parameters – Names for your DynamoDB tables, Kinesis stream, and SNS topic
    2. AED parameters – Startup, alpha level, change difference window, and practical significance threshold
  5. Choose Next.
  6. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  7. Choose Create stack.
  8. Wait for the CloudFormation stack to reach a status of CREATE COMPLETE (around 5 minutes).
  9. Check the Outputs tab for the resources created.

If you want to receive the SNS messages for detected change points, remember to confirm your SNS subscription for the email address.

Generating data

After AWS CloudFormation creates the system resources for us, we’re ready to start generating some data. For this post, we provide a client that generates a time series with periodic regime shifts. For simplicity and to show generality, the client also runs AED over the time series it creates. At the end, it generates a time series plot of the generated data along with the detected change points. These should match the SNS notifications you receive.

  1. To download and set up the test clients, copy the following files to your local desktop:
    1. time_series_client.py
    2. time_series_validation_client.py
    3. AmazonAED.py
  2. Install boto3, numpy, and matplotlib, if you don’t already have them installed.

Two test clients are provided:

  • time_series_client.py generates data and puts it into Kinesis Data Streams; this is what your device would do.
  • For easier demonstration, we also provide time_series_validation_client.py. This version also runs AED locally and generates an annotated plot showing the input data, where the change points are detected, and where the change actually occurred.

To run the validation client, open a command prompt to the folder where you copied the client files and run the following code:

python time_series_validation_client.py  -r us-east-1 -s aed_events_data_stream

You can pass the -h parameter to find out the other parameters. By default, the client generates 1 minute’s worth of data. The parameters let you configure the client for your environment, and, when running the validation client, for the local AED detection (which is independent of that being performed in the Lambda function).

You can use four parameters to configure AED’s behavior to match your environment:

  • “-a”, “--alpha” – The AED alpha level, that is, the smallest change to identify. The default is 0.01.
  • “-w”, “--window” – The AED change difference window, the number of points around the change to ignore. The default is 5.
  • “-x”, “--startup” – The number of samples to include in the AED initialization stage. The default is 5.
  • “-p”, “--practicalthreshold” – The AED practical significance threshold. Changes smaller than this are not designated as change points. The default is 0.

At the end of the time period, the client displays an image of the generated time series. The following time series is annotated with a small blue triangle whenever the client created a change point. The small red triangles mark the points where the AED algorithm, running independently over the generated data, detected the change points. The blue triangle always precedes the red triangle in our DetectBeforeDecide framework, where the AED algorithm has to first detect that something is changing (red triangle) and then work backwards to decide when in time the change most likely occurred (blue triangle).

The turquoise dot shows where the data generation was restarted; this change point may not be detected because AED isn’t running at that time. As you can see, each generated change point was detected. At the same time, the significant variation between the change points—in essence, within a single regime—didn’t cause a spurious change point detection.

The practical significance threshold additionally allows you to specify when the change is small enough that it’s not relevant and should be ignored.

You can also review the contents of the DynamoDB tables on the AWS Management Console (see the following screenshot). The table aed_stream_data shows each event in the stream logged. The aed_change_points table shows the individual change points detected, along with the timestamp of the detection and the data point at the time the change was detected by the Lambda function running over the stream data. This data lets you construct a history of the event stream and the change points, and manipulate them according to your need.

Each Lambda function is started by the Kinesis stream; it checks to see when the last change point occurred. It then retrieves the data since that change point from the aed_stream_data table and reprocesses it, looking for a new change point. As the gaps between change points get very large, this may become a large sequence to retrieve and process. If this is your use case, you may wish to artificially create a new set point every so often to reduce the amount of data that must be reread.
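As a rough sketch of that retrieval step (the key schema, a deviceId partition key with a timestamp sort key, is an assumption made for illustration):

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
events_table = dynamodb.Table("aed_stream_data")
change_points_table = dynamodb.Table("aed_change_points")


def series_since_last_change_point(device_id):
    """Return readings recorded after the most recent change point."""
    last = change_points_table.query(
        KeyConditionExpression=Key("deviceId").eq(device_id),
        ScanIndexForward=False,   # newest change point first
        Limit=1,
    )["Items"]

    condition = Key("deviceId").eq(device_id)
    if last:
        condition = condition & Key("timestamp").gt(last[0]["timestamp"])

    return events_table.query(KeyConditionExpression=condition)["Items"]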

Cleaning up

To avoid additional charges, you should delete the CloudFormation stack after walking through the solution.

To implement AED in a different Region or to adapt it to your own needs, download the aed.zip file.

Conclusion

In this post, we introduced automated event detection (AED), a new, fast, scalable, fully automated, non-parametric, multiple change point detection service that can operate on massive streaming data with low computational complexity. AED efficiently identifies shifts in the data stream that have both statistical and practical significance. This ability to locate the time and magnitude of a shift in the data stream, and to differentiate it from normal data fluctuations, is key in many applications. The appropriate action varies by use case: it may be appropriate to take automated action, or call a human to evaluate whether the preplanned actions should be taken. A natural extension of AED would be to add an algorithm to detect trends (gradual changes as opposed to sudden shifts) in data streams using non-parametric tests such as Mann-Kendall [7, 8]. Combining these functions allows you to identify both kinds of changes in your incoming data stream and distinguish between them, further broadening the use cases.

In this post, we demonstrated a Python implementation of AED embedded within Lambda over a data stream processed and managed by Kinesis Data Streams. We also provided a standalone client implementation to show an alternate implementation. The code is available for you to download and integrate into your applications. Do you have any data streams where sudden shifts may happen? Do you want to know when they happen? Are there automated actions that should be taken, or should someone be alerted? If your answer to any of these questions is “Yes!” then consider implementing AED.

If you have any comments about this post, submit them in the comments section.

References

[1] H. V. Poor and O. Hadjiliadis (2009). Quickest detection. Cambridge University Press, 2009.

[2] Brodsky, E., and Boris S. Darkhovsky (2013). Nonparametric methods in change point problems. Vol. 243. Springer Science & Business Media.

[3] Csörgö, Miklós, and Lajos Horváth (1997). Limit theorems in change-point analysis. Vol. 18. John Wiley & Sons.

[4] Ross GJ, Tasoulis DK, Adams NM (2011). Nonparametric Monitoring of Data Streams for Changes in Location and Scale. Technometrics, 53(4), 379–389.

[5] Chu, Lynna, and Hao Chen (2019). Asymptotic distribution-free change-point detection for multivariate and non-euclidean data. The Annals of Statistics 47.1, 382-414.

[6] Pettitt AN (1979). A Non-Parametric Approach to the Change-Point Problem. Journal of the Royal Statistical Society C, 28(2), 126–135.

[7] H. B. Mann (1945). Nonparametric tests against trend, Econometrica, vol. 13, pp. 245–259, 1945.

[8] M. G. Kendall (1975). Rank Correlation Methods, Griffin, London, UK, 1975.


About the Authors

Marco Guerriero, PhD, is a Practice Manager for Emergent Technologies and Intelligence Platform for AWS Professional Services. He loves working on ways for emergent technologies such as AI/ML, Big Data, IoT, and Quantum to help businesses across different industry verticals succeed in their innovation journey.

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Customer Packaging Experience. Until recently, she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

New – Export Amazon DynamoDB Table Data to Your Data Lake in Amazon S3, No Code Writing Required

Post Syndicated from Alex Casalboni original https://aws.amazon.com/blogs/aws/new-export-amazon-dynamodb-table-data-to-data-lake-amazon-s3/

Hundreds of thousands of AWS customers have chosen Amazon DynamoDB for mission-critical workloads since its launch in 2012. DynamoDB is a nonrelational managed database that allows you to store a virtually infinite amount of data and retrieve it with single-digit-millisecond performance at any scale. To get the most value out of this data, customers had […]

Rapid and flexible Infrastructure as Code using the AWS CDK with AWS Solutions Constructs

Post Syndicated from Biff Gaut original https://aws.amazon.com/blogs/devops/rapid-flexible-infrastructure-with-solutions-constructs-cdk/

Introduction

As workloads move to the cloud and all infrastructure becomes virtual, infrastructure as code (IaC) becomes essential to leverage the agility of this new world. JSON and YAML are the powerful, declarative modeling languages of AWS CloudFormation, allowing you to define complex architectures using IaC. Just as higher level languages like BASIC and C abstracted away the details of assembly language and made developers more productive, the AWS Cloud Development Kit (AWS CDK) provides a programming model above the native template languages, a model that makes developers more productive when creating IaC. When you instantiate CDK objects in your Typescript (or Python, Java, etc.) application, those objects “compile” into a YAML template that the CDK deploys as an AWS CloudFormation stack.

AWS Solutions Constructs take this simplification a step further by providing a library of common service patterns built on top of the CDK. These multi-service patterns allow you to deploy multiple resources with a single object, resources that follow best practices by default – both independently and throughout their interaction.

Comparison of an Application stack with Assembly Language, 4th generation language and Object libraries such as Hibernate with an IaC stack of CloudFormation, AWS CDK and AWS Solutions Constructs

Application Development Stack vs. IaC Development Stack

Solution overview

To demonstrate how using Solutions Constructs can accelerate the development of IaC, in this post you will create an architecture that ingests and stores sensor readings using Amazon Kinesis Data Streams, AWS Lambda, and Amazon DynamoDB.

An architecture diagram showing sensor readings being sent to a Kinesis data stream. A Lambda function will receive the Kinesis records and store them in a DynamoDB table.

Prerequisite – Setting up the CDK environment

Tip – If you want to try this example but are concerned about the impact of changing the tools or versions on your workstation, try running it on AWS Cloud9. An AWS Cloud9 environment is launched with an AWS Identity and Access Management (AWS IAM) role and doesn’t require configuring with an access key. It uses the current region as the default for all CDK infrastructure.

To prepare your workstation for CDK development, confirm the following:

  • Node.js 10.3.0 or later is installed on your workstation (regardless of the language used to write CDK apps).
  • You have configured credentials for your environment. If you’re running locally you can do this by configuring the AWS Command Line Interface (AWS CLI).
  • TypeScript 2.7 or later is installed globally (npm -g install typescript)

Before creating your CDK project, install the CDK toolkit using the following command:

npm install -g aws-cdk

Create the CDK project

  1. First create a project folder called stream-ingestion with these two commands:

mkdir stream-ingestion
cd stream-ingestion

  2. Now create your CDK application using this command:

npx aws-cdk@1.68.0 init app --language=typescript

Tip – This example will be written in TypeScript – you can also specify other languages for your projects.

At this time, you must use the same version of the CDK and Solutions Constructs. We’re using version 1.68.0 of both based upon what’s available at publication time, but you can update this with a later version for your projects in the future.

Let’s explore the files in the application this command created:

  • bin/stream-ingestion.ts – This is the module that launches the application. The key line of code is:

new StreamIngestionStack(app, 'StreamIngestionStack');

This creates the actual stack, and it’s in StreamIngestionStack that you will write the CDK code that defines the resources in your architecture.

  • lib/stream-ingestion-stack.ts – This is the important class. In the constructor of StreamIngestionStack you will add the constructs that will create your architecture.

During the deployment process, the CDK uploads your Lambda function to an Amazon S3 bucket so it can be incorporated into your stack.

  3. To create that S3 bucket and any other infrastructure the CDK requires, run this command:

cdk bootstrap

The CDK uses the same supporting infrastructure for all projects within a region, so you only need to run the bootstrap command once in any region in which you create CDK stacks.

  4. To install the required Solutions Constructs packages for our architecture, run these two commands from the command line:

npm install @aws-solutions-constructs/aws-kinesisstreams-lambda@1.68.0
npm install @aws-solutions-constructs/aws-lambda-dynamodb@1.68.0

Write the code

First you will write the Lambda function that processes the Kinesis data stream messages.

  1. Create a folder named lambda under stream-ingestion
  2. Within the lambda folder save a file called lambdaFunction.js with the following contents:
var AWS = require("aws-sdk");

// Create the DynamoDB service object
var ddb = new AWS.DynamoDB({ apiVersion: "2012-08-10" });

AWS.config.update({ region: process.env.AWS_REGION });

// We will configure our construct to 
// look for the .handler function
exports.handler = async function (event) {
  try {
    // Kinesis will deliver records 
    // in batches, so we need to iterate through
    // each record in the batch
    for (let record of event.Records) {
      const reading = parsePayload(record.kinesis.data);
      await writeRecord(record.kinesis.partitionKey, reading);
    };
  } catch (err) {
    console.log(`Write failed, err:\n${JSON.stringify(err, null, 2)}`);
    throw err;
  }
  return;
};

// Write the provided sensor reading data to the DynamoDB table
async function writeRecord(partitionKey, reading) {

  var params = {
    // Notice that Constructs automatically sets up 
    // an environment variable with the table name.
    TableName: process.env.DDB_TABLE_NAME,
    Item: {
      partitionKey: { S: partitionKey },  // sensor Id
      timestamp: { S: reading.timestamp },
      value: { N: reading.value}
    },
  };

  // Call DynamoDB to add the item to the table
  await ddb.putItem(params).promise();
}

// Decode the payload and extract the sensor data from it
function parsePayload(payload) {

  const decodedPayload = Buffer.from(payload, "base64").toString(
    "ascii"
  );

  // Our CLI command will send the records to Kinesis
  // with the values delimited by '|'
  const payloadValues = decodedPayload.split("|", 2)
  return {
    value: payloadValues[0],
    timestamp: payloadValues[1]
  }
}

We won’t spend a lot of time explaining this function – it’s pretty straightforward and heavily commented. It receives an event with one or more sensor readings, and for each reading it extracts the pertinent data and saves it to the DynamoDB table.

You will use two Solutions Constructs to create your infrastructure:

The aws-kinesisstreams-lambda construct deploys an Amazon Kinesis data stream and a Lambda function.

  • aws-kinesisstreams-lambda creates the Kinesis data stream and Lambda function that subscribes to that stream. To support this, it also creates other resources, such as IAM roles and encryption keys.

The aws-lambda-dynamodb construct deploys a Lambda function and a DynamoDB table.

  • aws-lambda-dynamodb creates an Amazon DynamoDB table and a Lambda function with permission to access the table.
  3. To deploy the first of these two constructs, replace the code in lib/stream-ingestion-stack.ts with the following code:
import * as cdk from "@aws-cdk/core";
import * as lambda from "@aws-cdk/aws-lambda";
import { KinesisStreamsToLambda } from "@aws-solutions-constructs/aws-kinesisstreams-lambda";

import * as ddb from "@aws-cdk/aws-dynamodb";
import { LambdaToDynamoDB } from "@aws-solutions-constructs/aws-lambda-dynamodb";

export class StreamIngestionStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const kinesisLambda = new KinesisStreamsToLambda(
      this,
      "KinesisLambdaConstruct",
      {
        lambdaFunctionProps: {
          // Where the CDK can find the lambda function code
          runtime: lambda.Runtime.NODEJS_10_X,
          handler: "lambdaFunction.handler",
          code: lambda.Code.fromAsset("lambda"),
        },
      }
    );

    // Next Solutions Construct goes here
  }
}

Let’s explore this code:

  • It instantiates a new KinesisStreamsToLambda object. This Solutions Construct will launch a new Kinesis data stream and a new Lambda function, setting up the Lambda function to receive all the messages in the Kinesis data stream. It will also deploy all the additional resources and policies required for the architecture to follow best practices.
  • The third argument to the constructor is the properties object, where you specify overrides of default values or any other information the construct needs. In this case you provide properties for the encapsulated Lambda function that informs the CDK where to find the code for the Lambda function that you stored as lambda/lambdaFunction.js earlier.
  4. Now you’ll add the second construct that connects the Lambda function to a new DynamoDB table. In the same lib/stream-ingestion-stack.ts file, replace the line // Next Solutions Construct goes here with the following code:
    // Define the primary key for the new DynamoDB table
    const primaryKeyAttribute: ddb.Attribute = {
      name: "partitionKey",
      type: ddb.AttributeType.STRING,
    };

    // Define the sort key for the new DynamoDB table
    const sortKeyAttribute: ddb.Attribute = {
      name: "timestamp",
      type: ddb.AttributeType.STRING,
    };

    const lambdaDynamoDB = new LambdaToDynamoDB(
      this,
      "LambdaDynamodbConstruct",
      {
        // Tell construct to use the Lambda function in
        // the first construct rather than deploy a new one
        existingLambdaObj: kinesisLambda.lambdaFunction,
        tablePermissions: "Write",
        dynamoTableProps: {
          partitionKey: primaryKeyAttribute,
          sortKey: sortKeyAttribute,
          billingMode: ddb.BillingMode.PROVISIONED,
          removalPolicy: cdk.RemovalPolicy.DESTROY
        },
      }
    );

    // Add autoscaling
    const readScaling = lambdaDynamoDB.dynamoTable.autoScaleReadCapacity({
      minCapacity: 1,
      maxCapacity: 50,
    });

    readScaling.scaleOnUtilization({
      targetUtilizationPercent: 50,
    });

Let’s explore this code:

  • The first two const objects define the names and types for the partition key and sort key of the DynamoDB table.
  • The LambdaToDynamoDB construct instantiated creates a new DynamoDB table and grants access to your Lambda function. The key to this call is the properties object you pass in the third argument.
    • The first property sent to LambdaToDynamoDB is existingLambdaObj – by setting this value to the Lambda function created by KinesisStreamsToLambda, you’re telling the construct to not create a new Lambda function, but to grant the Lambda function in the other Solutions Construct access to the DynamoDB table. This illustrates how you can chain many Solutions Constructs together to create complex architectures.
    • The second property sent to LambdaToDynamoDB tells the construct to limit the Lambda function’s access to the table to write only.
    • The third property sent to LambdaToDynamoDB is actually a full properties object defining the DynamoDB table. It provides the two attribute definitions you created earlier as well as the billing mode. It also sets the RemovalPolicy to DESTROY. This policy setting ensures that the table is deleted when you delete this stack – in most cases you should accept the default setting to protect your data.
  • The last two lines of code show how you can use statements to modify a construct outside the constructor. In this case we set up auto scaling on the new DynamoDB table, which we can access with the dynamoTable property on the construct we just instantiated.

That’s all it takes to create all the resources needed to deploy your architecture.

  1. Save all the files, then compile the TypeScript into a CDK program using this command:

npm run build

  1. Finally, launch the stack using this command:

cdk deploy

(Enter “y” in response to Do you wish to deploy all these changes (y/n)?)

You will see some warnings where you override CDK default values. Because you are doing this intentionally you may disregard these, but it’s always a good idea to review these warnings when they occur.

Tip – Many mysterious CDK project errors stem from mismatched versions. If you get stuck on an inexplicable error, check package.json and confirm that all CDK and Solutions Constructs libraries have the same version number (with no leading caret ^). If necessary, correct the version numbers, delete the package-lock.json file and node_modules tree and run npm install. Think of this as the “turn it off and on again” first response to CDK errors.

You have now deployed the entire architecture for the demo – open the CloudFormation stack in the AWS Management Console and take a few minutes to explore all 12 resources that the program deployed (and the 380-line template generated to create them).

Feed the Stream

Now use the CLI to send some data through the stack.

Go to the Kinesis Data Streams console and copy the name of the data stream. Replace the stream name in the following command and run it from the command line.

aws kinesis put-records \
--stream-name StreamIngestionStack-KinesisLambdaConstructKinesisStreamXXXXXXXX-XXXXXXXXXXXX \
--records \
PartitionKey=1301,'Data=15.4|2020-08-22T01:16:36+00:00' \
PartitionKey=1503,'Data=39.1|2020-08-22T01:08:15+00:00'

Tip – If you are using the AWS CLI v2, the previous command will result in an “Invalid base64…” error because v2 expects the inputs to be Base64 encoded by default. Adding the argument --cli-binary-format raw-in-base64-out will fix the issue.

To confirm that the messages made it through the service, open the DynamoDB console – you should see the two records in the table.

Now that you’ve got it working, pause to think about what you just did. You deployed a system that can ingest and store sensor readings and scale to handle heavy loads. You did that by instantiating two objects – well under 60 lines of code. Experiment with changing some property values and deploying the changes by running npm run build and cdk deploy again.

Cleanup

To clean up the resources in the stack, run this command:

cdk destroy

Conclusion

Just as languages like BASIC and C allowed developers to write programs at a higher level of abstraction than assembly language, the AWS CDK and AWS Solutions Constructs allow us to create CloudFormation stacks in TypeScript, Java, or Python instead of JSON or YAML. Just as there will always be a place for assembly language, there will always be situations where we want to write CloudFormation templates manually – but for most situations, we can now use the AWS CDK and AWS Solutions Constructs to create complex and complete architectures in a fraction of the time with very little code.

AWS Solutions Constructs can currently be used in CDK applications written in TypeScript, JavaScript, Java and Python and will be available in C# applications soon.

About the Author

Biff Gaut has been shipping software since 1983, from small startups to large IT shops. Along the way he has contributed to 2 books, spoken at several conferences and written many blog posts. He is now a Principal Solutions Architect at AWS working on the AWS Solutions Constructs team, helping customers deploy better architectures more quickly.

Optimizing the cost of serverless web applications

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/optimizing-the-cost-of-serverless-web-applications/

Web application backends are one of the most frequent types of serverless use-case for customers. The pay-for-value model can make it cost-efficient to build web applications using serverless tools.

While serverless cost is generally correlated with level of usage, there are architectural decisions that impact cost efficiency. The impact of these choices is more significant as your traffic grows, so it’s important to consider the cost-effectiveness of different designs and patterns.

This blog post reviews some common areas in web applications where you may be able to optimize cost. It uses the Happy Path web application as a reference example, which you can read about in the introductory blog post.

Serverless web applications generally use a combination of the services in the following diagram. I cover each of these areas to highlight common areas for cost optimization.

Serverless architecture by AWS service

The API management layer: Selecting the right API type

Most serverless web applications use an API between the frontend client and the backend architecture. Amazon API Gateway is a common choice since it is a fully managed service that scales automatically. There are three types of API offered by the service – REST APIs, WebSocket APIs, and the more recent HTTP APIs.

HTTP APIs offer many of the features of the REST APIs service, but the cost is often around 70% less. They support Lambda integration, JWT authorization, CORS, and custom domain names, and they have a simpler deployment model than REST APIs. This feature set tends to work well for web applications, many of which mainly use these capabilities. Additionally, HTTP APIs will gain feature parity with REST APIs over time.

The Happy Path application is designed for 100,000 monthly active users. It uses HTTP APIs, and you can inspect the backend/template.yaml to see how to define these in the AWS Serverless Application Model (AWS SAM). If you have existing AWS SAM templates that are using REST APIs, in many cases you can change these easily:

REST to HTTP API

Content distribution layer: Optimizing assets

Amazon CloudFront is a content delivery network (CDN). It enables you to distribute content globally across 216 Points of Presence without deploying or managing any infrastructure. It reduces latency for users who are geographically dispersed and can also reduce load on other parts of your service.

A typical web application uses CDNs in a couple of different ways. First, there is the distribution of the application itself. For single-page application frameworks like React or Vue.js, the build processes create static assets that are ideal for serving over a CDN.

However, these builds may not be optimized and can be larger than necessary. Many frameworks offer optimization plugins, and the JavaScript community frequently uses Webpack to bundle modules and shrink deployment packages. Similarly, any media assets used in the application build should be optimized. You can use tools like Lighthouse to analyze your web apps to find images that can be resized or compressed.

Optimizing images

The second common CDN use-case for web apps is for user-generated content (UGC). Many apps allow users to upload images, which are then shared with other users. A typical photo from a 12-megapixel smartphone is 3–9 MB in size. This high resolution is not necessary when photos are rendered within web apps. Displaying the high-resolution asset results in slower download performance and higher data transfer costs.

The Happy Path application uses a Resizer Lambda function to optimize these uploaded assets. This process creates two different optimized images depending upon which component loads the asset.

Image sizes in front-end applications

The upload S3 bucket shows the original size of the upload from the smartphone:

The distribution S3 bucket contains the two optimized images at different sizes:

Optimized images in the distribution S3 bucket

The distribution file sizes are 98–99% smaller. For a busy web application, using optimized image assets can make a significant difference to data transfer and CloudFront costs.
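
As a rough illustration of the kind of resizing logic such a function performs, here is a sketch (not the Happy Path application’s actual Resizer function). It assumes Pillow is packaged with the function and that the distribution bucket name is supplied through an environment variable:

import io
import os

import boto3
from PIL import Image

s3 = boto3.client("s3")
SIZES = {"thumbnail": (400, 400), "detail": (1024, 1024)}  # illustrative target sizes

def handler(event, context):
    # Read the original upload from the S3 event that triggered the function
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = record["object"]["key"]
    original = s3.get_object(Bucket=bucket, Key=key)["Body"].read()

    for label, size in SIZES.items():
        image = Image.open(io.BytesIO(original)).convert("RGB")
        image.thumbnail(size)  # resize in place, preserving the aspect ratio
        buffer = io.BytesIO()
        image.save(buffer, format="JPEG", quality=80)
        buffer.seek(0)
        s3.put_object(
            Bucket=os.environ["DISTRIBUTION_BUCKET"],  # assumed environment variable
            Key=f"{label}/{key}",
            Body=buffer,
            ContentType="image/jpeg",
        )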

Additionally, you can convert to highly optimized file formats such as WebP to reduce file size even further. Not all browsers support this format, but you can add a frontend fallback to other formats if needed, for example with the img element’s onerror handler:

<img src="myImage.webp" onerror="this.onerror=null; this.src='myImage.jpg'">

The data layer

AWS offers many different database and storage options that can be useful for web applications. Billing models vary by service and Region. By understanding the data access and storage requirements of your app, you can make informed decisions about the right service to use.

Generally, it’s more cost-effective to store binary data in S3 than a database. First, when the data is uploaded, you can upload directly to S3 with presigned URLs instead of proxying data via API Gateway or another service.
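
For illustration, a backend function can hand the browser a presigned POST so the upload bypasses API Gateway entirely. This is a sketch; the bucket name, key layout, and size limit are placeholders:

import boto3

s3 = boto3.client("s3")

def get_upload_url(user_id, file_name):
    # Returns a URL and form fields the browser can use to upload directly to S3
    return s3.generate_presigned_post(
        Bucket="my-upload-bucket",  # placeholder bucket name
        Key=f"uploads/{user_id}/{file_name}",
        Conditions=[["content-length-range", 0, 10 * 1024 * 1024]],  # cap uploads at 10 MB
        ExpiresIn=300,  # the form is valid for 5 minutes
    )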

If you are using Amazon DynamoDB, it’s best practice to store larger items in S3 and include a reference token in a table item. Part of DynamoDB pricing is based on read capacity units (RCUs). For binary items such as images, it is usually more cost-efficient to use S3 for storage.

Many web developers who are new to serverless are familiar with using a relational database, so they choose Amazon RDS for their database needs. Depending upon your use-case and data access patterns, it may be more cost effective to use DynamoDB instead. RDS is not a serverless service so there are monthly charges for the underlying compute instance. DynamoDB pricing is based upon usage and storage, so for many web apps it may be a lower-cost choice.

Integration layer

This layer includes services like Amazon SQS, Amazon SNS, and Amazon EventBridge, which are essential for decoupling serverless applications. Each of these has a request-based pricing component, where 64 KB of a payload is billed as one request. For example, a single SQS message with a 256 KB payload is billed as four requests. There are several optimization methods common for web applications.

1. Combine messages

Many messages sent to these services are much smaller than 64 KB. In some applications, the publishing service can combine multiple messages to reduce the total number of publish actions to SNS. Additionally, by either eliminating unused attributes in the message or compressing the message, you can store more data in a single request.

For example, a publishing service may be able to combine multiple messages together in a single publish action to an SNS topic:

  • Before optimization, a publishing service sends 100,000,000 1KB-messages to an SNS topic. This is charged as 100 million messages for a total cost of $50.00.
  • After optimization, the publishing service combines messages to send 1,562,500 64KB-messages to an SNS topic. This is charged as 1,562,500 messages for a total cost of $0.78.
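
The following sketch shows one way a publisher might buffer small records and publish them as a single SNS message once the batch approaches the 64 KB billing increment; the topic ARN is a placeholder:

import json

import boto3

sns = boto3.client("sns")
TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:orders"  # placeholder ARN

def publish_batched(records, limit=60 * 1024):
    # Keep the combined payload a little under 64 KB so it bills as a single request
    batch, size = [], 0
    for record in records:
        encoded = json.dumps(record)
        if batch and size + len(encoded) > limit:
            sns.publish(TopicArn=TOPIC_ARN, Message=json.dumps(batch))
            batch, size = [], 0
        batch.append(record)
        size += len(encoded)
    if batch:
        sns.publish(TopicArn=TOPIC_ARN, Message=json.dumps(batch))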

2. Filter messages

In many applications, not every message is useful for a consuming service. For example, an SNS topic may publish to a Lambda function, which checks the content and discards the message based on some criteria. In this case, it’s more cost effective to use the native filtering capabilities of SNS. The service can filter messages and only invoke the Lambda function if the criteria is met. This lowers the compute cost by only invoking Lambda when necessary.

For example, an SNS topic receives messages about customer orders and forwards these to a Lambda function subscriber. The function is only interested in canceled orders and discards all other messages:

  • Before optimization, the SNS topic sends all messages to a Lambda function. It evaluates the message for the presence of an order canceled attribute. On average, only 25% of the messages are processed further. While SNS does not charge for delivery to Lambda functions, you are charged each time the Lambda service is invoked, for 100% of the messages.
  • After optimization, using an SNS subscription filter policy, the SNS subscription filters for canceled orders and only forwards matching messages. Since the Lambda function is only invoked for 25% of the messages, this may reduce the total compute cost by up to 75%.
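
As an illustration, the filter policy can be applied to the subscription with a single API call, assuming the publisher sets an orderStatus message attribute on each message; the subscription ARN is a placeholder:

import json

import boto3

sns = boto3.client("sns")

sns.set_subscription_attributes(
    SubscriptionArn="arn:aws:sns:us-east-1:123456789012:orders:1234abcd",  # placeholder
    AttributeName="FilterPolicy",
    # Only messages whose orderStatus attribute equals "canceled" invoke the Lambda function
    AttributeValue=json.dumps({"orderStatus": ["canceled"]}),
)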

3. Choose a different messaging service

For complex filtering options based upon matching patterns, you can use EventBridge. The service can filter messages based upon prefix matching, numeric matching, and other patterns, combining several rules into a single filter. You can create branching logic within the EventBridge rule to invoke downstream targets.

EventBridge offers a broader range of targets than SNS destinations. In cases where you publish from an SNS topic to a Lambda function to invoke an EventBridge target, you could use EventBridge instead and eliminate the Lambda invocation. For example, instead of routing from SNS to Lambda to AWS Step Functions, instead create an EventBridge rule that routes events directly to a state machine.
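
For example, a rule like the following (with placeholder ARNs and an assumed event pattern) matches canceled orders and invokes a Step Functions state machine directly, with no intermediate Lambda function:

import json

import boto3

events = boto3.client("events")

events.put_rule(
    Name="canceled-orders",
    EventPattern=json.dumps({
        "source": ["com.example.orders"],          # assumed event source
        "detail": {"orderStatus": ["canceled"]},
    }),
)

events.put_targets(
    Rule="canceled-orders",
    Targets=[{
        "Id": "order-cancellation-workflow",
        "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:CancelOrder",     # placeholder
        "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeInvokeStepFunctions",  # placeholder
    }],
)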

Business logic layer

Step Functions allows you to orchestrate complex workflows in serverless applications while eliminating common boilerplate code. The Standard Workflow service charges per state transition. Express Workflows were introduced in December 2019, with pricing based on requests and duration, instead of transitions.

For workloads that are processing large numbers of events in shorter durations, Express Workflows can be more cost-effective. This is designed for high-volume event workloads, such as streaming data processing or IoT data ingestion. For these cases, compare the cost of the two workflow types to see if you can reduce cost by switching across.

Lambda is the on-demand compute layer in serverless applications, which is billed by requests and GB-seconds. GB-seconds is calculated by multiplying duration in seconds by memory allocated to the function. For a function with a 1-second duration, invoked 1 million times, here is how memory allocation affects the total cost in the US East (N. Virginia) Region:

Memory (MB)    GB-seconds    Compute cost    Total cost
128            125,000       $2.08           $2.28
512            500,000       $8.34           $8.54
1024           1,000,000     $16.67          $16.87
1536           1,500,000     $25.01          $25.21
2048           2,000,000     $33.34          $33.54
3008           2,937,500     $48.97          $49.17
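
As a quick sketch of the underlying arithmetic, each total combines the GB-second compute charge with the per-request charge, using the US East (N. Virginia) prices published at the time ($0.0000166667 per GB-second and $0.20 per million requests); small differences from the table come from rounding:

def lambda_cost(memory_mb, duration_s=1.0, invocations=1_000_000):
    # GB-seconds = allocated memory in GB * duration in seconds * number of invocations
    gb_seconds = (memory_mb / 1024) * duration_s * invocations
    compute = gb_seconds * 0.0000166667
    requests = (invocations / 1_000_000) * 0.20
    return round(compute, 2), round(compute + requests, 2)

print(lambda_cost(128))   # (2.08, 2.28)
print(lambda_cost(1024))  # (16.67, 16.87)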

There are many ways to optimize Lambda functions, but one of the most important choices is memory allocation. You can choose between 128 MB and 3008 MB, but this also impacts the amount of virtual CPU as memory increases. Since total cost is a combination of memory and duration, choosing more memory can often reduce duration and lower overall cost.

Instead of manually setting the memory for a Lambda function and running executions to compare duration, you can use the AWS Lambda Power Tuning tool. This uses Step Functions to run your function against varying memory configurations. It can produce a visualization to find the optimal memory setting, based upon cost or execution time.

Optimizing costs with the AWS Lambda Power Tuning tool

Conclusion

Web application backends are one of the most popular workload types for serverless applications. The pay-per-value model works well for this type of workload. As traffic grows, it’s important to consider the design choices and service configurations used to optimize your cost.

Serverless web applications generally use a common range of services, which you can logically split into different layers. This post examines each layer and suggests common cost optimizations helpful for web app developers.

To learn more about building web apps with serverless, see the Happy Path series. For more serverless learning resources, visit https://serverlessland.com.

Unified serverless streaming ETL architecture with Amazon Kinesis Data Analytics

Post Syndicated from Ram Vittal original https://aws.amazon.com/blogs/big-data/unified-serverless-streaming-etl-architecture-with-amazon-kinesis-data-analytics/

Businesses across the world are seeing a massive influx of data at an enormous pace through multiple channels. With the advent of cloud computing, many companies are realizing the benefits of getting their data into the cloud to gain meaningful insights and save costs on data processing and storage. As businesses embark on their journey towards cloud solutions, they often come across challenges involving building serverless, streaming, real-time ETL (extract, transform, load) architecture that enables them to extract events from multiple streaming sources, correlate those streaming events, perform enrichments, run streaming analytics, and build data lakes from streaming events.

In this post, we discuss the concept of unified streaming ETL architecture using a generic serverless streaming architecture with Amazon Kinesis Data Analytics at the heart of the architecture for event correlation and enrichments. This solution can address a variety of streaming use cases with various input sources and output destinations. We then walk through a specific implementation of the generic serverless unified streaming architecture that you can deploy into your own AWS account for experimenting and evolving this architecture to address your business challenges.

Overview of solution

As data sources grow in volume, variety, and velocity, the management of data and event correlation become more challenging. Most of the challenges stem from data silos, in which different teams and applications manage data and events using their own tools and processes.

Modern businesses need a single, unified view of the data environment to get meaningful insights through streaming multi-joins, such as the correlation of sensor events and time-series data. Event correlation plays a vital role in automatically reducing noise and allowing the team to focus on those issues that really matter to the business objectives.

To realize this outcome, the solution proposes creating a three-stage architecture:

  • Ingestion
  • Processing
  • Analysis and visualization

The source can be a varied set of inputs comprising structured datasets like databases or raw data feeds like sensor data that can be ingested as single or multiple parallel streams. The solution envisions multiple hybrid data sources as well. After it’s ingested, the data is divided into single or multiple data streams depending on the use case and passed through a preprocessor (via an AWS Lambda function). This highly customizable processor transforms and cleanses data to be processed through the analytics application. Furthermore, the architecture allows you to enrich data or validate it against standard sets of reference data, for example, validating address data received from the source against postal codes to verify its accuracy. After the data is processed, it’s sent to various sink platforms depending on your preferences, which could range from storage solutions to visualization solutions, or even stored as a dataset in a high-performance database.

The solution is designed with flexibility as a key tenet to address multiple, real-world use cases. The following diagram illustrates the solution architecture.

The architecture has the following workflow:

  1. We use AWS Database Migration Service (AWS DMS) to push records from the data source into AWS in real time or batch. For our use case, we use AWS DMS to fetch records from an on-premises relational database.
  2. AWS DMS writes records to Amazon Kinesis Data Streams. The data is split into multiple streams as necessitated through the channels.
  3. A Lambda function picks up the data stream records and preprocesses them (adding the record type). This is an optional step, depending on your use case.
  4. Processed records are sent to the Kinesis Data Analytics application for querying and correlating in-application streams, taking into account Amazon Simple Storage Service (Amazon S3) reference data for enrichment.

Solution walkthrough

For this post, we demonstrate an implementation of the unified streaming ETL architecture using Amazon RDS for MySQL as the data source and Amazon DynamoDB as the target. We use a simple order service data model that comprises orders, items, and products, where an order can have multiple items and the product is linked to an item in a reference relationship that provides detail about the item, such as description and price.

We implement a streaming serverless data pipeline that ingests orders and items as they are recorded in the source system into Kinesis Data Streams via AWS DMS. We build a Kinesis Data Analytics application that correlates orders and items along with reference product information and creates a unified and enriched record. Kinesis Data Analytics outputs this unified and enriched data to Kinesis Data Streams. A Lambda function consumer processes the data stream and writes the unified and enriched data to DynamoDB.
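
A minimal sketch of that consumer pattern looks like the following (this is an illustration, not the repo’s implementation); it assumes the enriched records arrive as JSON and are written to the OrderEnriched table used later in this walkthrough:

import base64
import decimal
import json

import boto3

table = boto3.resource("dynamodb").Table("OrderEnriched")  # assumed table name

def handler(event, context):
    for record in event["Records"]:
        payload = json.loads(
            base64.b64decode(record["kinesis"]["data"]),
            parse_float=decimal.Decimal,  # DynamoDB requires Decimal rather than float
        )
        table.put_item(Item=payload)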

To launch this solution in your AWS account, use the GitHub repo.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Setting up AWS resources in your account

To set up your resources for this walkthrough, complete the following steps:

  1. Set up the AWS CDK for Java on your local workstation. For instructions, see Getting Started with the AWS CDK.
  2. Install Maven binaries for Java if you don’t have Maven installed already.
  3. If this is the first installation of the AWS CDK, make sure to run cdk bootstrap.
  4. Clone the following GitHub repo.
  5. Navigate to the project root folder and run the following commands to build and deploy:
    1. mvn compile
    2. cdk deploy UnifiedStreamETLCommonStack UnifiedStreamETLDataStack UnifiedStreamETLProcessStack

Setting up the orders data model for CDC

In this next step, you set up the orders data model for change data capture (CDC).

  1. On the Amazon Relational Database Service (Amazon RDS) console, choose Databases.
  2. Choose your database and make sure that you can connect to it securely for testing, for example through a bastion host (the details are out of scope for this post).
  3. Start MySQL Workbench and connect to your database using your DB endpoint and credentials.
  4. To create the data model in your Amazon RDS for MySQL database, run orderdb-setup.sql.
  5. On the AWS DMS console, test the connections to your source and target endpoints.
  6. Choose Database migration tasks.
  7. Choose your AWS DMS task and choose Table statistics.
  8. To update your table statistics, restart the migration task (with full load) for replication.
  9. From your MySQL Workbench session, run orders-data-setup.sql to create orders and items.
  10. Verify that CDC is working by checking the Table statistics.

Setting up your Kinesis Data Analytics application

To set up your Kinesis Data Analytics application, complete the following steps:

  1. Upload the product reference products.json to your S3 bucket with the logical ID prefix unifiedBucketId (which was previously created by cdk deploy).

You can now create a Kinesis Data Analytics application and map the resources to the data fields.

  1. On the Amazon Kinesis console, choose Analytics Application.
  2. Choose Create application.
  3. For Runtime, choose SQL.
  4. Connect the streaming data created using the AWS CDK as a unified order stream.
  5. Choose Discover schema and wait for it to discover the schema for the unified order stream. If discovery fails, update the records on the source Amazon RDS tables and send streaming CDC records.
  6. Save and move to the next step.
  7. Connect the reference S3 bucket you created with the AWS CDK and uploaded with the reference data.
  8. Input the following:
    1. products.json for the path to the S3 object
    2. Products for the in-application reference table name
  9. Discover the schema, then save and close.
  10. Choose SQL Editor and start the Kinesis Data Analytics application.
  11. Edit the schema for SOURCE_SQL_STREAM_001 and map the data resources as follows:
Column Name         Column Type    Row Path
orderId             INTEGER        $.data.orderId
itemId              INTEGER        $.data.orderId
itemQuantity        INTEGER        $.data.itemQuantity
itemAmount          REAL           $.data.itemAmount
itemStatus          VARCHAR        $.data.itemStatus
COL_timestamp       VARCHAR        $.metadata.timestamp
recordType          VARCHAR        $.metadata.table-name
operation           VARCHAR        $.metadata.operation
partitionkeytype    VARCHAR        $.metadata.partition-key-type
schemaname          VARCHAR        $.metadata.schema-name
tablename           VARCHAR        $.metadata.table-name
transactionid       BIGINT         $.metadata.transaction-id
orderAmount         DOUBLE         $.data.orderAmount
orderStatus         VARCHAR        $.data.orderStatus
orderDateTime       TIMESTAMP      $.data.orderDateTime
shipToName          VARCHAR        $.data.shipToName
shipToAddress       VARCHAR        $.data.shipToAddress
shipToCity          VARCHAR        $.data.shipToCity
shipToState         VARCHAR        $.data.shipToState
shipToZip           VARCHAR        $.data.shipToZip


  1. Choose Save schema and update stream samples.

When it’s complete, verify for 1 minute that nothing is in the error stream. If an error occurs, check that you defined the schema correctly.

  1. On your Kinesis Data Analytics application, choose your application and choose Real-time analytics.
  2. Go to the SQL results and run kda-orders-setup.sql to create in-application streams.
  3. From the application, choose Connect to destination.
  4. For Kinesis data stream, choose unifiedOrderEnrichedStream.
  5. For In-application stream, choose ORDER_ITEM_ENRICHED_STREAM.
  6. Choose Save and Continue.

Testing the unified streaming ETL architecture

You’re now ready to test your architecture.

  1. Navigate to your Kinesis Data Analytics application.
  2. Choose your app and choose Real-time analytics.
  3. Go to the SQL results and choose Real-time analytics.
  4. Choose the in-application stream ORDER_ITEM_ENRICHED_STREAM to see the results of the real-time join of records from the order and order item streaming Kinesis events.
  5. On the Lambda console, search for UnifiedStreamETLProcess.
  6. Choose the function and choose Monitoring, Recent invocations.
  7. Verify the Lambda function run results.
  8. On the DynamoDB console, choose the OrderEnriched table.
  9. Verify the unified and enriched records that combine order, item, and product records.

The following screenshot shows the OrderEnriched table.

Operational aspects

When you’re ready to operationalize this architecture for your workloads, you need to consider several aspects:

  • Monitoring metrics for Kinesis Data Streams: GetRecords.IteratorAgeMilliseconds, ReadProvisionedThroughputExceeded, and WriteProvisionedThroughputExceeded
  • Monitoring metrics available for the Lambda function, including but not limited to Duration, IteratorAge, Error count and success rate (%), Concurrent executions, and Throttles
  • Monitoring metrics for Kinesis Data Analytics (millisBehindLatest)
  • Monitoring DynamoDB provisioned read and write capacity units
  • Using the DynamoDB automatic scaling feature to automatically manage throughput
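
As an illustration of the last point, automatic scaling can also be enabled programmatically through the Application Auto Scaling API; the table name and capacity limits below are placeholders:

import boto3

autoscaling = boto3.client("application-autoscaling")

autoscaling.register_scalable_target(
    ServiceNamespace="dynamodb",
    ResourceId="table/OrderEnriched",                      # placeholder table
    ScalableDimension="dynamodb:table:WriteCapacityUnits",
    MinCapacity=100,
    MaxCapacity=3000,
)

autoscaling.put_scaling_policy(
    PolicyName="OrderEnrichedWriteScaling",
    ServiceNamespace="dynamodb",
    ResourceId="table/OrderEnriched",
    ScalableDimension="dynamodb:table:WriteCapacityUnits",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 70.0,  # scale to hold roughly 70% capacity utilization
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
        },
    },
)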

We used the solution architecture with the following configuration settings to evaluate the operational performance:

  • Kinesis OrdersStream with two shards and Kinesis OrdersEnrichedStream with two shards
  • The Lambda function code does asynchronous processing with Kinesis OrdersEnrichedStream records in concurrent batches of five, with a batch size of 500
  • DynamoDB provisioned WCU is 3000, RCU is 300

We observed the following results:

  • 100,000 order items are enriched with order event data and product reference data and persisted to DynamoDB
  • An average of 900 milliseconds of latency from the time of event ingestion into the Kinesis pipeline to when the record landed in DynamoDB

The following screenshot shows the visualizations of these metrics.

Cleaning up

To avoid incurring future charges, delete the resources you created as part of this post (the AWS CDK provisioned AWS CloudFormation stacks).

Conclusion

In this post, we designed a unified streaming architecture that extracts events from multiple streaming sources, correlates and performs enrichments on events, and persists those events to destinations. We then reviewed a use case and walked through the code for ingesting, correlating, and consuming real-time streaming data with Amazon Kinesis, using Amazon RDS for MySQL as the source and DynamoDB as the target.

Managing an ETL pipeline through Kinesis Data Analytics provides a cost-effective unified solution to real-time and batch database migrations using common technical skills like SQL querying.


About the Authors

Ram Vittal is an enterprise solutions architect at AWS. His current focus is to help enterprise customers with their cloud adoption and optimization journey to improve their business outcomes. In his spare time, he enjoys tennis, photography, and movies.

Akash Bhatia is a Sr. solutions architect at AWS. His current focus is helping customers achieve their business outcomes through architecting and implementing innovative and resilient solutions at scale.

How to delete user data in an AWS data lake

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/how-to-delete-user-data-in-an-aws-data-lake/

General Data Protection Regulation (GDPR) is an important aspect of today’s technology world, and processing data in compliance with GDPR is a necessity for those who implement solutions within the AWS public cloud. One article of GDPR is the “right to erasure” or “right to be forgotten” which may require you to implement a solution to delete specific users’ personal data.

In the context of the AWS big data and analytics ecosystem, every architecture, regardless of the problem it targets, uses Amazon Simple Storage Service (Amazon S3) as the core storage service. Despite its versatility and feature completeness, Amazon S3 doesn’t come with an out-of-the-box way to map a user identifier to S3 keys of objects that contain a user’s data.

This post walks you through a framework that helps you purge individual user data within your organization’s AWS hosted data lake, and an analytics solution that uses different AWS storage layers, along with sample code targeting Amazon S3.

Reference architecture

To address the challenge of implementing a data purge framework, we reduced the problem to the straightforward use case of deleting a user’s data from a platform that uses AWS for its data pipeline. The following diagram illustrates this use case.

We’re introducing the idea of building and maintaining an index metastore that keeps track of the location of each user’s records and allows us to locate them efficiently, reducing the search space.

You can use the following architecture diagram to delete a specific user’s data within your organization’s AWS data lake.

For this initial version, we created three user flows that map each task to a fitting AWS service:

Flow 1: Real-time metastore update

The S3 ObjectCreated or ObjectDelete events trigger an AWS Lambda function that parses the object and performs an add/update/delete operation to keep the metadata index up to date. You can implement a simple workflow for any other storage layer, such as Amazon Relational Database Service (RDS), Amazon Aurora, or Amazon Elasticsearch Service (ES). We use Amazon DynamoDB and Amazon RDS for PostgreSQL as the index metadata storage options, but our approach is flexible to any other technology.

Flow 2: Purge data

When a user asks for their data to be deleted, we trigger an AWS Step Functions state machine through Amazon CloudWatch to orchestrate the workflow. Its first step triggers a Lambda function that queries the metadata index to identify the storage layers that contain user records and generates a report that’s saved to an S3 report bucket. A Step Functions activity is created and picked up by a Node.js-based Lambda worker that sends an email to the approver through Amazon Simple Email Service (SES) with approve and reject links.

The following diagram shows a graphical representation of the Step Function state machine as seen on the AWS Management Console.

The approver selects one of the two links, which then calls an Amazon API Gateway endpoint that invokes Step Functions to resume the workflow. If you choose the approve link, Step Functions triggers a Lambda function that takes the report stored in the bucket as input, deletes the objects or records from the storage layer, and updates the index metastore. When the purging job is complete, Amazon Simple Notification Service (SNS) sends a success or fail email to the user.

The following diagram represents the Step Functions flow on the console if the purge flow completed successfully.

For the complete code base, see step-function-definition.json in the GitHub repo.

Flow 3: Batch metastore update

This flow refers to the use case of an existing data lake for which an index metastore needs to be created. You can orchestrate the flow through AWS Step Functions, which takes historical data as input and updates the metastore through a batch job. Our current implementation doesn’t include a sample script for this user flow.

Our framework

We now walk you through the two use cases we followed for our implementation:

  • You have multiple user records stored in each Amazon S3 file
  • A user has records stored in homogenous AWS storage layers

Within these two approaches, we demonstrate alternatives that you can use to store your index metastore.

Indexing by S3 URI and row number

For this use case, we use a free tier RDS Postgres instance to store our index. We created a simple table with the following code:

CREATE UNLOGGED TABLE IF NOT EXISTS user_objects (
				userid TEXT,
				s3path TEXT,
				recordline INTEGER
			);

You can add an index on userid to optimize query performance. On object upload, for each row, you need to insert into the user_objects table a row that indicates the user ID, the URI of the target Amazon S3 object, and the row that corresponds to the record. For instance, when uploading the following JSON input, enter the following code:

{"user_id":"V34qejxNsCbcgD8C0HVk-Q","body":"…"}
{"user_id":"ofKDkJKXSKZXu5xJNGiiBQ","body":"…"}
{"user_id":"UgMW8bLE0QMJDCkQ1Ax5Mg","body ":"…"}

We insert the tuples into user_objects in the Amazon S3 location s3://gdpr-demo/year=2018/month=2/day=26/input.json. See the following code:

("V34qejxNsCbcgD8C0HVk-Q", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 0)
("ofKDkJKXSKZXu5xJNGiiBQ", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 1)
("UgMW8bLE0QMJDCkQ1Ax5Mg", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 2)

You can implement the index update operation by using a Lambda function triggered on any Amazon S3 ObjectCreated event.
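
A rough sketch of such a function is shown below. It assumes newline-delimited JSON objects like the example above and a psycopg2 dependency packaged with the function; the connection details are placeholders:

import json

import boto3
import psycopg2

s3 = boto3.client("s3")
conn = psycopg2.connect(host="<rds-endpoint>", dbname="gdpr", user="<user>", password="<password>")

def handler(event, context):
    record = event["Records"][0]["s3"]
    bucket, key = record["bucket"]["name"], record["object"]["key"]
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

    with conn.cursor() as cur:
        # One index row per record: who it belongs to, where it lives, and on which line
        for line_number, line in enumerate(body.splitlines()):
            user_id = json.loads(line)["user_id"]
            cur.execute(
                "INSERT INTO user_objects (userid, s3path, recordline) VALUES (%s, %s, %s)",
                (user_id, f"s3://{bucket}/{key}", line_number),
            )
    conn.commit()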

When we get a delete request from a user, we need to query our index to get some information about where we have stored the data to delete. See the following code:

SELECT s3path,
                ARRAY_AGG(recordline)
                FROM user_objects
                WHERE userid = 'V34qejxNsCbcgD8C0HVk-Q'
                GROUP BY s3path;

The preceding example SQL query returns rows like the following:

("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", {2102,529})

The output indicates that lines 529 and 2102 of S3 object s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json contain the requested user’s data and need to be purged. We then need to download the object, remove those rows, and overwrite the object. For a Python implementation of the Lambda function that implements this functionality, see deleteUserRecords.py in the GitHub repo.

Having the record line available allows you to perform the deletion efficiently in byte format. For implementation simplicity, we purge the rows by replacing the deleted rows with an empty JSON object. You pay a slight storage overhead, but you don’t need to update subsequent row metadata in your index, which would be costly. To eliminate empty JSON objects, we can implement an offline vacuum and index update process.

Indexing by file name and grouping by index key

For this use case, we created a DynamoDB table to store our index. We chose DynamoDB because of its ease of use and scalability; you can use its on-demand pricing model so you don’t need to guess how many capacity units you might need. When files are uploaded to the data lake, a Lambda function parses the file name (for example, 1001-.csv) to identify the user identifier and populates the DynamoDB metadata table. Userid is the partition key, and each different storage layer has its own attribute. For example, if user 1001 had data in Amazon S3 and Amazon RDS, their records look like the following code:

{"userid:": 1001, "s3":{"s3://path1", "s3://path2"}, "RDS":{"db1.table1.column1"}}

For a sample Python implementation of this functionality, see update-dynamo-metadata.py in the GitHub repo.
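
As a rough illustration of the same idea (not the repo’s exact implementation), a Lambda function could parse the uploaded key and append the object’s location to the user’s item with an ADD update; the table name and file name format are assumptions:

import boto3

table = boto3.resource("dynamodb").Table("gdpr-index")  # placeholder table name

def handler(event, context):
    record = event["Records"][0]["s3"]
    bucket, key = record["bucket"]["name"], record["object"]["key"]
    # Assumes file names like "1001-<suffix>.csv", where 1001 is the user identifier
    user_id = int(key.split("/")[-1].split("-")[0])

    table.update_item(
        Key={"userid": user_id},
        UpdateExpression="ADD s3 :paths",
        ExpressionAttributeValues={":paths": {f"s3://{bucket}/{key}"}},  # string set
    )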

On delete request, we query the metastore table, which is DynamoDB, and generate a purge report that contains details on what storage layers contain user records, and storage layer specifics that can speed up locating the records. We store the purge report to Amazon S3. For a sample Lambda function that implements this logic, see generate-purge-report.py in the GitHub repo.

After the purging is approved, we use the report as input to delete the required resources. For a sample Lambda function implementation, see gdpr-purge-data.py in the GitHub repo.

Implementation and technology alternatives

We explored and evaluated multiple implementation options, all of which present tradeoffs, such as implementation simplicity, efficiency, critical data compliance, and feature completeness:

  • Scan every record of the data file to create an index – Whenever a file is uploaded, we iterate through its records and generate tuples (userid, s3Uri, row_number) that are then inserted to our metadata storing layer. On delete request, we fetch the metadata records for requested user IDs, download the corresponding S3 objects, perform the delete in place, and re-upload the updated objects, overwriting the existing object. This is the most flexible approach because it supports a single object to store multiple users’ data, which is a very common practice. The flexibility comes at a cost because it requires downloading and re-uploading the object, which introduces a network bottleneck in delete operations. User activity datasets such as customer product reviews are a good fit for this approach, because it’s unexpected to have multiple records for the same user within each partition (such as a date partition), and it’s preferable to combine multiple users’ activity in a single file. It’s similar to what was described in the section “Indexing by S3 URI and row number” and sample code is available in the GitHub repo.
  • Store metadata as file name prefix – Adding the user ID as the prefix of the uploaded object under the different partitions that are defined based on query pattern enables you to reduce the required search operations on delete request. The metadata handling utility finds the user ID from the file name and maintains the index accordingly. This approach is efficient in locating the resources to purge but assumes a single user per object, and requires you to store user IDs within the filename, which might require InfoSec considerations. Clickstream data, where you would expect to have multiple click events for a single customer on a single date partition during a session, is a good fit. We covered this approach in the section “Indexing by file name and grouping by index key” and you can download the codebase from the GitHub repo.
  • Use a metadata file – Along with uploading a new object, we also upload a metadata file that’s picked up by an indexing utility to create and maintain the index up to date. On delete request, we query the index, which points us to the records to purge. A good fit for this approach is a use case that already involves uploading a metadata file whenever a new object is uploaded, such as uploading multimedia data, along with their metadata. Otherwise, uploading a metadata file on every object upload might introduce too much of an overhead.
  • Use the tagging feature of AWS services – Whenever a new file is uploaded to Amazon S3, we use the Put Object Tagging Amazon S3 operation to add a key-value pair for the user identifier. Whenever there is a user data delete request, it fetches objects with that tag and deletes them. This option is straightforward to implement using the existing Amazon S3 API and can therefore be a very initial version of your implementation. However, it involves significant limitations. It assumes a 1:1 cardinality between Amazon S3 objects and users (each object only contains data for a single user), searching objects based on a tag is limited and inefficient, and storing user identifiers as tags might not be compliant with your organization’s InfoSec policy.
  • Use Apache Hudi – Apache Hudi is becoming a very popular option to perform record-level data deletion on Amazon S3. Its current version is restricted to Amazon EMR, and you can use it if you start to build your data lake from scratch, because you need to store your data as Hudi datasets. Hudi is a very active project and additional features and integrations with more AWS services are expected.

The key implementation decision of our approach is separating the storage layer we use for our data and the one we use for our metadata. As a result, our design is versatile and can be plugged in any existing data pipeline. Similar to deciding what storage layer to use for your data, there are many factors to consider when deciding how to store your index:

  • Concurrency of requests – If you don’t expect too many simultaneous inserts, even something as simple as Amazon S3 could be a starting point for your index. However, if you get multiple concurrent writes for multiple users, you need to look into a service that copes better with transactions.
  • Existing team knowledge and infrastructure – In this post, we demonstrated using DynamoDB and RDS Postgres for storing and querying the metadata index. If your team has no experience with either of those but is comfortable with Amazon ES, Amazon DocumentDB (with MongoDB compatibility), or any other storage layer, use those. Furthermore, if you’re already running (and paying for) a MySQL database that’s not used to capacity, you could use that for your index for no additional cost.
  • Size of index – The volume of your metadata is orders of magnitude lower than your actual data. However, if your dataset grows significantly, you might need to consider going for a scalable, distributed storage solution rather than, for instance, a relational database management system.

Conclusion

GDPR has transformed best practices and introduced several extra technical challenges in designing and implementing a data lake. The reference architecture and scripts in this post may help you delete data in a manner that’s compliant with GDPR.

Let us know your feedback in the comments and how you implemented this solution in your organization, so that others can learn from it.



About the Authors

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and travel.

Building a serverless document scanner using Amazon Textract and AWS Amplify

Post Syndicated from Moheeb Zara original https://aws.amazon.com/blogs/compute/building-a-serverless-document-scanner-using-amazon-textract-and-aws-amplify/

This guide demonstrates creating and deploying a production ready document scanning application. It allows users to manage projects, upload images, and generate a PDF from detected text. The sample can be used as a template for building expense tracking applications, handling forms and legal documents, or for digitizing books and notes.

The frontend application is written in Vue.js and uses the Amplify Framework. The backend is built using AWS serverless technologies and consists of an Amazon API Gateway REST API that invokes AWS Lambda functions. Amazon Textract is used to analyze text from images uploaded to an Amazon S3 bucket. Detected text is stored in Amazon DynamoDB.

An architectural diagram of the application.

Prerequisites

You need the following to complete the project:

Deploy the application

The solution consists of two parts, the frontend application and the serverless backend. The Amplify CLI deploys all the Amazon Cognito authentication and hosting resources for the frontend. The backend requires the Amazon Cognito user pool identifier to configure an authorizer on the API. This enables an authorization workflow, as shown in the following image.

A diagram showing how an Amazon Cognito authorization workflow works

First, configure the frontend. Complete the following steps using a terminal running on a computer or by using the AWS Cloud9 IDE. If using AWS Cloud9, create an instance using the default options.

From the terminal:

  1. Install the Amplify CLI by running this command.
    npm install -g @aws-amplify/cli
  2. Configure the Amplify CLI using this command. Follow the guided process to completion.
    amplify configure
  3. Clone the project from GitHub.
    git clone https://github.com/aws-samples/aws-serverless-document-scanner.git
  4. Navigate to the amplify-frontend directory and initialize the project using the Amplify CLI command. Follow the guided process to completion.
    cd aws-serverless-document-scanner/amplify-frontend
    
    amplify init
  5. Deploy all the frontend resources to the AWS Cloud using the Amplify CLI command.
    amplify push
  6. After the resources have finished deploying, make note of the StackName and UserPoolId properties in the amplify-frontend/amplify/backend/amplify-meta.json file. These are required when deploying the serverless backend.

Next, deploy the serverless backend. While it can be deployed using the AWS SAM CLI, you can also deploy from the AWS Management Console:

  1. Navigate to the document-scanner application in the AWS Serverless Application Repository.
  2. In Application settings, name the application and provide the StackName and UserPoolId from the frontend application for the UserPoolID and AmplifyStackName parameters. Provide a unique name for the BucketName parameter.
  3. Choose Deploy.
  4. Once complete, copy the API endpoint so that it can be configured on the frontend application in the next section.

Configure and run the frontend application

  1. Create a file, amplify-frontend/src/api-config.js, in the frontend application with the following content. Include the API endpoint and the unique BucketName from the previous step. The s3_region value must be the same as the Region where your serverless backend is deployed.
    const apiConfig = {
    	"endpoint": "<API ENDPOINT>",
    	"s3_bucket_name": "<BucketName>",
    	"s3_region": "<Bucket Region>"
    };
    
    export default apiConfig;
  2. In a terminal, navigate to the root directory of the frontend application and run it locally for testing.
    cd aws-serverless-document-scanner/amplify-frontend
    
    npm install
    
    npm run serve

    You should see an output like this:

  3. To publish the frontend application to cloud hosting, run the following command.
    amplify publish

    Once complete, a URL to the hosted application is provided.

Using the frontend application

Once the application is running locally or hosted in the cloud, navigating to it presents a user login interface with an option to register. The registration flow requires a code sent to the provided email for verification. Once verified you’re presented with the main application interface.

Once you create a project and choose it from the list, you are presented with an interface for uploading images by page number.

On mobile, it uses the device camera to capture images. On desktop, images are provided by the file system. You can replace an image and the page selector also lets you go back and change an image. The corresponding analyzed text is updated in DynamoDB as well.

Each time you upload an image, the page is incremented. Choosing “Generate PDF” calls the endpoint for the GeneratePDF Lambda function and returns a PDF in base64 format. The download begins automatically.

You can also open the PDF in another window, if viewing a preview in a desktop browser.

Understanding the serverless backend

An architecture diagram of the serverless backend.

In the GitHub project, the folder serverless-backend/ contains the AWS SAM template file and the Lambda functions. It creates an API Gateway endpoint, six Lambda functions, an S3 bucket, and two DynamoDB tables. The template also defines an Amazon Cognito authorizer for the API using the UserPoolID passed in as a parameter:

Parameters:
  UserPoolID:
    Type: String
    Description: (Required) The user pool ID created by the Amplify frontend.

  AmplifyStackName:
    Type: String
    Description: (Required) The stack name of the Amplify backend deployment. 

  BucketName:
    Type: String
    Default: "ds-userfilebucket"
    Description: (Required) A unique name for the user file bucket. Must be all lowercase.  


Globals:
  Api:
    Cors:
      AllowMethods: "'*'"
      AllowHeaders: "'*'"
      AllowOrigin: "'*'"

Resources:

  DocumentScannerAPI:
    Type: AWS::Serverless::Api
    Properties:
      StageName: Prod
      Auth:
        DefaultAuthorizer: CognitoAuthorizer
        Authorizers:
          CognitoAuthorizer:
            UserPoolArn: !Sub 'arn:aws:cognito-idp:${AWS::Region}:${AWS::AccountId}:userpool/${UserPoolID}'
            Identity:
              Header: Authorization
        AddDefaultAuthorizerToCorsPreflight: False

This only allows authenticated users of the frontend application to make requests with a JWT token containing their user name and email. The backend uses that information to fetch and store data in DynamoDB that corresponds to the user making the request.
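
For example, a Python handler behind this authorizer can read the caller’s identity from the request context (a sketch, assuming the default claim names that Cognito user pools provide):

def handler(event, context):
    claims = event["requestContext"]["authorizer"]["claims"]
    username = claims["cognito:username"]
    email = claims.get("email")
    # ...scope all DynamoDB reads and writes to this username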

Two DynamoDB tables are created. A Project table, which tracks all the project names by user, and a Pages table, which tracks pages by project and user. The DynamoDB tables are created by the AWS SAM template with the partition key and range key defined for each table. These are used by the Lambda functions to query and sort items. See the documentation to learn more about DynamoDB table key schema.

ProjectsTable:
    Type: AWS::DynamoDB::Table
    Properties: 
      AttributeDefinitions: 
        - 
          AttributeName: "username"
          AttributeType: "S"
        - 
          AttributeName: "project_name"
          AttributeType: "S"
      KeySchema: 
        - AttributeName: username
          KeyType: HASH
        - AttributeName: project_name
          KeyType: RANGE
      ProvisionedThroughput: 
        ReadCapacityUnits: "5"
        WriteCapacityUnits: "5"

  PagesTable:
    Type: AWS::DynamoDB::Table
    Properties: 
      AttributeDefinitions: 
        - 
          AttributeName: "project"
          AttributeType: "S"
        - 
          AttributeName: "page"
          AttributeType: "N"
      KeySchema: 
        - AttributeName: project
          KeyType: HASH
        - AttributeName: page
          KeyType: RANGE
      ProvisionedThroughput: 
        ReadCapacityUnits: "5"
        WriteCapacityUnits: "5"

When an API Gateway endpoint is called, it passes the user credentials in the request context to a Lambda function. This is used by the CreateProject Lambda function, which also receives a project name in the request body, to create an item in the Project Table and associate it with a user.

The endpoint for the FetchProjects Lambda function is called to retrieve the list of projects associated with a user. The DeleteProject Lambda function removes a specific project from the Project table and any associated pages in the Pages table. It also deletes the folder in the S3 bucket containing all images for the project.

When a user enters a Project, the API endpoint calls the FetchPageCount Lambda function. This returns the number of pages for a project to update the current page number in the upload selector. The project is retrieved from the path parameters, as defined in the AWS SAM template:

FetchPageCount:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.handler
      Runtime: python3.8
      CodeUri: lambda_functions/fetchPageCount/
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref PagesTable
      Environment:
        Variables:
          PAGES_TABLE_NAME: !Ref PagesTable
      Events:
        GetResource:
          Type: Api
          Properties:
            RestApiId: !Ref DocumentScannerAPI
            Path: /pages/count/{project+}
            Method: get  
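
A simplified sketch of the corresponding handler is shown below (the full version is in the GitHub repo); it assumes the Pages table keys items by a user/project string, as the ProcessDocument function later in this post does:

import json
import os

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table(os.environ["PAGES_TABLE_NAME"])

def handler(event, context):
    user = event["requestContext"]["authorizer"]["claims"]["cognito:username"]
    project = event["pathParameters"]["project"]

    response = table.query(
        KeyConditionExpression=Key("project").eq(f"{user}/{project}"),
        Select="COUNT",  # only the number of pages is needed
    )
    return {
        "statusCode": 200,
        "headers": {"Access-Control-Allow-Origin": "*"},
        "body": json.dumps({"count": response["Count"]}),
    }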

The template creates an S3 bucket and two AWS IAM managed policies. The policies are applied to the AuthRole and UnauthRole created by Amplify. This allows users to upload images directly to the S3 bucket. To understand how Amplify works with Storage, see the documentation.

The template also sets an S3 event notification on the bucket for all object create events with a “.png” suffix. Whenever the frontend uploads an image to S3, the object create event invokes the ProcessDocument Lambda function.

The function parses the object key to get the project name, user, and page number. Amazon Textract then analyzes the text of the image. The object returned by Amazon Textract contains the detected text and detailed information, such as the positioning of text in the image. Only the raw lines of text are stored in the Pages table.

import os
import json, decimal
import boto3
import urllib.parse
from boto3.dynamodb.conditions import Key, Attr

client = boto3.resource('dynamodb')
textract = boto3.client('textract')

tableName = os.environ.get('PAGES_TABLE_NAME')

def handler(event, context):

  table = client.Table(tableName)

  print(table.table_status)
 
  key = urllib.parse.unquote(event['Records'][0]['s3']['object']['key'])
  bucket = event['Records'][0]['s3']['bucket']['name']
  project = key.split('/')[3]
  page = key.split('/')[4].split('.')[0]
  user = key.split('/')[2]
  
  response = textract.detect_document_text(
    Document={
        'S3Object': {
            'Bucket': bucket,
            'Name': key
        }
    })
    
  fullText = ""
  
  for item in response["Blocks"]:
    if item["BlockType"] == "LINE":
        fullText = fullText + item["Text"] + '\n'
  
  print(fullText)

  table.put_item(Item= {
    'project': user + '/' + project,
    'page': int(page), 
    'text': fullText
    })

  # print(response)
  return

The GeneratePDF Lambda function retrieves the detected text for each page in a project from the Pages table. It combines the text into a PDF and returns it as a base64-encoded string for download. This function can be modified if your document structure differs.
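
A simplified outline of that flow might look like the following; render_pdf is a hypothetical helper standing in for whichever PDF library you package with the function, and the claim and environment variable names are assumptions:

import base64
import os

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table(os.environ["PAGES_TABLE_NAME"])

def handler(event, context):
    user = event["requestContext"]["authorizer"]["claims"]["cognito:username"]
    project = event["pathParameters"]["project"]

    # Fetch all pages for the project in ascending page order
    pages = table.query(
        KeyConditionExpression=Key("project").eq(f"{user}/{project}"),
        ScanIndexForward=True,
    )["Items"]

    pdf_bytes = render_pdf([page["text"] for page in pages])  # hypothetical PDF helper
    return {
        "statusCode": 200,
        "body": base64.b64encode(pdf_bytes).decode("utf-8"),
        "isBase64Encoded": True,
    }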

Understanding the frontend

In the GitHub repo, the folder amplify-frontend/src/ contains all the code for the frontend application. In main.js, the Amplify VueJS modules are configured to use the resources defined in aws-exports.js. It also configures the endpoint and S3 bucket of the serverless backend, defined in api-config.js.

In components/DocumentScanner.vue, the API module is imported and the API is defined.

API calls are defined as Vue methods that can be called by various other components and elements of the application.

In components/Project.vue, the frontend uses the Storage module for Amplify to upload images. For more information on how to use S3 in an Amplify project, see the documentation.

Conclusion

This blog post shows how to create a multiuser application that can analyze text from images and generate PDF documents. This guide demonstrates how to do so in a secure and scalable way using a serverless approach. The example also shows an event-driven pattern for handling high-volume image processing using S3, Lambda, and Amazon Textract.

The Amplify Framework simplifies the process of implementing authentication, storage, and backend integration. Explore the full solution on GitHub to modify it for your next project or startup idea.

To learn more about AWS serverless and keep up to date on the latest features, subscribe to the YouTube channel.

#ServerlessForEveryone

Jump-starting your serverless development environment

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/jump-starting-your-serverless-development-environment/

Developers building serverless applications often wonder how they can jump-start their local development environment. This blog post provides a broad guide for those developers wanting to set up a development environment for building serverless applications.

AWS and open source tools for a serverless development environment

To use AWS Lambda and other AWS services, create and activate an AWS account.

Command line tooling

Command line tools are scripts, programs, and libraries that enable rapid application development and interactions from within a command line shell.

The AWS CLI

The AWS Command Line Interface (AWS CLI) is an open source tool that enables developers to interact with AWS services using a command line shell. In many cases, the AWS CLI increases developer velocity for building cloud resources and enables automating repetitive tasks. It is an important piece of any serverless developer’s toolkit. Follow these instructions to install and configure the AWS CLI on your operating system.

AWS enables you to build infrastructure with code. This provides a single source of truth for AWS resources. It enables development teams to use version control and create deployment pipelines for their cloud infrastructure. AWS CloudFormation provides a common language to model and provision these application resources in your cloud environment.

AWS Serverless Application Model (AWS SAM CLI)

AWS Serverless Application Model (AWS SAM) is an extension for CloudFormation that further simplifies the process of building serverless application resources.

It provides shorthand syntax to define Lambda functions, APIs, databases, and event source mappings. During deployment, the AWS SAM syntax is transformed into AWS CloudFormation syntax, enabling you to build serverless applications faster.

The AWS SAM CLI is an open source command line tool used to locally build, test, debug, and deploy serverless applications defined with AWS SAM templates.

Install AWS SAM CLI on your operating system.

Test the installation by initializing a new quick start project with the following command:

$ sam init
  1. Choose 1 for the “Quick Start Templates”
  2. Choose 1 for the “Node.js runtime”
  3. Use the default name.

The generated /sam-app/template.yaml contains all the resource definitions for your serverless application. This includes a Lambda function with a REST API endpoint, along with the necessary IAM permissions.

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: hello-world/
      Handler: app.lambdaHandler
      Runtime: nodejs12.x
      Events:
        HelloWorld:
          Type: Api # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
          Properties:
            Path: /hello
            Method: get

Deploy this application using the AWS SAM CLI guided deploy:

$ sam deploy -g

Local testing with AWS SAM CLI

The AWS SAM CLI requires Docker containers to simulate the AWS Lambda runtime environment on your local development environment. To test locally, install Docker Engine and run the Lambda function with the following command:

$ sam local invoke "HelloWorldFunction" -e events/event.json

The first time this function is invoked, Docker downloads the lambci/lambda:nodejs12.x container image. It then invokes the Lambda function with a pre-defined event JSON file.

Helper tools

There are a number of open source tools and packages available to help you monitor, author, and optimize your Lambda-based applications. Some of the most popular tools are described in the following sections.

Template validation tooling

CloudFormation Linter is a validation tool that helps with your CloudFormation development cycle. It analyzes CloudFormation YAML and JSON templates to resolve and validate intrinsic functions and resource properties. By analyzing your templates before deploying them, you can save valuable development time and build automated validation into your deployment release cycle.

Follow these instructions to install the tool.

Once installed, run the cfn-lint command with the path to your AWS SAM template provided as the first argument:

cfn-lint template.yaml

AWS SAM template validation with cfn-lint

The following example shows that the template is not valid because the !GettAtt function does not evaluate correctly.

IDE tooling

Use AWS IDE plugins to author and invoke Lambda functions from within your existing integrated development environment (IDE). AWS IDE toolkits are available for PyCharm, IntelliJ, and Visual Studio.

The AWS Toolkit for Visual Studio Code provides an integrated experience for developing serverless applications. It enables you to invoke Lambda functions, specify function configurations, locally debug, and deploy—all conveniently from within the editor. The toolkit supports Node.js, Python, and .NET.

The AWS Toolkit for Visual Studio Code

From Visual Studio Code, choose the Extensions icon on the Activity Bar. In the Search Extensions in Marketplace box, enter AWS Toolkit and then choose AWS Toolkit for Visual Studio Code as shown in the following example. This opens a new tab in the editor showing the toolkit’s installation page. Choose the Install button in the header to add the extension.

AWS Toolkit extension for Visual Studio Code

AWS Cloud9

Another option to build a development environment without having to install anything locally is to use AWS Cloud9. AWS Cloud9 is a cloud-based integrated development environment (IDE) for writing, running, and debugging code from within the browser.

It provides a seamless experience for developing serverless applications. It has a preconfigured development environment that includes the AWS CLI, AWS SAM CLI, SDKs, code libraries, and many useful plugins. AWS Cloud9 also provides an environment for locally testing and debugging AWS Lambda functions. This eliminates the need to upload your code to the Lambda console and allows developers to iterate on code directly, saving time and improving code quality.

Follow this guide to set up AWS Cloud9 in your AWS environment.

Advanced tooling

Efficient configuration of Lambda functions is critical to achieving optimal cost and performance for your serverless applications. Lambda allows you to control the memory (RAM) allocation for each function.

Lambda charges based on the number of function requests and the duration, the time it takes for your code to run. The price for duration depends on the amount of RAM you allocate to your function. A smaller RAM allocation may reduce the performance of your application if your function is running compute-heavy workloads. If performance needs outweigh cost, you can increase the memory allocation.
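
As a rough illustration of this trade-off, the following sketch estimates monthly Lambda cost from memory allocation, average duration, and invocation count. The rates are placeholder values, not current AWS pricing:

def monthly_lambda_cost(memory_mb, avg_duration_ms, invocations,
                        price_per_gb_second, price_per_million_requests):
    # Duration cost is billed in GB-seconds: allocated memory (GB) x execution time (s).
    gb_seconds = (memory_mb / 1024) * (avg_duration_ms / 1000) * invocations
    duration_cost = gb_seconds * price_per_gb_second
    request_cost = (invocations / 1_000_000) * price_per_million_requests
    return duration_cost + request_cost

# Placeholder rates: more memory raises the per-second price, but shorter
# execution times can offset it.
print(monthly_lambda_cost(512, 800, 5_000_000, 0.0000166667, 0.20))
print(monthly_lambda_cost(1024, 450, 5_000_000, 0.0000166667, 0.20))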

Cost and performance optimization tooling

AWS Lambda power tuner is an open source tool that uses an AWS Step Functions state machine to suggest cost and performance optimizations for your Lambda functions. It invokes a given function with multiple memory configurations. It analyzes the execution log results to determine and suggest power configurations that minimize cost and maximize performance.

To deploy the tool:

  1. Clone the repository as follows:
    $ git clone https://github.com/alexcasalboni/aws-lambda-power-tuning.git
  2. Create an Amazon S3 bucket and enter the deployment configurations in /scripts/deploy.sh:
    # config
    BUCKET_NAME=your-sam-templates-bucket
    STACK_NAME=lambda-power-tuning
    PowerValues='128,512,1024,1536,3008'
  3. Run the deploy.sh script from your terminal; this uses the AWS SAM CLI to deploy the application:
    $ bash scripts/deploy.sh
  4. Run the power tuning tool from the terminal using the AWS CLI:
    aws stepfunctions start-execution \
    --state-machine-arn arn:aws:states:us-east-1:0123456789:stateMachine:powerTuningStateMachine-Vywm3ozPB6Am \
    --input "{\"lambdaARN\": \"arn:aws:lambda:us-east-1:1234567890:function:testytest\", \"powerValues\":[128,256,512,1024,2048],\"num\":50,\"payload\":{},\"parallelInvocation\":true,\"strategy\":\"cost\"}" \
    --output json
  5. The Step Functions execution output produces a link to a visual summary of the suggested results:

    AWS Lambda power tuning results

Monitoring and debugging tooling

Sls-dev-tools is an open source serverless tool that delivers serverless metrics directly to the terminal. It provides developers with feedback on their serverless application’s metrics and key bindings that deploy, open, and manipulate stack resources. Bringing this data directly to your terminal or IDE reduces context switching between the developer environment and the web interfaces. This can increase application development speed and improve user experience.

Follow these instructions to install the tool onto your development environment.

To open the tool, run the following command:

$ sls-dev-tools

Follow the in-terminal interface to choose which stack to monitor or edit.

The following example shows how the tool can be used to invoke a Lambda function with a custom payload from within the IDE.

Invoke an AWS Lambda function with a custom payload using sls-dev-tools

Serverless database tooling

NoSQL Workbench for Amazon DynamoDB is a GUI application for modern database development and operations. It provides a visual IDE tool for data modeling and visualization with query development features to help build serverless applications with Amazon DynamoDB tables. Define data models using one or more tables and visualize the data model to see how it works in different scenarios. Run or simulate operations and generate the code for Python, JavaScript (Node.js), or Java.

Choose the correct operating system link to download and install NoSQL Workbench on your development machine.

The following example illustrates a connection to a DynamoDB table. A data scan is built using the GUI, with Node.js code generated for inclusion in a Lambda function:

Connecting to an Amazon DynamoDB table with NoSQL Workbench for Amazon DynamoDB

Generating query code with NoSQL Workbench for Amazon DynamoDB
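
For comparison, the Python (boto3) variant of a generated scan operation might resemble the following minimal sketch; the table and attribute names are hypothetical:

import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Orders')  # hypothetical table name

def handler(event, context):
    # Scan the table for orders above a given total, paginating as needed.
    response = table.scan(FilterExpression=Attr('order_total').gte(100))
    items = response['Items']
    while 'LastEvaluatedKey' in response:
        response = table.scan(
            FilterExpression=Attr('order_total').gte(100),
            ExclusiveStartKey=response['LastEvaluatedKey'],
        )
        items.extend(response['Items'])
    return {'count': len(items)}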

Conclusion

Building serverless applications allows developers to focus on business logic instead of managing and operating infrastructure. This is achieved by using managed services. Developers often struggle with knowing which tools, libraries, and frameworks are available to help with this new approach to building applications. This post shows tools that builders can use to create a serverless developer environment to help accelerate software development.

This list represents AWS and open source tools but does not include our APN Partners. For partner offers, check here.

Read more to start building serverless applications.

Fundbox: Simplifying Ways to Query and Analyze Data by Different Personas

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/fundbox-simplifying-ways-to-query-and-analyze-data-by-different-personas/

Fundbox is a leading technology platform focused on disrupting the $21 trillion B2B commerce market by building the world’s first B2B payment and credit network. With Fundbox, sellers of all sizes can quickly increase average order volumes (AOV) and improve close rates by offering more competitive net terms and payment plans to their SMB buyers. With heavy investments in machine learning and the ability to quickly analyze the transactional data of SMBs, Fundbox is reimagining B2B payments and credit products in new category-defining ways.

Learn how the company simplified the way different personas in the organization query and analyze data by building a self-service data orchestration platform. The platform architecture is entirely serverless, which simplifies the ability to scale and adapt to unpredictable demand. The platform was built using AWS Step Functions, AWS Lambda, Amazon API Gateway, Amazon DynamoDB, AWS Fargate, and other AWS serverless managed services.

For more content like this, subscribe to our YouTube channels This is My Architecture, This is My Code, and This is My Model, or visit the This is My Architecture page on AWS, which has search functionality and the ability to filter by industry, language, and service.

Enhancing customer safety by leveraging the scalable, secure, and cost-optimized Toyota Connected Data Lake

Post Syndicated from Sandeep Kulkarni original https://aws.amazon.com/blogs/big-data/enhancing-customer-safety-by-leveraging-the-scalable-secure-and-cost-optimized-toyota-connected-data-lake/

Toyota Motor Corporation (TMC), a global automotive manufacturer, has made “connected cars” a core priority as part of its broader transformation from an auto company to a mobility company. In recent years, TMC and its affiliate technology and big data company, Toyota Connected, have developed an array of new technologies to provide connected services that enhance customer safety and the vehicle ownership experience. Today, Toyota’s connected cars come standard with an on-board Data Communication Module (DCM) that links to a Controller Area Network (CAN). By using this hardware, Toyota provides various connected services to its customers.

Some of the connected services help drivers to safely enjoy their cars. Telemetry data is available from the car 24×7, and Toyota makes the data available to its dealers (when their customers opt-in for data sharing). For instance, a vehicle’s auxiliary battery voltage declines over time. With this data, dealership staff can proactively contact customers to recommend a charge prior to experiencing any issues. This automotive telemetry can also help fleet management companies monitor vehicle diagnostics, perform preventive maintenance and help avoid breakdowns.

Other services, such as usage-based auto insurance, leverage driving behavior data to help safe drivers receive discounts on their car insurance. Telemetry plays a vital role in understanding driver behavior. If drivers choose to opt in, a safety score can be generated based on their driving data, and drivers can use their smartphones to check their safe driving scores.

A vehicle generates data every second, which can be bundled into larger packets at one-minute intervals. With millions of connected cars sending data points every second, the scale required to capture and store that data is immense: billions of messages daily, generating petabytes of data. To make this vision a reality, Toyota Connected’s Mobility Team embarked on building a real-time “Toyota Connected Data Lake.” Given the scale, we leveraged AWS to build this platform. In this post, we show how we built the data lake and how we provide significant value to our customers.

Overview

The guiding principles for architecture and design that we used are as follows:

  • Serverless: We want to use cloud native technologies and spend minimal time on infrastructure maintenance.
  • Rapid speed to market: We work backwards from customer requirements and iterate frequently to develop minimally viable products (MVPs).
  • Cost-efficient at scale.
  • Low latency: near real-time processing.

Our data lake needed to be able to:

  • Capture and store new data (relational and non-relational) at petabyte scale in real time.
  • Provide analytics that go beyond batch reporting and incorporate real time and predictive capabilities.
  • Democratize access to data in a secure and governed way, allowing our team to unleash their creative energy and deliver innovative solutions.

The following diagram shows the high-level architecture.

Walkthrough

We built the serverless data lake with Amazon S3 as the primary data store, given the scalability and high availability of S3. The entire process is automated, which reduces the likelihood of human error, increases efficiency, and ensures consistent configurations over time, as well as reduces the cost of operations.

The key components of a data lake include Ingest, Decode, Transform, Analyze, and Consume:

  • Ingest: Connected vehicles send telemetry data once a minute, which includes speed, acceleration, turns, geo location, fuel level, and diagnostic error codes. This data is ingested into Amazon Kinesis Data Streams, processed through AWS Lambda to make it readable, and the “raw copy” is saved through Amazon Kinesis Data Firehose into an S3 bucket (a minimal sketch of this step follows this list).
  • Decode: Data arriving into the Kinesis data stream in the ‘Decode’ pillar is decoded by a serverless Lambda function, which does most of the heavy lifting. Based upon a proprietary specification, this Lambda function does the bit-by-bit decoding of the input message to capture the particular sensor values. The small input payload of 35 KB with data from over 180 sensors is now decoded and converted to a JSON message of 3 MB. This is then compressed and written to the ‘Decoded S3 bucket’.
  • Transform: The aggregation jobs leverage the massively parallel capability of Amazon EMR, decrypt the decoded messages, and convert the data to Apache Parquet. Apache Parquet is a columnar storage file format designed for querying large amounts of data, regardless of the data processing framework or programming language. Parquet allows for better compression, which reduces the amount of storage required. It also reduces I/O, since we can efficiently scan the data. The data sets are now available for analytics purposes, partitioned by masked identification numbers as well as by automotive models and dispatch type. A separate set of jobs transforms the data and stores it in Amazon DynamoDB to be consumed in real time from APIs.
  • Consume: Applications needing to consume the data make API calls through the Amazon API Gateway. Authentication to the API calls is based on temporary tokens issued by Amazon Cognito.
  • Analyze: Data analytics can be performed directly on Amazon S3 by leveraging serverless Amazon Athena. Data access is democratized and made available to data science groups, who build and test various models that provide value to our customers.
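
A minimal sketch of the Ingest step is shown below. The delivery stream name is hypothetical, and the gzip compression reflects the cost optimization described later in this post:

import base64
import gzip
import os

import boto3

firehose = boto3.client('firehose')
DELIVERY_STREAM = os.environ.get('RAW_DELIVERY_STREAM', 'raw-telemetry-stream')  # hypothetical

def handler(event, context):
    records = []
    for record in event['Records']:
        # Kinesis record payloads arrive base64-encoded.
        payload = base64.b64decode(record['kinesis']['data'])
        # Compressing before Firehose ingestion reduces the per-GB ingestion cost.
        records.append({'Data': gzip.compress(payload)})

    if records:
        # put_record_batch accepts up to 500 records per call.
        firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM,
            Records=records,
        )
    return {'records_forwarded': len(records)}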

Additionally, comprehensive monitoring is set up by leveraging Amazon CloudWatch, Amazon ES, and AWS KMS for managing the keys securely.

Scalability

The scalability capabilities of the building blocks in our architecture that allow us to reach this massive scale are:

  • S3: S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. S3 partitions the index based on key name. To maximize performance of high-concurrency operations on S3, we introduced randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions (see the sketch after this list).
  • Lambda: We can run as many concurrent functions as needed and can raise limits as required with AWS support.
  • Kinesis Data Firehose: It scales elastically based on volume without requiring any human intervention. We batch requests up to 128 MiB or 15 minutes, whichever comes first, to avoid small files. Additional details are available in Srikanth Kodali’s blog post.
  • Kinesis Data Streams: We developed an automated program that adjusts the shards based on incoming volume. This is based on the Kinesis Scaling Utility from AWS Labs, which allows us to scale in a way similar to EC2 Auto Scaling groups.
  • API Gateway: It automatically scales to billions of requests and seamlessly handles our API traffic.
  • EMR cluster: We can programmatically scale out to hundreds of nodes based on our volume and scale in after processing is completed.
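
A minimal sketch of the key-randomization technique, using a short hash prefix derived from the key itself (the key layout is hypothetical):

import hashlib

def randomized_key(base_key):
    # A short, deterministic hash prefix spreads keys across S3 index partitions.
    prefix = hashlib.md5(base_key.encode('utf-8')).hexdigest()[:4]
    return f"{prefix}/{base_key}"

# Produces something like '<4-hex-chars>/model=abc/date=2020-05-01/part-0001.parquet'
print(randomized_key('model=abc/date=2020-05-01/part-0001.parquet'))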

Our volumes have increased seven-fold since we migrated to AWS and we have only adjusted the number of shards in Kinesis Data Streams and the number of core nodes for EMR processing to scale with the volume.

Security in the AWS cloud

AWS provides a robust suite of security services, allowing us to have a higher level of security in the AWS cloud. Consistent with our security guidelines, data is encrypted both in transit and at rest. Additionally, we use VPC Endpoints, allowing us to keep traffic within the AWS network.

Data protection in transit:

Data protection at rest:

  • S3 server-side encryption handles all encryption, decryption, and key management transparently. All user data stored in DynamoDB is fully encrypted at rest, for which we use an AWS-owned customer master key at no additional charge. Server-side encryption for Kinesis Data Streams and Kinesis Data Firehose is also enabled to ensure that data is encrypted at rest; a short sketch of enabling these settings follows.
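
The encryption-at-rest settings above can be enabled with a few API calls. The following sketch uses hypothetical resource names; S3 buckets and Kinesis streams are typically configured this way once, at provisioning time:

import boto3

s3 = boto3.client('s3')
kinesis = boto3.client('kinesis')

# Default server-side encryption for an S3 bucket (SSE-S3; use aws:kms for KMS keys).
s3.put_bucket_encryption(
    Bucket='decoded-telemetry-bucket',  # hypothetical bucket name
    ServerSideEncryptionConfiguration={
        'Rules': [{'ApplyServerSideEncryptionByDefault': {'SSEAlgorithm': 'AES256'}}]
    },
)

# Server-side encryption for a Kinesis data stream using the AWS-managed key.
kinesis.start_stream_encryption(
    StreamName='telemetry-stream',      # hypothetical stream name
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis',
)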

Cost optimization

Given our very large data volumes, we were methodical about optimizing costs across all components of the infrastructure. The ultimate goal was to figure out the cost of the APIs we were exposing. We developed a robust cost model validated with performance testing at production volumes:

  • NAT gateway: When we started this project, one of the significant cost drivers was traffic flowing from Lambda to Kinesis Data Firehose that went over the NAT gateway, since Kinesis Data Firehose did not have a VPC endpoint. Traffic flowing through the NAT gateway costs $0.045/GB, whereas traffic flowing through the VPC endpoint costs $0.01/GB. Based on a product feature request from Toyota, AWS implemented this feature (VPC Endpoint for Firehose) early this year. We implemented this feature, which resulted in a four-and-a-half-fold reduction in our costs for data transfer.
  • Kinesis Data Firehose: Since Kinesis Data Firehose did not initially support encryption of data at rest, we had to use client-side encryption with KMS; this was the second significant cost driver. We requested a feature for native server-side encryption in Kinesis Data Firehose. This was released earlier this year and we enabled server-side encryption on the Kinesis Data Firehose stream. This removed the AWS Key Management Service (KMS) costs, resulting in another 10% reduction in our total costs.

Since Kinesis Data Firehose charges based on the amount of data ingested ($0.029/GB), our Lambda function compresses the data before writing to Kinesis Data Firehose, which saves on the ingestion cost.

  • S3: We use lifecycle policies to move data from S3 (which costs $0.023/GB) to Amazon S3 Glacier (which costs $0.004/GB) after a specified duration; a minimal example follows this list. Glacier provides a six-fold cost reduction over S3. We further plan to move the data from Glacier to Amazon S3 Glacier Deep Archive (which costs $0.00099/GB), which will provide us a four-fold reduction over Glacier costs. Additionally, we have set up automated deletes of certain data sets at periodic intervals.
  • EMR: We were planning to use AWS Glue and keep the architecture serverless, but made the decision to leverage EMR from a cost perspective. We leveraged spot instances for transformation jobs in EMR, which can provide up to 60% savings. The hourly jobs complete successfully with spot instances; however, the nightly aggregation jobs leveraging r5.4xlarge instances failed frequently because sufficient spot capacity was not available. We decided to move to on-demand instances while we finalize our strategy for reserved instances to reduce costs.
  • DynamoDB: Time to Live (TTL) for DynamoDB lets us define when items in a table expire so that they can be automatically deleted from the database. We enabled TTL to expire objects that are not needed after a certain duration. We plan to use reserved capacity for read and write capacity units to reduce costs. We also use DynamoDB auto scaling, which helps us manage capacity efficiently and lowers the cost of our workloads because they have a predictable traffic pattern. In Q2 of 2019, DynamoDB removed the associated costs of DynamoDB Streams used in replicating data globally, which translated to extra cost savings in global tables.
  • Amazon DynamoDB Accelerator (DAX): Our DynamoDB tables are front-ended by DAX, which improves the response time of our application by dramatically reducing read latency, as compared to using DynamoDB. Using DAX also lowers the cost of DynamoDB by reducing the amount of provisioned read throughput needed for read-heavy applications.
  • Lambda: We ran benchmarks to arrive at the optimal memory configuration for our Lambda functions. Memory allocation in Lambda determines CPU allocation, so for some of our Lambda functions we allocated higher memory, which results in faster execution and fewer GB-seconds per invocation, saving both time and cost. Combining Lambda with DAX provides an additional benefit: lower read latency results in shorter execution times, which means lower costs for Lambda.
  • Kinesis Data Streams: We scale our streams through an automated job, since our traffic patterns are fairly predictable. During peak hours we add additional shards and delete them during off-peak hours, which allows us to reduce costs when shards are not in use.
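
A minimal sketch of the S3 lifecycle rule referenced in the list above; the bucket name and durations are illustrative, not the values used in production:

import boto3

s3 = boto3.client('s3')

s3.put_bucket_lifecycle_configuration(
    Bucket='decoded-telemetry-bucket',  # hypothetical bucket name
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-then-expire',
            'Status': 'Enabled',
            'Filter': {'Prefix': ''},  # apply to every object in the bucket
            'Transitions': [
                {'Days': 90, 'StorageClass': 'GLACIER'},        # illustrative duration
                {'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'},  # illustrative duration
            ],
            'Expiration': {'Days': 730},  # automated delete after a set period
        }]
    },
)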

Enhancing customer safety

The Data Lake presents multiple opportunities to enhance customer safety. Early detection of market defects and pinpointing of target vehicles affected by those defects is made possible through the telemetry data ingested from the vehicles. This early detection leads to early resolution way before the customer is affected. On-board software in the automobiles can be constantly updated over-the-air (OTA), thereby saving time and costs. The automobile can generate a Health Check Report based on the driving style of its drivers, which can create the ideal maintenance plan for drivers for worry-free driving.

The driving data for an individual driver based on speed, sharp turns, rapid acceleration, and sudden braking can be converted into a “driver score” which ranges from 1 to 100 in value. The higher the driver-score, the safer the driver. Drivers can view their scores on mobile devices and monitor the specific locations of harsh driving on the journey map. They can then use this input to self-correct and modify their driving habits to improve their scores, which will not only result in a safer environment but drivers could also get lower insurance rates from insurance companies. This also gives parents an opportunity to monitor the scores for their teenage drivers and coach them appropriately on safe driving habits. Additionally, notifications can be generated if the teenage driver exceeds an agreed-upon speed or leaves a specific area.

Summary

The automated serverless data lake is a robust scalable platform that allows us to analyze data as it becomes available in real time. From an operations perspective, our costs are down significantly. Several aggregation jobs that took 15+ hours to run, now finish in 1/40th of the time. We are impressed with the reliability of the platform that we built. The architectural decision to go serverless has reduced operational burden and will also allow us to have a good handle on our costs going forward. Additionally, we can deploy this pipeline in other geographies with smaller volumes and only pay for what we consume.

Our team accomplished this ambitious development in a short span of six months. They worked in an agile, iterative fashion and continued to deliver robust MVPs to our business partners. Working with the service teams at AWS on product feature requests and seeing them come to fruition in a very short time frame has been a rewarding experience and we look forward to the continued partnership on additional requests.

About the Authors

Sandeep Kulkarni is an enterprise architect at AWS. His passion is to accelerate digital transformation for customers and build highly scalable and cost-effective solutions in the cloud. In his spare time, he loves to do yoga and gardening.

Shravanthi Denthumdas is the director of mobility services at Toyota Connected. Her team is responsible for building the Data Lake and delivering services that allow drivers to safely enjoy their cars. In her spare time, she likes to spend time with her family and children.