
Stream Apache HBase edits for real-time analytics

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/stream-apache-hbase-edits-for-real-time-analytics/

Apache HBase is a non-relational database. To use the data, applications need to query the database to pull the data and changes from tables. In this post, we introduce a mechanism to stream Apache HBase edits into streaming services such as Apache Kafka or Amazon Kinesis Data Streams. In this approach, changes to data are pushed and queued into a streaming platform such as Kafka or Kinesis Data Streams for real-time processing, using a custom Apache HBase replication endpoint.

We start with a brief technical background on HBase replication and review a use case in which we store IoT sensor data into an HBase table and enrich rows using periodic batch jobs. We demonstrate how this solution enables you to enrich the records in real time and in a serverless way using AWS Lambda functions.

Common scenarios and use cases of this solution are as follows:

  • Auditing data, triggers, and anomaly detection using Lambda
  • Shipping WALEdits via Kinesis Data Streams to Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) to index records asynchronously
  • Triggers on Apache HBase bulk loads
  • Processing streamed data using Apache Spark Streaming, Apache Flink, or Amazon Kinesis Data Analytics
  • HBase edits or change data capture (CDC) replication into other storage platforms, such as Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), and Amazon DynamoDB
  • Incremental HBase migration by replaying the edits from any point in time, based on configured retention in Kafka or Kinesis Data Streams

This post progresses into some common use cases you might encounter, along with their design options and solutions. We review and expand on these scenarios in separate sections, in addition to the considerations and limits in our application designs.

Introduction to HBase replication

At a very high level, HBase replication works by replaying transactions from a source cluster on a destination cluster. It does so by shipping WALEdits, or Write Ahead Log (WAL) entries, from the RegionServers of the source cluster and replaying them on the destination cluster. In HBase, every data mutation, such as a PUT or DELETE, is written to the MemStore of its region and appended to a WAL file as a WALEdit, or entry. Each WALEdit represents a transaction and can carry multiple write operations on a row. Because the MemStore is an in-memory entity, if a RegionServer fails, the lost data can be replayed and restored from the WAL files. Writing to the WAL is optional: some operations don’t require it, and clients can request to bypass it for quicker writes. For example, records in a bulk load aren’t recorded in the WAL.

HBase replication is based on transferring WALEdits to the destination cluster and replaying them, so any operation that bypasses the WAL isn’t replicated.

When setting up a replication in HBase, a ReplicationEndpoint implementation needs to be selected in the replication configuration when creating a peer, and on every RegionServer, an instance of ReplicationEndpoint runs as a thread. In HBase, a replication endpoint is pluggable for more flexibility in replication and shipping WALEdits to different versions of HBase. You can also use this to build replication endpoints for sending edits to different platforms and environments. For more information about setting up replication, see Cluster Replication.

HBase bulk load replication (HBASE-13153)

In HBase, bulk loading is a method to directly import HFiles or Store files into RegionServers. This avoids the normal write path and WALEdits. As a result, far less CPU and far fewer network resources are used when importing big portions of data into HBase tables.

You can also use HBase bulk loads to recover data when an error or outage causes the cluster to lose track of regions and Store files.

Because bulk loads skip WAL creation, none of the new records are replicated to the secondary cluster. HBASE-13153 is an enhancement that represents a bulk load as a bulk load event, carrying the location of the imported files. You can activate this by setting hbase.replication.bulkload.enabled to true and, as a prerequisite, setting hbase.replication.cluster.id to a unique value.

Custom streaming replication endpoint

We can use HBase’s pluggable endpoints to stream records into platforms such as Kinesis Data Streams or Kafka. Transferred records can be consumed by Lambda functions, processed by a Spark Streaming application or Apache Flink on Amazon EMR, Kinesis Data Analytics, or any other big data platform.

In this post, we demonstrate an implementation of a custom replication endpoint that allows replicating WALEdits to Kinesis Data Streams or Kafka topics.

In our example, we built upon the BaseReplicationEndpoint abstract class, which implements the ReplicationEndpoint interface.

The main method to implement and override is the replicate method. This method replicates a set of items it receives every time it’s called and blocks until all those records are replicated to the destination.
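The blocking contract described above can be sketched as follows. This is an illustrative Python model, not the Java endpoint from the repository: replicate_batch, send_async, and the in-memory sink are hypothetical names. The only behavior taken from the text is that the call blocks until every record in the batch has been handed to the destination and then reports the outcome as a Boolean. How HBase reacts to a false result is not modeled here.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, List

# Hypothetical sink signature: takes one serialized entry and returns a Future
# that completes when the destination has acknowledged the record.
SendAsync = Callable[[bytes], Future]


def replicate_batch(entries: Iterable[bytes], send_async: SendAsync) -> bool:
    """Send every entry, block until all sends finish, and report success."""
    futures: List[Future] = [send_async(entry) for entry in entries]
    wait(futures)  # block until the whole batch has completed or failed
    # Success only if no send raised an exception; otherwise the caller should
    # treat the whole batch as not replicated.
    return all(f.exception() is None for f in futures)


if __name__ == "__main__":
    delivered: List[bytes] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        # In-memory stand-in for a Kinesis or Kafka producer.
        ok = replicate_batch(
            [b"edit-1", b"edit-2", b"edit-3"],
            lambda entry: pool.submit(delivered.append, entry),
        )
    print(ok, sorted(delivered))
```

Returning a single Boolean for the whole batch keeps the contract simple, at the cost that one failed record marks every record in the batch as not replicated.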

For our implementation and configuration options, see our GitHub repository.

Use case: Enrich records in real time

We now use the custom streaming replication endpoint implementation to stream HBase edits to Kinesis Data Streams.

The provided AWS CloudFormation template demonstrates how we can set up an EMR cluster with replication to either Kinesis Data Streams or Apache Kafka and consume the replicated records, using a Lambda function to enrich data asynchronously, in real time. In our sample project, we launch an EMR cluster with an HBase database. A sample IoT traffic generator application runs as a step in the cluster and puts records, containing a registration number and an instantaneous speed, into a local HBase table. Records are replicated in real time into a Kinesis stream or Kafka topic based on the selected option at launch, using our custom HBase replication endpoint. When the step starts putting the records into the stream, a Lambda function is provisioned and starts digesting records from the beginning of the stream and catches up with the stream. The function calculates a score per record, based on a formula on variation from minimum and maximum speed limits in the use case, and persists the result as a score qualifier into a different column family, out of replication scope in the source table, by running HBase puts on RowKey.

The following diagram illustrates this architecture.

To launch our sample environment, you can use our template on GitHub.

The template creates a VPC, public and private subnets, Lambda functions to consume records and prepare the environment, an EMR cluster with HBase, and a Kinesis data stream or an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster depending on the selected parameters when launching the stack.

Architectural design patterns

Traditionally, Apache HBase tables are considered as data stores, where consumers get or scan the records from tables. It’s very common in modern databases to react to database logs or CDC for real-time use cases and triggers. With our streaming HBase replication endpoint, we can project table changes into message delivery systems like Kinesis Data Streams or Apache Kafka.

We can trigger Lambda functions to consume records from Apache Kafka or Kinesis Data Streams in a serverless design, or use the wider Amazon Kinesis ecosystem, such as Kinesis Data Analytics or Amazon Kinesis Data Firehose for delivery into Amazon S3. You could also pipe the records into Amazon OpenSearch Service.
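As a minimal sketch of the serverless consumer side, the following Python handler unpacks the batch that Lambda passes to a function triggered by a Kinesis data stream, where each record's data field is base64-encoded. The payload is treated as opaque bytes, because the serialization format of the replicated WALEdits is not described here. The decode_kinesis_event helper and the returned summary are hypothetical and are not taken from the sample project.

```python
import base64
from typing import Any, Dict, List


def decode_kinesis_event(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract partition key, sequence number, and raw payload per record."""
    decoded = []
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        decoded.append({
            "partition_key": kinesis["partitionKey"],
            "sequence_number": kinesis["sequenceNumber"],
            # Lambda delivers the record payload base64-encoded.
            "payload": base64.b64decode(kinesis["data"]),
        })
    return decoded


def handler(event: Dict[str, Any], context: Any) -> Dict[str, int]:
    """Lambda entry point: decode the batch, then hand off each payload."""
    records = decode_kinesis_event(event)
    # Placeholder: deserialize each payload and enrich or index it here.
    return {"processed": len(records)}


if __name__ == "__main__":
    sample = {"Records": [{"kinesis": {
        "partitionKey": "pk-1",
        "sequenceNumber": "1",
        "data": base64.b64encode(b"serialized-edit").decode("ascii"),
    }}]}
    print(handler(sample, None))
```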

A wide range of consumer ecosystems, such as Apache Spark, AWS Glue, and Apache Flink, is available to consume from Kafka and Kinesis Data Streams.

Let’s review a few other common use cases.

Index HBase rows

Apache HBase rows are retrievable by RowKey. Writing a row into HBase with the same RowKey overwrites or creates a new version of the row. To retrieve a row, it needs to be fetched by the RowKey or a range of rows needs to be scanned if the RowKey is unknown.

In some use cases, scanning the table for a specific qualifier or value is expensive. Instead, we can asynchronously index our rows in a parallel system like Elasticsearch, and applications can use the index to find the RowKey. Without this solution, a periodic job has to scan the table and write the rows into an indexing service like Elasticsearch to hydrate the index, or the producing application has to write to both HBase and Elasticsearch directly, which adds overhead to the producer.

Enrich and audit data

A very common use case for HBase streaming endpoints is enriching data and storing the enriched records in a data store, such as Amazon S3 or RDS databases. In this scenario, a custom HBase replication endpoint streams the records into a message distribution system such as Apache Kafka or Kinesis Data Streams. Records can be serialized, using the AWS Glue Schema Registry for schema validation. A consumer on the other end of the stream reads the records, enriches them, and validates against a machine learning model in Amazon SageMaker for anomaly detection. The consumer persists the records in Amazon S3 and potentially triggers an alert using Amazon Simple Notification Service (Amazon SNS). Stored data on Amazon S3 can be further digested on Amazon EMR, or we can create a dashboard on Amazon QuickSight, interfacing Amazon Athena for queries.

The following diagram illustrates our architecture.

Store and archive data lineage

Apache HBase comes with the snapshot feature. You can freeze the state of tables into snapshots and export them to any distributed file system like HDFS or Amazon S3. Recovering snapshots restores the entire table to the snapshot point.

Apache HBase also supports versioning at the row level. You can configure column families to keep row versions, and the default versioning is based on timestamps.

However, recovering a snapshot only restores the table up to the snapshot point, and later records aren’t present. When you use this approach to stream records into Kafka or Kinesis Data Streams, records are retained inside the stream, and you can replay any part of the retained period.

In Kinesis Data Streams, by default records of a stream are accessible for up to 24 hours from the time they are added to the stream. This limit can be increased to up to 7 days by enabling extended data retention, or up to 365 days by enabling long-term data retention. See Quotas and Limits for more information.

In Apache Kafka, record retention is limited only by the resources and disk space available on the Kafka cluster, and can be configured through the log.retention.* broker settings, such as log.retention.hours.

Trigger on HBase bulk load

The HBase bulk load feature uses a MapReduce job to output table data in HBase’s internal data format, and then directly loads the generated Store files into the running cluster. Bulk load uses less CPU and fewer network resources than loading via the HBase API, because it bypasses WALs in the write path. As a result, the records aren’t seen by replication. However, since HBASE-13153, you can configure HBase to replicate a meta record as an indication of a bulk load event.

A Lambda function processing replicated WALEdits can listen to this event to trigger actions, such as automatically refreshing a read replica HBase cluster on Amazon S3 whenever a bulk load happens. The following diagram illustrates this workflow.

Considerations for replication into Kinesis Data Streams

Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources with very low latency. Kinesis is fully managed and runs your streaming applications without requiring you to manage any infrastructure. It’s durable, because records are synchronously replicated across three Availability Zones, and you can increase data retention to 365 days.

When considering Kinesis Data Streams for any solution, it’s important to consider service limits. For instance, as of this writing, the maximum size of the data payload of a record before base64-encoding is up to 1 MB, so we must make sure the records or serialized WALEdits remain within the Kinesis record size limit. To be more efficient, you can enable the hbase.replication.compression-enabled attribute to GZIP compress the records before sending them to the configured stream sink.
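The size check can be sketched as follows. This is a minimal Python illustration, not the endpoint's own code: prepare_record is a hypothetical helper, and the 1 MB limit is assumed to mean 1 MiB (1,048,576 bytes). The serialized edit is optionally GZIP-compressed and then rejected if it still exceeds the limit.

```python
import gzip

# Assumption: the 1 MB Kinesis record limit is treated as 1 MiB here.
MAX_RECORD_BYTES = 1024 * 1024


def prepare_record(serialized_edit: bytes, compress: bool = True) -> bytes:
    """Optionally GZIP-compress a serialized edit and enforce the size limit."""
    payload = gzip.compress(serialized_edit) if compress else serialized_edit
    if len(payload) > MAX_RECORD_BYTES:
        raise ValueError(
            f"record is {len(payload)} bytes, over the {MAX_RECORD_BYTES}-byte limit"
        )
    return payload


if __name__ == "__main__":
    # A repetitive payload of about 1.6 MB that only fits after compression.
    edit = b'{"row": "sensor-42", "speed": 87}' * 50_000
    compressed = prepare_record(edit)
    print(len(edit), "->", len(compressed))
    assert gzip.decompress(compressed) == edit
```

Compression helps most with repetitive payloads. A consumer then has to decompress each record before deserializing it.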

Kinesis Data Streams retains the order of the records within the shards as they arrive, and records can be read or processed in the same order. However, in this sample custom replication endpoint, a random partition key is used so that the records are evenly distributed between the shards. We can also use a hash function to generate a partition key when putting records into the stream, for example based on the Region ID, so that all the WALEdits from the same Region land in the same shard and consumers can assume Region locality per shard.
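To illustrate why a Region-based partition key gives shard locality, the following Python sketch estimates which shard a key lands in. Kinesis Data Streams maps each partition key to a 128-bit integer with MD5 and routes the record to the shard whose hash key range contains that value. The sketch assumes the shards split the hash space evenly, which may not hold after resharding. The shard_index function and the region names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # partition keys are hashed into a 128-bit space


def shard_index(partition_key: str, shard_count: int) -> int:
    """Estimate the target shard, assuming evenly split hash key ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hashed = int.from_bytes(digest, "big")
    return hashed * shard_count // HASH_SPACE


if __name__ == "__main__":
    # Using the region identifier as the partition key sends every edit from
    # that region to the same shard, which preserves per-region ordering.
    for region in ["region-a", "region-b", "region-a"]:
        print(region, "-> shard", shard_index(region, shard_count=4))
```

The trade-off is that a few very busy Regions can overload individual shards, which is the skew the random key in the sample endpoint avoids.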

For delivering records in KinesisSinkImplemetation, we use the Amazon Kinesis Producer Library (KPL) to put records into Kinesis data streams. The KPL simplifies producer application development; we can achieve high write throughput to a Kinesis data stream. We can use the KPL in either synchronous or asynchronous use cases. We suggest using the higher performance of the asynchronous interface unless there is a specific reason to use synchronous behavior. KPL is very configurable and has retry logic built in. You can also perform record aggregation for maximum throughput. In KinesisSinkImplemetation, by default records are asynchronously replicated to the stream. We can change to synchronous mode by setting hbase.replication.kinesis.syncputs to true. We can enable record aggregation by setting hbase.replication.kinesis.aggregation-enabled to true.

The KPL can incur an additional processing delay because it buffers records before sending them to the stream based on a user-configurable attribute of RecordMaxBufferedTime. Larger values of RecordMaxBufferedTime result in higher packing efficiencies and better performance. However, applications that can’t tolerate this additional delay may need to use the AWS SDK directly.

Kinesis Data Streams and the Kinesis family are fully managed and easily integrate with the rest of the AWS ecosystem with minimum development effort with services such as the AWS Glue Schema Registry and Lambda. We recommend considering Kinesis Data Streams for low-latency, real-time use cases on AWS.

Considerations for replication into Apache Kafka

Apache Kafka is a high-throughput, scalable, and highly available open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

AWS offers Amazon MSK as a fully managed Kafka service. Amazon MSK provides the control plane operations and runs open-source versions of Apache Kafka. Existing applications, tooling, and plugins from partners and the Apache Kafka community are supported without requiring changes to application code.

You can configure this sample project for Apache Kafka brokers directly or just point towards an Amazon MSK ARN for replication.

Although there is virtually no limit on the size of the messages in Kafka, the default maximum message size is 1 MB, so we must make sure the records, or serialized WALEdits, remain within the maximum message size for topics.

The Kafka producer tries to batch records together whenever possible to limit the number of requests for more efficiency. This is configurable by setting batch.size, linger.ms, and delivery.timeout.ms.

In Kafka, topics are partitioned, and partitions are distributed between different Kafka brokers. This distributed placement allows for load balancing of consumers and producers. When a new event is published to a topic, it’s appended to one of the topic’s partitions. Events with the same event key are written to the same partition, and Kafka guarantees that any consumer of a given topic partition can always read that partition’s events in exactly the same order as they were written. KafkaSinkImplementation uses a random partition key to distribute the messages evenly between the partitions. The key could instead be derived from a heuristic function, for example based on Region ID, if the order of the WALEdits or record locality is important to the consumers.

Semantic guarantees

As with any streaming application, it’s important to consider the semantic guarantees: how the producer acknowledges or fails the delivery of messages to the message queue, and how the consumer checkpoints its progress. Based on our use cases, we need to consider the following:

  • At most once delivery – Messages are never delivered more than once, and there is a chance of losing messages
  • At least once delivery – Messages can be delivered more than once, with no loss of messages
  • Exactly once delivery – Every message is delivered only once, and there is no loss of messages

After changes are persisted as WALs and in MemStore, the replicate method in ReplicationEndpoint is called to replicate a collection of WAL entries and returns a Boolean (true/false) value. If the returned value is true, the entries are considered successfully replicated by HBase and the replicate method is called for the next batch of WAL entries. Depending on configuration, both KPL and Kafka producers might buffer the records for longer if configured for asynchronous writes. Failures can cause loss of entries, retries, and duplicate delivery of records to the stream, which could be detrimental with either synchronous or asynchronous message delivery.

If your operations aren’t idempotent, you can checkpoint or check for unique sequence numbers on the consumer side. For simple HBase record replication, RowKey operations are idempotent, and they carry a timestamp and sequence ID.
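A consumer-side check on sequence IDs can be sketched as follows. This is a minimal Python illustration under two stated assumptions: sequence IDs increase monotonically within a Region, and records from one Region arrive in order (for example, through a Region-based partition key). The SequenceDeduplicator class is hypothetical, and in practice its state would need to be checkpointed durably rather than held in memory.

```python
from typing import Dict


class SequenceDeduplicator:
    """Tracks the highest sequence ID applied per region and skips replays."""

    def __init__(self) -> None:
        self._last_seen: Dict[str, int] = {}

    def should_apply(self, region: str, sequence_id: int) -> bool:
        """Return True the first time a (region, sequence_id) is offered."""
        if sequence_id <= self._last_seen.get(region, -1):
            return False  # duplicate or replayed record, skip it
        self._last_seen[region] = sequence_id
        return True


if __name__ == "__main__":
    dedup = SequenceDeduplicator()
    # The second (region-a, 101) simulates an at-least-once redelivery.
    incoming = [("region-a", 100), ("region-a", 101), ("region-a", 101), ("region-b", 7)]
    applied = [item for item in incoming if dedup.should_apply(*item)]
    print(applied)
```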

Summary

Replication of HBase WALEdits into streams is a powerful tool that you can use in multiple use cases and in combination with other AWS services. You can create practical solutions to further process records in real time, audit the data, detect anomalies, set triggers on ingested data, or archive data in streams to be replayed on other HBase databases or storage services from a point in time. This post outlined some common use cases and solutions, along with some best practices when implementing your custom HBase streaming replication endpoints.

Review, clone, and try our HBase replication endpoint implementation from our GitHub repository and launch our sample CloudFormation template.

We’d like to learn about your use cases. If you have questions or suggestions, please leave a comment.


About the Authors

Amir Shenavandeh is a Senior Hadoop systems engineer and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimization. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Maryam Tavakoli is a Cloud Engineer and Amazon OpenSearch subject matter expert at Amazon Web Services. She helps customers with their Analytics and Streaming workload optimization and is passionate about solving complex problems with simplistic user experience that can empower customers to be more productive.

Integrate Etleap with Amazon Redshift Streaming Ingestion (preview) to make data available in seconds

Post Syndicated from Caius Brindescu original https://aws.amazon.com/blogs/big-data/integrate-etleap-with-amazon-redshift-streaming-ingestion-preview-to-make-data-available-in-seconds/

Amazon Redshift is a fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using SQL and your extract, transform, and load (ETL), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads.

Etleap is an AWS Advanced Technology Partner with the AWS Data & Analytics Competency and Amazon Redshift Service Ready designation. Etleap ETL removes the headaches experienced building data pipelines. A cloud-native platform that seamlessly integrates with AWS infrastructure, Etleap ETL consolidates data without the need for coding. Automated issue detection pinpoints problems so data teams can stay focused on business initiatives, not data pipelines.

In this post, we show how Etleap customers are integrating with the new streaming ingestion feature in Amazon Redshift (currently in limited preview) to load data directly from Amazon Kinesis Data Streams. This reduces load times from minutes to seconds and helps you gain faster data insights.

Amazon Redshift streaming ingestion with Kinesis Data Streams

Traditionally, you had to use Amazon Kinesis Data Firehose to land your stream into Amazon Simple Storage Service (Amazon S3) files and then employ a COPY command to move the data into Amazon Redshift. This method incurs latencies in the order of minutes.

Now, the native streaming ingestion feature in Amazon Redshift lets you ingest data directly from Kinesis Data Streams. The new feature enables you to ingest hundreds of megabytes of data per second and query it at exceptionally low latency—in many cases only 10 seconds after entering the data stream.

Configure Amazon Redshift streaming ingestion with SQL queries

Amazon Redshift streaming ingestion uses SQL to connect with one or more Kinesis data streams simultaneously. In this section, we walk through the steps to configure streaming ingestion.

Create an external schema

We begin by creating an external schema referencing Kinesis using syntax adapted from Redshift’s support for Federated Queries:

CREATE EXTERNAL SCHEMA MySchema
FROM Kinesis
IAM_ROLE { default | 'iam-role-arn' };

This external schema command creates an object inside Amazon Redshift that acts as a proxy to Kinesis Data Streams, specifically to the collection of data streams that are accessible via the AWS Identity and Access Management (IAM) role. You can use either the default Amazon Redshift cluster IAM role or a specified IAM role that has been attached to the cluster previously.

Create a materialized view

You can use Amazon Redshift materialized views to materialize a point-in-time view of a Kinesis data stream, as accumulated up to the time it is queried. The following command creates a materialized view over a stream from the previously defined schema:

CREATE MATERIALIZED VIEW MyView AS
SELECT *
FROM MySchema.MyStream;

Note the use of the dot syntax to pick out the particular stream desired. The attributes of the stream include a timestamp field, partition key, sequence number, and a VARBYTE data payload.

Although the previous materialized view definition simply performs a SELECT *, more sophisticated processing is possible, for instance, applying filtering conditions or shredding JSON data into columns. To demonstrate, consider the following Kinesis data stream with JSON payloads:

{
 "player" : "alice 127",
 "region" : "us-west-1",
 "action" : "entered shop"
}

The following materialized view shreds the JSON into columns, focusing only on the entered shop action:

CREATE MATERIALIZED VIEW ShopEntrances AS
SELECT ApproximateArrivalTimestamp, SequenceNumber,
   json_extract_path_text(from_varbyte(Data, 'utf-8'), 'player') as Player,
   json_extract_path_text(from_varbyte(Data, 'utf-8'), 'region') as Region
FROM MySchema.Actions
WHERE json_extract_path_text(from_varbyte(Data, 'utf-8'), 'action') = 'entered shop';

On the Amazon Redshift leader node, the view definition is parsed and analyzed. On success, it is added to the system catalogs. No further communication with Kinesis Data Streams occurs until the initial refresh.

Refresh the materialized view

The following command pulls data from Kinesis Data Streams into Amazon Redshift:

REFRESH MATERIALIZED VIEW MyView;

You can initiate it manually (via the preceding SQL command) or automatically via a scheduled query. In either case, it uses the IAM role associated with the stream. Each refresh is incremental and massively parallel, storing its progress in each Kinesis shard in the system catalogs so as to be ready for the next round of refresh.

With this process, you can now query near-real-time data from your Kinesis data stream through Amazon Redshift.

Use Amazon Redshift streaming ingestion with Etleap

Etleap pulls data from databases, applications, file stores, and event streams, and transforms it before loading it into an AWS data repository. Data ingestion pipelines typically process batches every 5–60 minutes, so when you query your data in Amazon Redshift, it’s at least 5 minutes out of date. For many use cases, such as ad hoc queries and BI reporting, this latency time is acceptable.

But what about when your team demands more up-to-date data? An example is operational dashboards, where you need to track KPIs in near-real time. Amazon Redshift load times are bottlenecked by COPY commands that move data from Amazon S3 into Amazon Redshift, as mentioned earlier.

This is where streaming ingestion comes in: by staging the data in Kinesis Data Streams rather than Amazon S3, Etleap can reduce data latency in Amazon Redshift to less than 10 seconds. To preview this feature, we ingest data from SQL databases such as MySQL and Postgres that support change data capture (CDC). The data flow is shown in the following diagram.

Etleap manages the end-to-end data flow through AWS Database Migration Service (AWS DMS) and Kinesis Data Streams, and creates and schedules Amazon Redshift queries, providing up-to-date data.

AWS DMS consumes the replication logs from the source, and produces insert, update, and delete events. These events are written to a Kinesis data stream that has multiple shards in order to handle the event load. Etleap transforms these events according to user-specified rules, and writes them to another data stream. Finally, a sequence of Amazon Redshift commands load data from the stream into a destination table. This procedure takes less than 10 seconds in real-world scenarios.

Configure Amazon Redshift streaming ingestion with Etleap

Previously, we explored how data in Kinesis Data Streams can be accessed in Amazon Redshift using SQL queries. In this section, we see how Etleap uses the streaming ingestion feature to mirror a table from MySQL into Amazon Redshift, and the end-to-end latency we can achieve.

Etleap customers that are part of the Streaming Ingestion Preview Program can ingest data into Amazon Redshift directly from an Etleap-managed Kinesis data stream. All pipelines from a CDC-enabled source automatically use this feature.

The destination table in Amazon Redshift is Type 1, a mirror of the table in the source database.

For example, say you want to mirror a MySQL table in Amazon Redshift. The table represents the online shopping carts that users have open. In this case, low latency is critical so that the platform marketing strategists can instantly identify abandoned carts and high demand items.

The cart table has the following structure:

CREATE TABLE cart (
id INT PRIMARY KEY AUTO_INCREMENT,
user_id INT,
current_price DECIMAL(6,2),
no_items INT,
checked_out TINYINT(1),
update_date TIMESTAMP
);

Changes from the source table are captured using AWS DMS and then sent to Etleap via a Kinesis data stream. Etleap transforms these records and writes them to another data stream using the following structure:

{
  "id": 8322,
  "user_id": 443,
  "current_price": 22.98,
  "no_items": 3,
  "checked_out": 0,
  "update_date": "2021-11-05 23:11",
  "op": "U"
}

The structure encodes the row that was modified or inserted, as well as the operation type (represented by the op column), which can have three values: I (insert), U (update) or D (delete).

This information is then materialized in Amazon Redshift from the data stream:

CREATE EXTERNAL SCHEMA etleap_stream
FROM KINESIS
IAM_ROLE '<redacted>';

CREATE MATERIALIZED VIEW cart_staging
DISTSTYLE KEY
	DISTKEY(id)
	SORTKEY(etleap_sequence_no)
AS SELECT
	CAST(PartitionKey as bigint) AS etleap_sequence_no,
	CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'id') as bigint) AS id,
	JSON_PARSE(FROM_VARBYTE(Data, 'utf-8')) AS Data
FROM etleap_stream."cart";

In the materialized view, we expose the following columns:

  • PartitionKey represents an Etleap sequence number, to ensure that updates are processed in the correct order.
  • We shred the primary keys of the table (id in the preceding example) from the payload, using them as a distribution key to improve the update performance.
  • The Data column is parsed out into a SUPER type from the JSON object in the stream. This is shredded into the corresponding columns in the cart table when the data is inserted.

With this staging materialized view, Etleap then updates the destination table (cart) that has the following schema:

CREATE TABLE cart ( 
id BIGINT PRIMARY KEY,
user_id BIGINT,
current_price DECIMAL(6,2),
no_items INT,
checked_out BOOLEAN,
update_date VARCHAR(64)
)
DISTSTYLE key
distkey(id);

To update the table, Etleap runs the following queries, selecting only the changed rows from the staging materialized view, and applies them to the cart table:

BEGIN;

REFRESH MATERIALIZED VIEW cart_staging;

UPDATE _etleap_si SET end_sequence_no = (
	SELECT COALESCE(MIN(etleap_sequence_no), (SELECT MAX(etleap_sequence_no) FROM cart_staging)) FROM 
	(
		SELECT 
			etleap_sequence_no, 
			LEAD(etleap_sequence_no, 1) OVER (ORDER BY etleap_sequence_no) - etleap_sequence_no AS diff
		FROM cart_staging 
		WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart')
	)
	WHERE diff > 1
) WHERE table_name = 'cart';



DELETE FROM cart
WHERE id IN (
	SELECT id
	FROM cart_staging
	WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart') 
	AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart')
);

INSERT INTO cart
SELECT 
	DISTINCT(id),
	CAST(Data."timestamp" as timestamp),
	CAST(Data.payload as varchar(256)),
	CAST(Data.etleap_sequence_no as bigint) from
  	(SELECT id, 
  	JSON_PARSE(LAST_VALUE(JSON_SERIALIZE(Data)) OVER (PARTITION BY id ORDER BY etleap_sequence_no ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) AS Data
   	FROM cart_staging
	WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart') 
	AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart')
	AND Data.op != 'D'
);


UPDATE _etleap_si SET start_sequence_no = end_sequence_no WHERE table_name = 'cart';

COMMIT;

The preceding transaction performs the following steps:

  1. Refresh the cart_staging materialized view to get new records from the cart stream.
  2. Record in the _etleap_si bookkeeping table the end of the range to apply: the last sequence number before the first gap in the newly staged sequence numbers, or the highest staged sequence number if there is no gap.
  3. Delete all records from the cart table that were updated or deleted since the last time we ran the update sequence.
  4. Insert all the updated and newly inserted records from the cart_staging materialized view into the cart table.
  5. Update the _etleap_si bookkeeping table with the current position. Etleap uses this table to optimize the query in the staging materialized view.
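The bookkeeping in this update sequence can be modeled outside Redshift with a short Python sketch. It is not Etleap's implementation: next_end_sequence_no mirrors the COALESCE(MIN(...), MAX(...)) expression in the UPDATE _etleap_si statement, and run_update_sequence is a hypothetical in-memory stand-in for the DELETE and INSERT steps. The sketch keeps the newest event per id and drops the row when that event is a delete, which is an assumption about the intended Type 1 behavior rather than a line-by-line translation of the SQL.

```python
from typing import Dict, List, Optional, Tuple

Event = Dict[str, object]


def next_end_sequence_no(staged: List[int], start: int) -> Optional[int]:
    """Return the last sequence number before the first gap after `start`.

    If there is no gap, fall back to the largest staged sequence number
    (None when nothing is staged at all).
    """
    all_seqs = sorted(set(staged))
    pending = [s for s in all_seqs if s > start]
    for current, following in zip(pending, pending[1:]):
        if following - current > 1:
            return current  # a later event arrived, but this gap is unfilled
    return all_seqs[-1] if all_seqs else None


def run_update_sequence(table: Dict[object, Event],
                        staging: List[Tuple[int, Event]],
                        start: int) -> int:
    """Apply staged events in (start, end] to `table`; return the new start."""
    end = next_end_sequence_no([seq for seq, _ in staging], start)
    if end is None or end <= start:
        return start  # nothing new to apply
    latest: Dict[object, Tuple[int, Event]] = {}
    for seq, event in staging:
        if start < seq <= end:
            key = event["id"]
            if key not in latest or seq > latest[key][0]:
                latest[key] = (seq, event)  # keep only the newest event per id
    for key, (_, event) in latest.items():
        table.pop(key, None)  # DELETE step: drop the previous row version
        if event["op"] != "D":  # INSERT step: deleted rows stay deleted
            table[key] = {k: v for k, v in event.items() if k != "op"}
    return end


if __name__ == "__main__":
    cart = {1: {"id": 1, "no_items": 1}}
    staging = [
        (1, {"id": 1, "no_items": 2, "op": "U"}),
        (2, {"id": 2, "no_items": 5, "op": "I"}),
        (3, {"id": 2, "op": "D"}),
        (5, {"id": 3, "no_items": 7, "op": "I"}),  # sequence number 4 is missing
    ]
    position = run_update_sequence(cart, staging, start=0)
    print(position, cart)
```

In the example, event 5 is held back until event 4 arrives, so changes are applied only over a contiguous range of sequence numbers.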

This update sequence runs continuously to minimize end-to-end latency. To measure performance, we simulated the change stream from a database table that has up to 100,000 inserts, updates, and deletes. We tested target table sizes of up to 1.28 billion rows. Testing was done on a 2-node ra3.xlplus Amazon Redshift cluster and a Kinesis data stream with 32 shards.

The following figure shows how long the update sequence takes on average over 5 runs in different scenarios. Even in the busiest scenario (100,000 changes to a 1.28 billion row table), the sequence takes just over 10 seconds to run. In our experiment, the refresh time was independent of the delta size, and took 3.7 seconds with a standard deviation of 0.4 seconds.

This shows that the update process can keep up with source database tables that have 1 billion rows and 10,000 inserts, updates, and deletes per second.

Summary

In this post, you learned about the native streaming ingestion feature in Amazon Redshift and how it achieves latency in seconds while ingesting data from Kinesis Data Streams into Amazon Redshift. You also learned about the architecture of Amazon Redshift with the streaming ingestion feature enabled, how to configure it using SQL commands, and how to use the capability in Etleap.

To learn more about Etleap, take a look at the Etleap ETL on AWS Quick Start, or visit their listing on AWS Marketplace.


About the Authors

Caius Brindescu is an engineer at Etleap with over 3 years of experience in developing ETL software. In addition to development work, he helps customers make the most out of Etleap and Amazon Redshift. He holds a PhD from Oregon State University and one AWS certification (Big Data – Specialty).

Todd J. Green is a Principal Engineer with AWS Redshift. Before joining Amazon, TJ worked at innovative database startups including LogicBlox and RelationalAI, and was an Assistant Professor of Computer Science at UC Davis. He received his PhD in Computer Science from UPenn. In his career as a researcher, TJ won a number of awards, including the 2017 ACM PODS Test-of-Time Award.

Maneesh Sharma is a Senior Database Engineer with Amazon Redshift. He works and collaborates with various Amazon Redshift Partners to drive better integration. In his spare time, he likes running, playing ping pong, and exploring new travel destinations.

Jobin George is a Big Data Solutions Architect with more than a decade of experience designing and implementing large-scale big data and analytics solutions. He provides technical guidance, design advice, and thought leadership to some of the key AWS customers and big data partners.

Amazon Kinesis Data Streams On-Demand – Stream Data at Scale Without Managing Capacity

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/amazon-kinesis-data-streams-on-demand-stream-data-at-scale-without-managing-capacity/

Today we are launching Amazon Kinesis Data Streams On-demand, a new capacity mode. This capacity mode eliminates capacity provisioning and management for streaming workloads.

Kinesis Data Streams is a fully-managed, serverless service for real-time processing of streamed data at a massive scale. Kinesis Data Streams can take any amount of data, from any number of sources, and scale up and down as needed. Creating a new data stream has been easy ever since we announced Kinesis Data Streams in November 2013. To get started, you only need to specify the number of shards with which to provision your stream.

Shards are the way to define capacity in Kinesis Data Streams. Each shard can ingest 1 MB/s and 1,000 records/second and egress up to 2 MB/s. You can add or remove shards of the stream using Kinesis Data Streams APIs to adjust the stream capacity according to the throughput needs of your workloads. This lets you make sure that producer and consumer applications don’t experience any throttling.
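
As a rough illustration of how these per-shard limits translate into a shard count in provisioned mode, the following Python sketch takes the largest of the three requirements. The function name shards_needed and the example workloads are assumptions for illustration, not part of any AWS API.

```python
import math

# Per-shard limits quoted above.
WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1000
READ_MB_PER_SHARD = 2.0


def shards_needed(write_mb_s, write_records_s, read_mb_s):
    """Smallest shard count that satisfies all three per-shard limits."""
    return max(
        1,
        math.ceil(write_mb_s / WRITE_MB_PER_SHARD),
        math.ceil(write_records_s / WRITE_RECORDS_PER_SHARD),
        math.ceil(read_mb_s / READ_MB_PER_SHARD),
    )


# A workload of many small records is bound by the record limit,
# not by the byte limit.
small_records = shards_needed(write_mb_s=4, write_records_s=9000, read_mb_s=4)
large_records = shards_needed(write_mb_s=12, write_records_s=5000, read_mb_s=12)
```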

As customers adopt data streaming broadly, workloads with data traffic that can increase by millions of events in a few minutes are becoming more common. For these volatile traffic patterns, customers carefully plan capacity, monitor throughput, and in some cases develop processes that automatically change the Kinesis Data Streams stream capacity.

Kinesis Data Streams On-Demand Mode
That is why today we are announcing Kinesis Data Streams On-demand. This new capacity mode eliminates the need for provisioning and managing the capacity for streaming data. Kinesis Data Streams On-demand automatically scales the capacity in response to varying data traffic. Customers are charged per gigabyte of data written, read, and stored in the stream, in a pay-per-throughput fashion.

Data streams in the on-demand mode have the same high durability, high availability, low latency, security, and deep AWS integrations that Kinesis Data Streams already provides. Moreover, there are no new APIs to write or read data. All existing Kinesis Data Streams integrations work in the on-demand mode.

Kinesis Data Streams uses the partition key to distribute data across shards. That is why when using Kinesis Data Streams On-demand, you still must specify a partition key for each record to write data into a data stream, as you do today in Kinesis Data Streams using the provisioned mode. In Kinesis Data Streams On-demand, the data stream automatically adapts to handle uneven data distribution patterns. But you must be careful that no partition key exceeds a shard’s limits. If this happens, then you will receive write throttles, and then you can retry these requests.
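
To see why a single hot partition key can exceed one shard's limits, it helps to look at how keys map to shards. Kinesis Data Streams hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The sketch below assumes, for simplicity, that the shards split the hash key space evenly; real streams can have uneven ranges after resharding, and the function names are hypothetical.

```python
import hashlib
from collections import Counter

HASH_KEY_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key):
    """128-bit integer hash of a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for_key(partition_key, shard_count):
    """Index of the shard that owns this key, assuming evenly split ranges."""
    range_size = HASH_KEY_SPACE // shard_count
    return min(hash_key(partition_key) // range_size, shard_count - 1)


# Many distinct keys spread across shards; a single key always lands on one shard.
spread = Counter(shard_for_key(f"device-{i}", 4) for i in range(1000))
hot = Counter(shard_for_key("device-42", 4) for _ in range(1000))
```

Because every record with the same key lands on the same shard, adding capacity to the stream does not help a single key that alone exceeds a shard's limits.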

When a new data stream is created using Kinesis Data Streams On-demand, it gets created with the default capacity of 4 MB/s and 4,000 records per second for writes. Kinesis Data Streams On-demand can automatically scale up to 200 MB/s and 200,000 records per second for writes.

Kinesis Data Streams On-demand accommodates up to double its previous peak write throughput observed in the last 30 days. As your data stream’s write throughput hits a new peak, Kinesis Data Streams automatically scales the stream’s capacity.

For example, if your data stream has a write throughput that varies between 10 MB/s and 40 MB/s, Kinesis Data Streams will make sure that you can easily burst to double the peak—80 MB/s. And, if later on that same data stream reaches a new peak of 50 MB/s, then Kinesis Data Streams will make sure that there is enough capacity to ingest 100 MB/s. However, write throttling can occur if your traffic grows more than double the previous peak in less than 15 minutes.
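
The doubling rule can be written as a one-line calculation. The following Python sketch combines it with the default and maximum write capacities mentioned earlier; treating those two numbers as a floor and a ceiling is an assumption of this sketch, and the function name is hypothetical.

```python
DEFAULT_WRITE_MB_S = 4    # capacity of a newly created on-demand stream
MAX_WRITE_MB_S = 200      # upper bound for automatic scaling


def on_demand_write_capacity(peak_mb_s_last_30_days):
    """Write throughput the stream can absorb: double the 30-day peak,
    clamped between the default and the maximum (clamping is an assumption)."""
    doubled = 2 * peak_mb_s_last_30_days
    return min(MAX_WRITE_MB_S, max(DEFAULT_WRITE_MB_S, doubled))


after_first_peak = on_demand_write_capacity(40)   # peak of 40 MB/s
after_new_peak = on_demand_write_capacity(50)     # new peak of 50 MB/s
```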

When to Use Kinesis Data Streams On-demand
On-demand mode is great for customers that have an unknown or variable workload, or who simply don’t want to deal with capacity management. On-demand mode works best for workloads that have even partition key distribution. For example, you run a mobile game that has variable traffic through the week or day, as customers play mostly on nights or weekends. Or, you run a streaming platform that hosts live shows, and you see a sudden increase in demand depending on the guests you have.

In addition, you can switch between on-demand and provisioned mode twice a day. For example, you run an e-commerce site with predictable traffic. But, starting next month, there will be many marketing campaigns launched globally. You don’t know the impact that those will have on the site traffic. Switch your Kinesis Data Streams to on-demand mode, and now you can enjoy the automated capacity planning and management for your data streams.

Get Started with Kinesis Data Streams On-demand
Create a new data stream with Kinesis Data Streams On-demand from the AWS console, AWS SDKs, AWS Command Line Interface (CLI), and AWS CloudFormation.

To create one from the console, visit the Kinesis console and Create data stream. When selecting the capacity mode, select On-demand.

Creating a data stream

At the end of the page, all of the settings for the new data stream are presented. These settings can be changed after the data stream has been created.

Data stream settings
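
The same choice is available outside the console. As a minimal sketch, the following Python builds the request parameters for the Kinesis CreateStream and UpdateStreamMode APIs with the on-demand stream mode. The helper names and the stream name are hypothetical, and the commented-out call assumes that boto3 is installed and credentials are configured.

```python
def create_on_demand_stream_request(stream_name):
    """Parameters for CreateStream in on-demand mode (no shard count needed)."""
    return {
        "StreamName": stream_name,
        "StreamModeDetails": {"StreamMode": "ON_DEMAND"},
    }


def switch_to_on_demand_request(stream_arn):
    """Parameters for UpdateStreamMode to move an existing stream to on-demand."""
    return {
        "StreamARN": stream_arn,
        "StreamModeDetails": {"StreamMode": "ON_DEMAND"},
    }


request = create_on_demand_stream_request("my-demo-stream")
# With boto3 available (an assumption), the request could be sent like this:
#   import boto3
#   boto3.client("kinesis").create_stream(**request)
```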

Let’s See This in Action!
For this demo, I want to show you how the new Kinesis Data Streams capability works. This situation is best described if you look at the following Amazon CloudWatch graphs. The green line represents the bytes ingested successfully into the stream, and the red line shows the percentage of traffic that is throttled.

First, we will start with a stream provisioned with five shards. For the first three minutes, we are sending a load of 4 MB/s. You can see that the stream can handle the load.

At the time stamp 21:19, we increase the load to 12 MB/s. Now the stream cannot handle the load, and the throttles start (the red line starts climbing up to 60 percent of requests being throttled).

Increase the load on a provisioned stream

At the time stamp 21:23, we change the stream capacity from provisioned to on-demand. You can do that on-the-fly without affecting the stream. See that it takes a very short time for the stream to handle the load when converting from one capacity mode to the other.

In a few minutes (time stamp 21:24) the throttles start to drop as the stream starts scaling up. The stream capacity doubles to 10 shards first (time stamp 21:26), and the stream keeps scaling up until each shard has a load of less than 0.5 MB/s. In this way, if the stream suddenly receives double the amount of load, then it has the capacity ready to handle it.

Change to on-demand mode

At the time stamp 21:26, the load in the stream is increased to 18 MB/s. You can see the green line climbing to 350,000 records – there are no throttles, and the stream ends this demo with 40 open shards. This means that if suddenly the stream receives a load of 40 MB/s, then it could handle it with no problem.

Increase the load
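
The numbers in this demo can be checked with a little arithmetic. The following Python sketch assumes 1 MB/s of write capacity per shard and models scaling as repeated doubling until each shard carries less than 0.5 MB/s; the doubling-only model and the function names are simplifying assumptions, not a description of the service internals.

```python
def throttled_fraction(load_mb_s, shards, mb_per_shard=1.0):
    """Share of incoming traffic that exceeds the provisioned write capacity."""
    capacity = shards * mb_per_shard
    return max(0.0, (load_mb_s - capacity) / load_mb_s)


def scale_by_doubling(shards, load_mb_s, target_mb_per_shard=0.5):
    """Double the shard count until per-shard load drops below the target."""
    while load_mb_s / shards >= target_mb_per_shard:
        shards *= 2
    return shards


# Five provisioned shards under 12 MB/s: 7 of every 12 MB are rejected,
# which is close to the 60 percent seen on the red line.
throttled = throttled_fraction(load_mb_s=12, shards=5)

# Under 18 MB/s the doubling goes 5 -> 10 -> 20 -> 40 shards.
final_shards = scale_by_doubling(shards=5, load_mb_s=18)
```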

Available Now!
Amazon Kinesis Data Streams On-demand is available globally in all commercial Regions.

You can learn more about the capacity modes in the Amazon Kinesis Data Streams Developer Guide.

Marcia

Field Notes: How to Enable Cross-Account Access for Amazon Kinesis Data Streams using Kinesis Client Library 2.x

Post Syndicated from Uday Narayanan original https://aws.amazon.com/blogs/architecture/field-notes-how-to-enable-cross-account-access-for-amazon-kinesis-data-streams-using-kinesis-client-library-2-x/

Businesses today are dealing with vast amounts of real-time data they need to process and analyze to generate insights. Real-time delivery of data and insights enables businesses to quickly make decisions in response to sensor data from devices, clickstream events, user engagement, and infrastructure events, among many others.

Amazon Kinesis Data Streams offers a managed service that lets you focus on building and scaling your streaming applications for near real-time data processing, rather than managing infrastructure. Customers can write Kinesis Data Streams consumer applications to read data from Kinesis Data Streams and process it per their requirements.

Often, the Kinesis Data Streams and consumer applications reside in the same AWS account. However, there are scenarios where customers follow a multi-account approach resulting in Kinesis Data Streams and consumer applications operating in different accounts. Some reasons for using the multi-account approach are to:

  • Allocate AWS accounts to different teams, projects, or products for rapid innovation, while still maintaining unique security requirements.
  • Simplify AWS billing by mapping AWS costs specific to product or service line.
  • Isolate accounts for specific security or compliance requirements.
  • Scale resources and mitigate hard AWS service limits constrained to a single account.

The following options allow you to access Kinesis Data Streams across accounts.

  • Amazon Kinesis Client Library (KCL) for Java or using the MultiLang Daemon for KCL.
  • Amazon Kinesis Data Analytics for Apache Flink – Cross-account access is supported for both Java and Python. For detailed implementation guidance, review the AWS documentation page for Kinesis Data Analytics.
  • AWS Glue Streaming – The documentation for AWS Glue describes how to configure AWS Glue streaming ETL jobs to access cross-account Kinesis Data Streams.
  • AWS Lambda – Lambda currently does not support cross-account invocations from Amazon Kinesis, but a workaround can be used.

In this blog post, we will walk you through the steps to configure KCL for Java and Python for cross-account access to Kinesis Data Streams.

Overview of solution

As shown in Figure 1, Account A has the Kinesis data stream and Account B has the KCL instances consuming from the Kinesis data stream in Account A. For the purposes of this blog post the KCL code is running on Amazon Elastic Compute Cloud (Amazon EC2).

Figure 1. Steps to access a cross-account Kinesis data stream

The steps to access a Kinesis data stream in one account from a KCL application in another account are:

Step 1 – Create an AWS Identity and Access Management (IAM) role in Account A to access the Kinesis data stream, with a trust relationship with Account B.

Step 2 – Create an IAM role in Account B to assume the role in Account A. This role is attached to the EC2 fleet running the KCL application.

Step 3 – Update the KCL application code to assume the role in Account A to read the Kinesis data stream in Account A.

Prerequisites

  • KCL for Java version 2.3.4 or later.
  • AWS Security Token Service (AWS STS) SDK.
  • Create a Kinesis data stream named StockTradeStream in Account A and a producer to load data into the stream. If you do not have a producer, you can use the Amazon Kinesis Data Generator to send test data into your Kinesis data stream.

Walkthrough

Step 1 – Create IAM policies and IAM role in Account A

First, we will create an IAM role in Account A, with permissions to access the Kinesis data stream created in the same account. We will also add Account B as a trusted entity to this role.

  1. Create the IAM policy kds-stock-trade-stream-policy to access the Kinesis data stream in Account A using the following policy definition. This policy restricts access to a specific Kinesis data stream.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:Account-A-AccountNumber:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:Account-A-AccountNumber:stream/StockTradeStream/*"
            ]
        }
    ]
}

Note: The above policy assumes the name of the Kinesis data stream is StockTradeStream.

  2. Create IAM role kds-stock-trade-stream-role in Account A.
aws iam create-role --role-name kds-stock-trade-stream-role --assume-role-policy-document "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam::Account-B-AccountNumber:root\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
  3. Attach the kds-stock-trade-stream-policy IAM policy to the kds-stock-trade-stream-role role.
aws iam attach-role-policy --policy-arn arn:aws:iam::Account-A-AccountNumber:policy/kds-stock-trade-stream-policy --role-name kds-stock-trade-stream-role

In the above steps, replace Account-A-AccountNumber with the AWS account number of the account that has the Kinesis data stream, and Account-B-AccountNumber with the AWS account number of the account that has the KCL application.
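
Hand-escaping the JSON trust policy on the command line is error-prone. As an alternative, here is a minimal Python sketch that builds the same trust policy and serializes it with the standard json module. The function name trust_policy and the account number 111122223333 are placeholders for illustration.

```python
import json


def trust_policy(trusted_account_id):
    """Trust policy that lets principals in another account assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": [f"arn:aws:iam::{trusted_account_id}:root"]},
                "Action": ["sts:AssumeRole"],
            }
        ],
    }


# Placeholder for Account-B-AccountNumber.
document = json.dumps(trust_policy("111122223333"), indent=2)
```

You can write the resulting document to a file such as trust-policy.json and pass it to the create-role command as --assume-role-policy-document file://trust-policy.json, which avoids the escaped quotes entirely.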

Step 2 – Create IAM policies and IAM role in Account B

We will now create an IAM role in Account B to assume the role created in Account A in Step 1. This role will also grant the KCL application access to Amazon DynamoDB and Amazon CloudWatch in Account B. For every KCL application, a DynamoDB table is used to keep track of the shards in a Kinesis data stream that are being leased and processed by the workers of the KCL consumer application. The name of the DynamoDB table is the same as the KCL application name. Similarly, the KCL application needs access to emit metrics to CloudWatch. Because the KCL application is running in Account B, we want to maintain the DynamoDB table and the CloudWatch metrics in the same account as the application code. For this blog post, our KCL application name is StockTradesProcessor.

  1. Create the IAM policy kcl-stock-trader-app-policy, with permissions to access DynamoDB and CloudWatch in Account B, and to assume the kds-stock-trade-stream-role role created in Account A.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AssumeRoleInSourceAccount",
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::Account-A-AccountNumber:role/kds-stock-trade-stream-role"
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:Scan",
                "dynamodb:PutItem",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:Account-B-AccountNumber:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

The above policy gives access to a DynamoDB table named StockTradesProcessor. If you change your KCL application name, make sure you change the above policy to reflect the corresponding DynamoDB table name.
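
Because the DynamoDB table name must track the KCL application name, one way to keep the two in sync is to generate the policy from the application name. The following Python sketch does that; the function name kcl_app_policy and the account numbers are placeholders, and the statements mirror the policy shown above.

```python
import json


def kcl_app_policy(application_name, region, app_account_id, stream_role_arn):
    """Policy for the KCL application role; the lease table is named after the app."""
    table_arn = f"arn:aws:dynamodb:{region}:{app_account_id}:table/{application_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AssumeRoleInSourceAccount",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": stream_role_arn,
            },
            {
                "Sid": "Stmt456",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable", "dynamodb:DescribeTable",
                    "dynamodb:Scan", "dynamodb:PutItem", "dynamodb:GetItem",
                    "dynamodb:UpdateItem", "dynamodb:DeleteItem",
                ],
                "Resource": [table_arn],
            },
            {
                "Sid": "Stmt789",
                "Effect": "Allow",
                "Action": ["cloudwatch:PutMetricData"],
                "Resource": ["*"],
            },
        ],
    }


policy = kcl_app_policy(
    application_name="StockTradesProcessor",
    region="us-east-1",
    app_account_id="111122223333",  # placeholder for Account-B-AccountNumber
    stream_role_arn="arn:aws:iam::444455556666:role/kds-stock-trade-stream-role",
)
policy_json = json.dumps(policy, indent=4)
```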

  2. Create role kcl-stock-trader-app-role in Account B to assume the role in Account A.
aws iam create-role --role-name kcl-stock-trader-app-role --assume-role-policy-document "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":[\"ec2.amazonaws.com\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
  3. Attach the policy kcl-stock-trader-app-policy to the kcl-stock-trader-app-role.
aws iam attach-role-policy --policy-arn arn:aws:iam::Account-B-AccountNumber:policy/kcl-stock-trader-app-policy --role-name kcl-stock-trader-app-role
  4. Create an instance profile named kcl-stock-trader-app-role.
aws iam create-instance-profile --instance-profile-name kcl-stock-trader-app-role
  5. Attach the kcl-stock-trader-app-role role to the instance profile.
aws iam add-role-to-instance-profile --instance-profile-name kcl-stock-trader-app-role --role-name kcl-stock-trader-app-role
  6. Attach the kcl-stock-trader-app-role instance profile to the EC2 instances that are running the KCL code.
aws ec2 associate-iam-instance-profile --iam-instance-profile Name=kcl-stock-trader-app-role --instance-id <your EC2 instance id>

In the above steps, replace Account-A-AccountNumber with the AWS account number of the account that has the Kinesis data stream, Account-B-AccountNumber with the AWS account number of the account that has the KCL application, and <your EC2 instance id> with the ID of your EC2 instance. Attach this instance profile to any new EC2 instances that you start for the KCL application.

Step 3 – Update KCL stock trader application to access cross-account Kinesis data stream

KCL application in Java

To demonstrate the setup for cross-account access for KCL using Java, we have used the KCL stock trader application as the starting point and modified it to enable access to a Kinesis data stream in another AWS account.

After the IAM policies and roles have been created and attached to the EC2 instance running the KCL application, we will update the main class of the consumer application to enable cross-account access.

Setting up the integrated development environment (IDE)

To download and build the code for the stock trader application, follow these steps:

  1. Clone the source code from the GitHub repository to your computer.
$  git clone https://github.com/aws-samples/amazon-kinesis-learning
Cloning into 'amazon-kinesis-learning'...
remote: Enumerating objects: 169, done.
remote: Counting objects: 100% (77/77), done.
remote: Compressing objects: 100% (37/37), done.
remote: Total 169 (delta 16), reused 56 (delta 8), pack-reused 92
Receiving objects: 100% (169/169), 45.14 KiB | 220.00 KiB/s, done.
Resolving deltas: 100% (29/29), done.
  2. Create a project in your integrated development environment (IDE) with the source code you downloaded in the previous step. For this blog post, we use Eclipse as our IDE, so the instructions are specific to an Eclipse project.
    1. Open Eclipse IDE. Select File -> Import.
      A dialog box will open, as shown in Figure 2.

Figure 2. Create an Eclipse project

  3. Select Maven -> Existing Maven Projects, and select Next. Then you will be prompted to select a folder location for the stock trader application.

Figure 3. Select the folder for your project

Select Browse, and navigate to the downloaded source code folder location. The IDE will automatically detect the Maven pom.xml file.

Select Finish to complete the import. The IDE will take 2–3 minutes to download all libraries and complete the setup of the stock trader project.

  4. After the setup is complete, the IDE will look similar to Figure 4.

Figure 4. Final view of pom.xml file after setup is complete

  5. Open pom.xml, and replace it with the following content. This will add all the prerequisites and dependencies required to build and package the jar application.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-learning</artifactId>
    <packaging>jar</packaging>
    <name>Amazon Kinesis Tutorial</name>
    <version>0.0.1</version>
    <description>Tutorial and examples for aws-kinesis-client
    </description>
    <url>https://aws.amazon.com/kinesis</url>

    <scm>
        <url>https://github.com/awslabs/amazon-kinesis-learning.git</url>
    </scm>

    <licenses>
        <license>
            <name>Amazon Software License</name>
            <url>https://aws.amazon.com/asl</url>
            <distribution>repo</distribution>
        </license>
    </licenses>

    <properties>
        <aws-kinesis-client.version>2.3.4</aws-kinesis-client.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>software.amazon.kinesis</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>${aws-kinesis-client.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sts</artifactId>
            <version>2.16.74</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>amazon-kinesis-learning</finalName>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>

                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>

                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
    
            </plugin>
        </plugins>
    </build>


</project>

Update the main class of consumer application

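The updated code for the StockTradesProcessor.java class is shown as follows. The changes made to the class to enable cross-account access are the AWS STS imports, the roleCredentialsProvider method, the two additional command-line arguments (role ARN and role session name), and the credentials provider passed to the Kinesis client.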

package com.amazonaws.services.kinesis.samples.stocktrades.processor;

import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;

/**
 * Uses the Kinesis Client Library (KCL) 2.x to continuously consume and process stock trade
 * records from the stock trades stream. KCL monitors the number of shards and creates
 * record processor instances to read and process records from each shard. KCL also
 * load balances shards across all the instances of this processor.
 *
 */
public class StockTradesProcessor {

    private static final Log LOG = LogFactory.getLog(StockTradesProcessor.class);

    private static final Logger ROOT_LOGGER = Logger.getLogger("");
    private static final Logger PROCESSOR_LOGGER =
            Logger.getLogger("com.amazonaws.services.kinesis.samples.stocktrades.processor.StockTradeRecordProcessor");

    private static void checkUsage(String[] args) {
        if (args.length != 5) {
            System.err.println("Usage: " + StockTradesProcessor.class.getSimpleName()
                    + " <application name> <stream name> <region> <role arn> <role session name>");
            System.exit(1);
        }
    }

    /**
     * Sets the global log level to WARNING and the log level for this package to INFO,
     * so that we only see INFO messages for this processor. This is just for the purpose
     * of this tutorial, and should not be considered as best practice.
     *
     */
    private static void setLogLevels() {
        ROOT_LOGGER.setLevel(Level.WARNING);
        // Set this to INFO for logging at INFO level. Suppressed for this example as it can be noisy.
        PROCESSOR_LOGGER.setLevel(Level.WARNING);
    }
    
    private static AwsCredentialsProvider roleCredentialsProvider(String roleArn, String roleSessionName, Region region) {
        AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                .roleArn(roleArn)
                .roleSessionName(roleSessionName)
                .durationSeconds(900)
                .build();
        LOG.warn("Initializing assume role request session: " + assumeRoleRequest.roleSessionName());
        StsClient stsClient = StsClient.builder().region(region).build();
        StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider = StsAssumeRoleCredentialsProvider
                .builder()
                .stsClient(stsClient)
                .refreshRequest(assumeRoleRequest)
                .asyncCredentialUpdateEnabled(true)
                .build();
        LOG.warn("Initializing sts role credential provider: " + stsAssumeRoleCredentialsProvider.prefetchTime().toString());
        return stsAssumeRoleCredentialsProvider;
    }

    public static void main(String[] args) throws Exception {
        checkUsage(args);

        setLogLevels();

        String applicationName = args[0];
        String streamName = args[1];
        Region region = Region.of(args[2]);
        String roleArn = args[3];
        String roleSessionName = args[4];
        
        if (region == null) {
            System.err.println(args[2] + " is not a valid AWS region.");
            System.exit(1);
        }
        
        AwsCredentialsProvider awsCredentialsProvider = roleCredentialsProvider(roleArn, roleSessionName, region);
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region).credentialsProvider(awsCredentialsProvider));
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        StockTradeRecordProcessorFactory shardRecordProcessor = new StockTradeRecordProcessorFactory();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), shardRecordProcessor);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );
        int exitCode = 0;
        try {
            scheduler.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);
            exitCode = 1;
        }
        System.exit(exitCode);

    }

}

Let’s review the changes made to the code to understand the key parts of how the cross-account access works.

AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
        .roleArn(roleArn)
        .roleSessionName(roleSessionName)
        .durationSeconds(900)
        .build();

AssumeRoleRequest class is used to get the credentials to access the Kinesis data stream in Account A using the role that was created. The value of the variable assumeRoleRequest is passed to the StsAssumeRoleCredentialsProvider.

StsClient stsClient = StsClient.builder().region(region).build();
StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider = StsAssumeRoleCredentialsProvider
        .builder()
        .stsClient(stsClient)
        .refreshRequest(assumeRoleRequest)
        .asyncCredentialUpdateEnabled(true)
        .build();

StsAssumeRoleCredentialsProvider periodically sends an AssumeRoleRequest to the AWS STS to maintain short-lived sessions to use for authentication. Using refreshRequest, these sessions are updated asynchronously in the background as they get close to expiring. As asynchronous refresh is not set by default, we explicitly set it to true using asyncCredentialUpdateEnabled.
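
The idea of refreshing short-lived credentials before they expire can be illustrated with a small, language-neutral sketch. The Python class below is not the AWS SDK implementation: the class name, the fetch callable, and the prefetch window are hypothetical, and it refreshes synchronously when credentials are requested, whereas the SDK provider with asyncCredentialUpdateEnabled refreshes in a background thread.

```python
import time


class RefreshingCredentials:
    """Caches credentials and fetches new ones shortly before expiry."""

    def __init__(self, fetch, prefetch_seconds=300, clock=time.time):
        self._fetch = fetch              # callable returning (credentials, expires_at)
        self._prefetch = prefetch_seconds
        self._clock = clock
        self._credentials = None
        self._expires_at = 0.0

    def get(self):
        # Refresh when nothing is cached or the session is inside the
        # prefetch window before its expiry time.
        if self._credentials is None or self._clock() >= self._expires_at - self._prefetch:
            self._credentials, self._expires_at = self._fetch()
        return self._credentials


# Simulated clock and a fake "assume role" call returning 900-second sessions.
now = [0.0]
issued = []


def fake_assume_role():
    issued.append(now[0])
    return f"session-{len(issued)}", now[0] + 900


provider = RefreshingCredentials(fake_assume_role, prefetch_seconds=300, clock=lambda: now[0])
first = provider.get()    # fetched at t=0, expires at t=900
now[0] = 500.0
second = provider.get()   # still outside the prefetch window, cached value reused
now[0] = 600.0
third = provider.get()    # inside the window (900 - 300), so a new session is fetched
```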

AwsCredentialsProvider awsCredentialsProvider = roleCredentialsProvider(roleArn,roleSessionName, region);
KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region).credentialsProvider(awsCredentialsProvider));

  • KinesisAsyncClient is the client for accessing Kinesis asynchronously. We create an instance of KinesisAsyncClient by passing it the credentials to access the Kinesis data stream in Account A through the assumed role. The Kinesis, DynamoDB, and CloudWatch clients, along with the stream name and application name, are used to create a ConfigsBuilder instance.
  • The ConfigsBuilder instance is used to create the KCL scheduler (also known as KCL worker in KCL versions 1.x).
  • The scheduler creates a new thread for each shard assigned to this consumer instance. Each thread loops continuously to read records from the data stream, and then invokes the record processor instance (StockTradeRecordProcessor in this example) to process each batch of records received. This is the class that contains your record processing logic. The Developing Custom Consumers with Shared Throughput Using KCL section of the documentation provides more details on how KCL works.

KCL application in Python

In this section we will show you how to configure a KCL application written in Python to access a cross-account Kinesis data stream.

A.      Steps 1 and 2 from earlier remain the same and will need to be completed before moving ahead. After the IAM roles and policies have been created, log into the EC2 instance and clone the amazon-kinesis-client-python repository using the following command.

git clone https://github.com/awslabs/amazon-kinesis-client-python.git

B.      Navigate to the amazon-kinesis-client-python directory and run the following commands.

sudo yum install python-pip
sudo pip install virtualenv
virtualenv /tmp/kclpy-sample-env
source /tmp/kclpy-sample-env/bin/activate
pip install amazon_kclpy

C.      Next, navigate to amazon-kinesis-client-python/samples and open the sample.properties file. The properties file has properties such as streamName, application name, and credential information that lets you customize the configuration for your use case.

D.      We will modify the properties file to change the stream name and application name, and to add the credentials that enable access to a Kinesis data stream in a different account. You can replace the contents of the sample.properties file with the following snippet. The changes are to the streamName, applicationName, and AWSCredentialsProvider settings.

# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = StockTradeStream

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = StockTradesProcessor

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
#AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
AWSCredentialsProvider = STSAssumeRoleSessionCredentialsProvider|arn:aws:iam::Account-A-AccountNumber:role/kds-stock-trade-stream-role|kinesiscrossaccount
AWSCredentialsProviderDynamoDB = DefaultAWSCredentialsProviderChain
AWSCredentialsProviderCloudWatch = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = LATEST

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

In the preceding snippet, replace Account-A-AccountNumber with the AWS account number of the account that owns the Kinesis data stream.

We use the STSAssumeRoleSessionCredentialsProvider class and pass it the role created in Account A, which has permissions to access the Kinesis data stream. This gives the KCL application in Account B permission to read the Kinesis data stream in Account A. The DynamoDB lease table and the CloudWatch metrics are in Account B, so we can use the DefaultAWSCredentialsProviderChain for AWSCredentialsProviderDynamoDB and AWSCredentialsProviderCloudWatch in the properties file. You can now save the sample.properties file.

E.      Next, we will change the application code to print the data read from the Kinesis data stream to standard output (STDOUT). Edit the sample_kclpy_app.py under the samples directory. You will add all your application code logic in the process_record method. This method is called for every record in the Kinesis data stream. For this blog post, we will add a single line to the method to print the records to STDOUT, as shown in Figure 5.

Figure 5. Add custom code to process_record method

F.       Save the file, and run the following command to build the project with the changes you just made.

cd amazon-kinesis-client-python/
python setup.py install

G.     Now you are ready to run the application. To start the KCL application, run the following command from the amazon-kinesis-client-python directory.

`amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties samples/sample.properties`

This will start the application. Your application is now ready to read the data from the Kinesis data stream in another account and display the contents of the stream on STDOUT. When the producer starts writing data to the Kinesis data stream in Account A, you will start seeing those results being printed.

Clean Up

Once you are done testing the cross-account access, make sure you clean up your environment to avoid incurring cost. As part of the cleanup, we recommend that you delete the Kinesis data stream (StockTradeStream), the EC2 instances that the KCL application is running on, and the DynamoDB table that was created by the KCL application.

Conclusion

In this blog post, we discussed the techniques to configure your KCL applications written in Java and Python to access a Kinesis data stream in a different AWS account. We also provided sample code and configurations which you can modify and use in your application code to set up the cross-account access. Now you can continue to build a multi-account strategy on AWS, while being able to easily access your Kinesis data streams from applications in multiple AWS accounts.

Building well-architected serverless applications: Optimizing application costs

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-optimizing-application-costs/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

COST 1. How do you optimize your serverless application costs?

Design, implement, and optimize your application to maximize value. Asynchronous design patterns and performance practices ensure efficient resource use and directly impact the value per business transaction. By optimizing your serverless application performance and its code patterns, you can directly impact the value it provides, while making more efficient use of resources.

Serverless architectures are easier to manage in terms of correct resource allocation compared to traditional architectures. Due to its pay-per-value pricing model and scale based on demand, a serverless approach effectively reduces the capacity planning effort. As covered in the operational excellence and performance pillars, optimizing your serverless application has a direct impact on the value it produces and its cost. For general serverless optimization guidance, see the AWS re:Invent talks, “Optimizing your Serverless applications” Part 1 and Part 2, and “Serverless architectural patterns and best practices”.

Required practice: Minimize external calls and function code initialization

AWS Lambda functions may call other managed services and third-party APIs. Functions may also use application dependencies that may not be suitable for ephemeral environments. Understanding and controlling what your function accesses while it runs can have a direct impact on value provided per invocation.

Review code initialization

I explain the Lambda initialization process with cold and warm starts in “Optimizing application performance – part 1”. Lambda reports the time it takes to initialize application code in Amazon CloudWatch Logs. As Lambda functions are billed by request and duration, you can use this to track costs and performance. Consider reviewing your application code and its dependencies to improve the overall execution time to maximize value.

You can take advantage of Lambda execution environment reuse to make external calls to resources and use the results for subsequent invocations. Use TTL mechanisms inside your function handler code. This ensures that you can prevent additional external calls that incur additional execution time, while preemptively fetching data that isn’t stale.

Review third-party application deployments and permissions

When using Lambda layers or applications provisioned by AWS Serverless Application Repository, be sure to understand any associated charges that these may incur. When deploying functions packaged as container images, understand the charges for storing images in Amazon Elastic Container Registry (ECR).

Ensure that your Lambda function only has access to what its application code needs. Regularly review that your function has a predicted usage pattern so you can factor in the cost of other services, such as Amazon S3 and Amazon DynamoDB.

Required practice: Optimize logging output and its retention

Consider reviewing your application logging level. Ensure that logging output and log retention are set appropriately for your operational needs to prevent unnecessary logging and data retention. This helps you keep the minimum log retention needed to investigate operational and performance inquiries when necessary.

Emit and capture only what is necessary to understand and operate your component as intended.

With Lambda, any standard output statements are sent to CloudWatch Logs. Capture and emit business and operational events that are necessary to help you understand your function, its integration, and its interactions. Use a logging framework and environment variables to dynamically set a logging level. When applicable, sample debugging logs for a percentage of invocations.
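As a rough sketch of setting the level from an environment variable and sampling debug logs, the following uses only the Python standard library rather than Lambda Powertools. `LOG_LEVEL` matches the variable name used in this post; `DEBUG_SAMPLE_RATE` and `configure_logging` are hypothetical names introduced for illustration.

```python
import logging
import os
import random

logging.basicConfig()
logger = logging.getLogger("booking")

LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def configure_logging(environ=os.environ, rng=random.random):
    """Set the level from LOG_LEVEL; switch to DEBUG for a sampled share of invocations."""
    level = LEVELS.get(environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO)
    # DEBUG_SAMPLE_RATE is a hypothetical variable: 0.1 means roughly 10% of invocations.
    sample_rate = float(environ.get("DEBUG_SAMPLE_RATE", "0"))
    if rng() < sample_rate:
        level = logging.DEBUG
    logger.setLevel(level)
    return level


def lambda_handler(event, context):
    configure_logging()
    logger.debug("full event: %s", event)  # emitted only when DEBUG is active
    logger.info("booking processed")
    return {"status": "ok"}
```

Calling `configure_logging` at the start of each invocation means the sampling decision is made per invocation, not once per execution environment.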

In the serverless airline example used in this series, the booking service Lambda functions use Lambda Powertools as a logging framework with output structured as JSON.

Lambda Powertools is added to the Lambda functions as a shared Lambda layer in the AWS Serverless Application Model (AWS SAM) template. The layer ARN is stored in Systems Manager Parameter Store.

Parameters:
  SharedLibsLayer:
    Type: AWS::SSM::Parameter::Value<String>
    Description: Project shared libraries Lambda Layer ARN
Resources:
    ConfirmBooking:
        Type: AWS::Serverless::Function
        Properties:
            FunctionName: !Sub ServerlessAirline-ConfirmBooking-${Stage}
            Handler: confirm.lambda_handler
            CodeUri: src/confirm-booking
            Layers:
                - !Ref SharedLibsLayer
            Runtime: python3.7
…

The LOG_LEVEL and other Powertools settings are configured in the Globals section as Lambda environment variables for all functions.

Globals:
    Function:
        Environment:
            Variables:
                POWERTOOLS_SERVICE_NAME: booking
                POWERTOOLS_METRICS_NAMESPACE: ServerlessAirline
                LOG_LEVEL: INFO 

For Amazon API Gateway, there are two types of logging in CloudWatch: execution logging and access logging. Execution logs contain information that you can use to identify and troubleshoot API errors. API Gateway manages the CloudWatch Logs, creating the log groups and log streams. Access logs contain details about who accessed your API and how they accessed it. You can create your own log group or choose an existing log group that could be managed by API Gateway.

Enable access logs, and selectively review the output format and request fields that might be necessary. For more information, see “Setting up CloudWatch logging for a REST API in API Gateway”.

API Gateway logging

Enable AWS AppSync logging, which uses CloudWatch to monitor and debug requests. You can configure two types of logging: request-level and field-level. For more information, see “Monitoring and Logging”.

AWS AppSync logging

Define and set a log retention strategy

Define a log retention strategy to satisfy your operational and business needs. Set log expiration for each CloudWatch log group as they are kept indefinitely by default.

For example, in the booking service AWS SAM template, log groups are explicitly created for each Lambda function with a parameter specifying the retention period.

Parameters:
    LogRetentionInDays:
        Type: Number
        Default: 14
        Description: CloudWatch Logs retention period
Resources:
    ConfirmBookingLogGroup:
        Type: AWS::Logs::LogGroup
        Properties:
            LogGroupName: !Sub "/aws/lambda/${ConfirmBooking}"
            RetentionInDays: !Ref LogRetentionInDays

The Serverless Application Repository application auto-set-log-group-retention can update the retention policy for new and existing CloudWatch log groups to the specified number of days.

For log archival, you can export CloudWatch Logs to S3 and store them in Amazon S3 Glacier for more cost-effective retention. You can use CloudWatch Logs subscriptions for custom processing, analysis, or loading to other systems. Lambda extensions allow you to process, filter, and route logs directly from Lambda to a destination of your choice.

Good practice: Optimize function configuration to reduce cost

Benchmark your function using different memory sizes

For Lambda functions, memory is the capacity unit for controlling the performance and cost of a function. You can configure the amount of memory allocated to a Lambda function, between 128 MB and 10,240 MB. The amount of memory also determines the amount of virtual CPU available to a function. Benchmark your AWS Lambda functions with differing amounts of memory allocated. Adding more memory and proportional CPU may lower the duration and reduce the cost of each invocation.
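To see why more memory can lower cost, it helps to compare configurations in GB-seconds, the unit in which Lambda duration is billed. The sketch below is illustrative only: the benchmark measurements are made-up values, and the function names are assumptions. It ignores the per-request charge, which is the same for every configuration.

```python
def duration_gb_seconds(memory_mb, duration_ms):
    """Billed compute for one invocation, in GB-seconds."""
    return (memory_mb / 1024) * (duration_ms / 1000)


def cheapest_configuration(benchmarks):
    """Return the (memory_mb, duration_ms) pair with the lowest GB-seconds."""
    return min(benchmarks, key=lambda pair: duration_gb_seconds(*pair))


# Hypothetical measurements: (memory in MB, average duration in ms).
benchmarks = [(128, 1000), (512, 200), (1024, 150)]
best = cheapest_configuration(benchmarks)

for memory_mb, duration_ms in benchmarks:
    print(memory_mb, "MB:", duration_gb_seconds(memory_mb, duration_ms), "GB-seconds")
print("lowest cost per invocation:", best)
```

With these made-up numbers, 512 MB uses fewer GB-seconds than 128 MB because the duration drops faster than the memory grows, while 1,024 MB costs more because the extra memory no longer buys a proportional speedup.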

In “Optimizing application performance – part 2”, I cover using AWS Lambda Power Tuning to automate the memory testing process and balance performance and cost.

Best practice: Use cost-aware usage patterns in code

Reduce the time your function runs by reducing job-polling or task coordination. This avoids overpaying for unnecessary compute time.

Decide whether your application can fit an asynchronous pattern

Avoid scenarios where your Lambda functions wait for external activities to complete. I explain the difference between synchronous and asynchronous processing in “Optimizing application performance – part 1”. You can use asynchronous processing to aggregate queues, streams, or events for more efficient processing time per invocation. This reduces wait times and latency from requesting apps and functions.

Long polling or waiting increases the costs of Lambda functions and also reduces overall account concurrency. This can impact the ability of other functions to run.

Consider using other services such as AWS Step Functions to help reduce code and coordinate asynchronous workloads. You can build workflows using state machines with long polling and failure handling. Step Functions also supports direct service integrations, such as DynamoDB, without having to use Lambda functions.

In the serverless airline example used in this series, Step Functions is used to orchestrate the Booking microservice. The ProcessBooking state machine handles all the necessary steps to create bookings, including payment.

Booking service state machine

To reduce costs and improve performance with CloudWatch, create custom metrics asynchronously. You can use the Embedded Metrics Format to write logs, rather than calling the PutMetricData API. I cover using the embedded metrics format in “Understanding application health” – part 1 and part 2.
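A structured log line in the embedded metric format can be sketched with the standard library alone, as shown below. The namespace, service name, and BookingSuccessful metric come from this post; the `service` dimension name and the `emf_log_line` helper are assumptions made for illustration, and the example application uses Lambda Powertools rather than hand-built JSON.

```python
import json
import time


def emf_log_line(namespace, service, metric_name, value, unit="Count", timestamp_ms=None):
    """Build one structured log line in the CloudWatch embedded metric format."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    entry = {
        "_aws": {
            "Timestamp": timestamp_ms,
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    # "service" as the dimension name is an assumption for this sketch.
                    "Dimensions": [["service"]],
                    "Metrics": [{"Name": metric_name, "Unit": unit}],
                }
            ],
        },
        # Dimension and metric values are top-level members of the same JSON object.
        "service": service,
        metric_name: value,
    }
    return json.dumps(entry)


def lambda_handler(event, context):
    # Standard output goes to CloudWatch Logs; no metrics API call is made here.
    print(emf_log_line("ServerlessAirline", "booking", "BookingSuccessful", 1))
    return {"status": "ok"}
```

Because the metric travels inside a log line, the function does not wait on a synchronous metrics call during the invocation.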

For example, once a booking is made, the logs are visible in the CloudWatch console. You can select a log stream and find the custom metric as part of the structured log entry.

Custom metric structured log entry

CloudWatch automatically creates metrics from these structured logs. You can create graphs and alarms based on them. For example, here is a graph based on a BookingSuccessful custom metric.

CloudWatch metrics custom graph

Consider asynchronous invocations and review runaway functions where applicable

Take advantage of Lambda’s event-based model. Lambda functions can be triggered based on events ingested into Amazon Simple Queue Service (SQS) queues, S3 buckets, and Amazon Kinesis Data Streams. AWS manages the polling infrastructure on your behalf with no additional cost. Avoid code that polls third-party software as a service (SaaS) providers. Instead, use Amazon EventBridge to integrate with SaaS providers when possible.

Carefully consider and review recursion, and establish timeouts to prevent runaway functions.

Conclusion

Design, implement, and optimize your application to maximize value. Asynchronous design patterns and performance practices ensure efficient resource use and directly impact the value per business transaction. By optimizing your serverless application performance and its code patterns, you can reduce costs while making more efficient use of resources.

In this post, I cover minimizing external calls and function code initialization. I show how to optimize logging output with the embedded metrics format, and log retention. I recap optimizing function configuration to reduce cost and highlight the benefits of asynchronous event-driven patterns.

This post wraps up the series, building well-architected serverless applications, where I cover the AWS Well-Architected Tool with the Serverless Lens. See the introduction post for links to all the blog posts.

For more serverless learning resources, visit Serverless Land.

 

Field Notes: How to Scale OpenTravel Messaging Architecture with Amazon Kinesis Data Streams

Post Syndicated from Danny Gagne original https://aws.amazon.com/blogs/architecture/field-notes-how-to-scale-opentravel-messaging-architecture-with-amazon-kinesis-data-streams/

The travel industry relies on OpenTravel messaging systems to collect and distribute travel data—like hotel inventory and pricing—to many independent ecommerce travel sites. These travel sites need immediate access to the most current hotel inventory and pricing data. This allows shoppers access to the available rooms at the right prices. Each time a room is reserved, unreserved, or there is a price change, an ordered message with minimal latency must be sent from the hotel system to the travel site.

Overview of Solution

In this blog post, we will describe the architectural considerations needed to scale a low latency FIFO messaging system built with Amazon Kinesis Data Streams.

The architecture must satisfy the following constraints:

  1. Messages must arrive in order to each destination, with respect to their hotel code.
  2. Messages are delivered to multiple destinations independently.

Kinesis Data Streams is a managed service that enables real-time processing of streaming records. The service provides ordering of records, as well as the ability to read and replay records in the same order to multiple Amazon Kinesis applications. Kinesis data stream applications can also consume data in parallel from a stream through parallelization of consumers.

We will start with an architecture that supports one hotel with one destination, and scale it to support 1,000 hotels and 100 destinations (38,051 records per second). With the OpenTravel use case, the order of messages matters only within a stream of messages from a given hotel.

Smallest scale: One hotel with one processed delivery destination. One million messages a month.

Figure 1: Architecture showing a system sending messages from 1 hotel to 1 destination (.3805 RPS)

Full scale: 1,000 hotels with 100 processed delivery destinations. 100 billion messages a month.

Figure 2: Architecture showing a system sending messages from 1,000 hotels to 100 destinations (38,051 RPS)

In the example application, OpenTravel XML message data is the record payload. The record is written into the stream with a producer. The record is read from the stream shard by the consumer and sent to several HTTP destinations.

Each Kinesis data stream shard supports writes up to 1,000 records per second, up to a maximum data write total of 1 MB per second. Each shard supports reads up to five transactions per second, up to a maximum data read total of 2 MB per second. There is no upper quota on the number of streams you can have in an account.

Streams | Shards | Writes/Input limit | Reads/Output limit
1 | 1 | 1 MB per second; 1,000 records per second | 2 MB per second
1 | 500 | 500 MB per second; 500,000 records per second | 1,000 MB per second

OpenTravel message

In the following OpenTravel message, there is only one field that is important to our use case: HotelCode. In OTA (OpenTravel Alliance) messaging, order matters, but only in the context of a given hotel, specified by the HotelCode. As we scale up our solution, we will use the HotelCode as a partition key. Each destination receives the same messages. If we are sending to 100 destinations, then we will send the message 100 times. The average message size is 4 KB.

<HotelAvailNotif>
    <request>
        <POS>
            <Source>
                <RequestorID ID="1000">
                </RequestorID>
                <BookingChannel>
                    <CompanyName Code="ClientTravelAgency1">
                    </CompanyName>
                </BookingChannel>
            </Source>
        </POS>
        <AvailStatusMessages HotelCode="100">
            <AvailStatusMessage BookingLimit="9">
                <StatusApplicationControl Start="2013-12-20" End="2013-12-25" RatePlanCode="BAR" InvCode="APT" InvType="ROOM" Mon="true" Tue="true" Weds="true" Thur="false" Fri="true" Sat="true" Sun="true"/>
                <LengthsOfStay ArrivalDateBased="true">
                    <LengthOfStay Time="2" TimeUnit="Day" MinMaxMessageType="MinLOS"/>
                    <LengthOfStay Time="8" TimeUnit="Day" MinMaxMessageType="MaxLOS"/>
                </LengthsOfStay>
                <RestrictionStatus Status="Open" SellThroughOpenIndicator="false" MinAdvancedBookingOffset="5"/>
            </AvailStatusMessage>
        </AvailStatusMessages>
    </request>
</HotelAvailNotif>

Source: https://github.com/XML-Travelgate/xtg-content-articles-pub/blob/master/docs/hotel-push/HotelPUSH.md 

Producer and consumer message error handling

Asynchronous message processing presents challenges with error handling, because an error can be logged to the producer or consumer application logs. Short-lived errors can be resolved with a delayed retry. However, there are cases when the data is bad and the message will always cause an error; these messages should be added to a dead letter queue (DLQ).
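The retry-then-DLQ pattern described above might look like the following sketch. `TransientError`, `deliver_with_retry`, and the in-memory `dead_letters` list are hypothetical names used for illustration; a real system would write failed messages to a durable queue.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for short-lived errors such as timeouts or throttling."""


def deliver_with_retry(message, send, dead_letters, max_attempts=3,
                       base_delay=0.5, sleep=time.sleep):
    """Try to send a message; retry transient errors with exponential backoff.

    Messages that fail permanently, or that exhaust their retries, are
    appended to dead_letters (a stand-in for a real dead letter queue).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            send(message)
            return True
        except TransientError:
            if attempt == max_attempts:
                break
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
        except Exception:
            # Bad data will fail every time, so retrying is pointless.
            break
    dead_letters.append(message)
    return False
```

Blocking on the retry before moving to the next message keeps delivery in order for that destination, at the cost of delaying the messages queued behind it.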

Producer retries and consumer retries are some of the reasons why records may be delivered more than once. Kinesis Data Streams does not automatically remove duplicate records. If the destination needs a strict guarantee of uniqueness, the message must have a primary key so that duplicates can be removed when processed in the client (Handling Duplicate Records). In most cases, the destination can mitigate duplicate messages by processing the same message multiple times in a way that produces the same result (idempotence). If a destination endpoint becomes unavailable or is not performant, the consumer should back off and retry until the endpoint is available. The failure of a single destination endpoint should not impact the delivery of messages to other destinations.
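As a small illustration of duplicate handling at the destination, the sketch below tracks a primary key per message and applies each message at most once. The `message_id` field and the `IdempotentDestination` class are assumptions for this example; the OpenTravel message shown earlier does not define such a key, and a production system would keep the set of seen keys in durable storage.

```python
class IdempotentDestination:
    """Applies availability updates at most once per message ID."""

    def __init__(self):
        self.availability = {}  # latest booking limit keyed by hotel code
        self.seen_ids = set()   # primary keys of messages already applied

    def apply(self, message_id, hotel_code, booking_limit):
        """Return True if the message was applied, False if it was a duplicate."""
        if message_id in self.seen_ids:
            return False
        self.availability[hotel_code] = booking_limit
        self.seen_ids.add(message_id)
        return True


destination = IdempotentDestination()
destination.apply("msg-1", "100", 9)
destination.apply("msg-2", "100", 8)
# A redelivered copy of msg-1 is ignored, so it cannot overwrite the newer value.
destination.apply("msg-1", "100", 9)
```

Without the duplicate check, the late copy of the first message would reset the booking limit to a stale value.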

Messaging with one hotel and one destination

When starting at the smallest scale, one hotel and one destination, the system must support one million messages per month. At this volume, the input is about 4 GB of message data per month, at a rate of 0.3805 records per second and 0.0015 MB per second for both input and output. For the system to process one hotel to one destination, it needs at least one producer, one stream, one shard, and a single consumer.

Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output
1 hotel, 1 destination (4-KB average record size) | 1 | 1 | 1 | 0.3805 records per second, or 0.0015 MB per second | 0.3805 records per second, or 0.0015 MB per second
Maximum stream limits | 1 | 1 | 1 | 250 (4-KB records) per second; 1 MB per second (per stream) | 500 (4-KB records) per second; 2 MB per second (per stream)
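The per-second rates quoted in this post can be reproduced from the monthly message counts. The sketch below assumes an average month of 365/12 days and converts kilobytes to megabytes by dividing by 1,024, which is what the quoted figures appear to use; `stream_rates` is a name chosen for illustration.

```python
SECONDS_PER_MONTH = 365 / 12 * 24 * 60 * 60  # average month, about 2,628,000 seconds


def stream_rates(messages_per_month, destinations=1, record_kb=4):
    """Convert a monthly input volume into per-second input and output rates."""
    records_in = messages_per_month / SECONDS_PER_MONTH
    records_out = records_in * destinations  # every destination gets every message
    kb_to_mb = record_kb / 1024
    return {
        "records_in": records_in,
        "mb_in": records_in * kb_to_mb,
        "records_out": records_out,
        "mb_out": records_out * kb_to_mb,
    }


one_hotel = stream_rates(1_000_000)
full_scale = stream_rates(1_000_000_000, destinations=100)
print(round(one_hotel["records_in"], 4), round(one_hotel["mb_in"], 4))
print(int(full_scale["records_out"]), round(full_scale["mb_out"], 1))
```

The same function covers the intermediate scenarios by changing `messages_per_month` and `destinations`.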

With this design, the single shard supports writes up to 250 records per second, up to a maximum data write total of 1 MB per second. A PutRecords request supports up to 500 records, and each record in the request can be as large as 1 MB. Because the average message size is 4 KB, the 1 MB per second limit allows 250 records per second to be written to the shard.

Figure 3: Architecture using Amazon Kinesis Data Streams (1 hotel, 1 destination)

The single shard can also support consumer reads up to five transactions per second, up to a maximum data read total of 2 MB per second. The producer writes to the shard in FIFO order. A consumer pulls from the shard and delivers to the destination in the same order as received. In this one hotel and one destination example, the consumer read capacity would be 500 records per second.
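The 250 and 500 records-per-second figures follow from dividing the per-shard byte limits by the record size. A minimal sketch, assuming 1 MB is counted as 1,000 KB (which matches the figures in this post) and using function names invented for illustration:

```python
SHARD_WRITE_RECORDS_PER_SEC = 1000  # per-shard record limit for writes
SHARD_WRITE_KB_PER_SEC = 1000       # 1 MB per second, counted here as 1,000 KB
SHARD_READ_KB_PER_SEC = 2000        # 2 MB per second, shared by standard consumers


def shard_write_capacity(record_kb):
    """Records per second one shard can accept for a given record size."""
    return min(SHARD_WRITE_RECORDS_PER_SEC, SHARD_WRITE_KB_PER_SEC // record_kb)


def shard_read_capacity(record_kb):
    """Records per second standard consumers can read from one shard."""
    return SHARD_READ_KB_PER_SEC // record_kb


print(shard_write_capacity(4))  # 4-KB records are limited by bytes, not by record count
print(shard_read_capacity(4))
```

For records of 1 KB or smaller, the 1,000 records-per-second limit becomes the binding constraint on writes instead of the byte limit.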

Figure 4: Records passing through the architecture

Messaging with 1,000 hotels and one destination

Next, we will maintain a single destination and scale the number of hotels from one hotel to 1,000 hotels. This scale increases the producer inputs from one million messages to one billion messages a month. This volume expects an input of 4 TB of message data per month at a rate of 380 records per second, and 1.486 MB per second.

Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output
1,000 hotels, 1 destination (4-KB average record size) | 1 | 2 | 1 | 380 records per second, or 1.486 MB per second | 380 records per second, or 1.486 MB per second
Maximum stream limits | 1 | 2 | 1 | 500 (4-KB records) per second; 2 MB per second (per stream) | 1,000 (4-KB records) per second; 4 MB per second (per stream)

The volume of incoming messages increased to 1.486 MB per second, requiring one additional shard. A shard can be split using the API to increase the overall throughput capacity. The write capacity is increased by distributing messages across the shards. We will use the HotelCode as a partition key to maintain the order of messages with respect to a given hotel. This distributes the writes across the two shards. The messages for each HotelCode are written into the same shard and maintain FIFO order. Spreading the HotelCode partition keys across two shards doubles the stream write capacity to 2 MB per second.
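Kinesis Data Streams maps each partition key to a 128-bit integer with an MD5 hash and routes the record to the shard whose hash key range contains that value. The sketch below illustrates the idea under the assumption that the shards split the hash key space evenly, which is not guaranteed for a real stream after arbitrary splits and merges; `hash_key` and `shard_for` are illustrative names.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key):
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    return int(digest, 16)


def shard_for(partition_key, shard_count):
    """Index of the shard owning this key, assuming evenly split hash key ranges."""
    return hash_key(partition_key) * shard_count // HASH_SPACE


# Every message for a given HotelCode lands on the same shard, which is
# what preserves per-hotel FIFO order while writes spread across shards.
for hotel_code in ["100", "101", "102"]:
    print(hotel_code, "-> shard", shard_for(hotel_code, 2))
```

Because the mapping depends only on the key, a hotel that sends far more messages than others concentrates its load on one shard.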

A single consumer can read the records from each shard at 2 MB per second per shard. The consumer will iterate through each shard, processing the records in FIFO order based on the HotelCode. It then sends messages to the destination in the same order they were sent from the producer.

Messaging with 1,000 hotels and five destinations

As we scale up further, we maintain 1,000 hotels and scale the delivery to five destinations. This increases the consumer reads from one billion messages to five billion messages a month. It has the same input of 4 TB of message data per month at a rate of 380 records per second, and 1.486 MB per second. The output is now about 19.5 TB of message data per month at a rate of 1,902.5 records per second, and 7.43 MB per second.

Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output
1,000 hotels, 5 destinations (4-KB average record size) | 1 | 4 | 5 | 380 records per second, or 1.486 MB per second | 1,902.5 records per second, or 7.43 MB per second
Maximum stream capacity | 1 | 4 | 5 | 1,000 (4-KB records) per second (per stream); 4 MB per second (per stream) | 2,000 (4-KB records) per second (per stream) with a standard consumer; 8 MB per second (per stream)

To support this scale, we increase the number of consumers to match the destinations. A dedicated consumer for each destination allows the system to have committed capacity to each destination. If a destination fails or becomes slow, it will not impact other destination consumers.

Figure 5: Four Kinesis shards to support read throughput

We increase to four shards to support additional read throughput required for the increased destination consumers. Each destination consumer iterates through the messages in each shard in FIFO order based on the HotelCode. It then sends the messages to the assigned destination maintaining the hotel specific ordering. Like in the previous 1,000-to-1 design, the messages for each HotelCode are written into the same shard while maintaining its FIFO order.
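The shard counts in these designs can be estimated from the per-shard limits quoted earlier: 1 MB per second and 1,000 records per second for writes, and 2 MB per second for reads shared across standard consumers. The following sketch, with an illustrative function name, returns a lower bound and ignores headroom, so a real deployment may provision more.

```python
import math


def min_shards(input_mb_per_sec, output_mb_per_sec, input_records_per_sec=0):
    """Smallest shard count that satisfies the write and shared-read limits."""
    by_write_bytes = input_mb_per_sec / 1          # 1 MB per second per shard
    by_write_records = input_records_per_sec / 1000  # 1,000 records per second per shard
    by_read_bytes = output_mb_per_sec / 2          # 2 MB per second per shard
    return max(1, math.ceil(max(by_write_bytes, by_write_records, by_read_bytes)))


print(min_shards(0.0015, 0.0015, 0.3805))  # 1 hotel, 1 destination
print(min_shards(1.486, 1.486, 380))       # 1,000 hotels, 1 destination
print(min_shards(1.486, 7.43, 380))        # 1,000 hotels, 5 destinations
```

In the one-destination case the write limit determines the result, while with five destinations the shared read limit is the binding constraint.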

Because HotelCode is the partition key, the distribution of messages per HotelCode should be monitored with metrics such as WriteProvisionedThroughputExceeded and ReadProvisionedThroughputExceeded to ensure an even distribution between shards. If the distribution is uneven, it may require a dedicated shard for each HotelCode. In the following examples, it is assumed that there is an even distribution of messages.

Figure 6: Architecture showing 1,000 Hotels, 4 Shards, 5 Consumers, and 5 Destinations

Messaging with 1,000 hotels and 100 destinations

With 1,000 hotels and 100 destinations, the system processes 100 billion messages per month. It continues to have the same number of incoming messages, 4 TB of message data per month, at a rate of 380 records per second, and 1.486 MB per second. The number of destinations has increased from 5 to 100, resulting in 390 TB of message data per month, at a rate of 38,051 records per second, and 148.6 MB per second.

Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output
1,000 hotels, 100 destinations (4-KB average record size) | 1 | 78 | 100 | 380 records per second, or 1.486 MB per second | 38,051 records per second, or 148.6 MB per second
Maximum stream capacity | 1 | 78 | 100 | 19,500 (4-KB records) per second (per stream); 78 MB per second (per stream) | 39,000 (4-KB records) per second per stream with a standard consumer; 156 MB per second (per stream)

The number of shards increases from four to 78 to support the read throughput needed for 100 consumers. With 78 shards, the stream has a read capacity of 156 MB per second.

 

Figure 7: Architecture with 1,000 hotels, 78 Kinesis shards, 100 consumers, and 100 destinations

Next, we will look at a different architecture that uses another type of consumer, the enhanced fan-out consumer, which improves stream throughput and processing efficiency.

Lambda enhanced fan-out consumers

Enhanced fan-out consumers can increase the per shard read consumption throughput through event-based consumption, reduce latency with parallelization, and support error handling. The enhanced fan-out consumer increases the read capacity of consumers from a shared 2 MB per second, to a dedicated 2 MB per second for each consumer.

When using Lambda as an enhanced fan-out consumer, you can use the Event Source Mapping Parallelization Factor to have one Lambda function read from one shard with up to 10 parallel invocations per consumer. Each parallelized invocation contains messages with the same partition key (HotelCode) and maintains order. An invocation finishes processing its messages before the next invocation for the same partition key starts.

Figure 8: Lambda fan-out consumers with parallel invocations, maintaining order

Consumers can gain performance improvements by setting a larger batch size for each invocation. When setting the Event Source Mapping batch size, the consumer can set the maximum number of records that the Lambda will retrieve from the stream at the time of invoking the function. The Event Source Mapping batch window can be used to adjust the time to wait to gather records for the batch before invoking the function.

The Lambda Event Source Mapping has a setting ReportBatchItemFailures that can be used to keep track of the last successfully processed record. The next invocation starts the batch from that checkpoint, beginning with the failed record. This repeats until the failed record reaches the maximum number of retries or expires. If this feature is enabled and a failure occurs, Lambda will prioritize checkpointing, over other set mechanisms, to minimize duplicate processing.
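As a minimal sketch of how a function can report a partial batch failure, the handler below returns the sequence number of the first record that fails. The process_message function and its HotelCode check are hypothetical stand-ins for real business logic; the batchItemFailures response shape is the one Lambda expects when ReportBatchItemFailures is enabled.

```python
import base64
import json


def process_message(message: dict) -> None:
    """Hypothetical business logic; raise an exception to signal failure."""
    if "HotelCode" not in message:
        raise ValueError("missing HotelCode")


def handler(event, context):
    """Process a Kinesis batch and report the first failed record."""
    for record in event["Records"]:
        sequence_number = record["kinesis"]["sequenceNumber"]
        try:
            payload = base64.b64decode(record["kinesis"]["data"])
            process_message(json.loads(payload))
        except Exception:
            # Stop at the first failure so that ordering is preserved.
            # Lambda retries the batch starting from this sequence number.
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    # An empty list tells Lambda that the whole batch succeeded.
    return {"batchItemFailures": []}
```

Stopping at the first failure is a design choice that suits ordered streams: records after the failed one are not processed until it succeeds or expires.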

Lambda has built-in support for sending records that are too old or have exhausted their retries to an on-failure destination, which can be an Amazon Simple Queue Service (Amazon SQS) queue acting as a DLQ or an Amazon SNS topic. The Lambda consumer can be configured with maximum record retries and maximum record age, so repeated failures are sent to a DLQ and handled with a separate Lambda function.
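The settings described in this section (batch size, batch window, parallelization factor, retries, record age, and on-failure destination) are all parameters of the Lambda CreateEventSourceMapping API. The sketch below only builds the parameter set. The ARNs, function name, and numeric values are placeholders chosen for illustration, not recommendations.

```python
def build_event_source_mapping(consumer_arn: str, function_name: str,
                               on_failure_arn: str) -> dict:
    """Return keyword arguments for Lambda's CreateEventSourceMapping API."""
    return {
        "EventSourceArn": consumer_arn,          # enhanced fan-out consumer ARN
        "FunctionName": function_name,
        "StartingPosition": "LATEST",
        "BatchSize": 500,                        # maximum records per invocation
        "MaximumBatchingWindowInSeconds": 5,     # time to wait while gathering records
        "ParallelizationFactor": 10,             # concurrent invocations per shard
        "MaximumRetryAttempts": 3,
        "MaximumRecordAgeInSeconds": 3600,
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
        "DestinationConfig": {"OnFailure": {"Destination": on_failure_arn}},
    }


# Placeholder identifiers, for illustration only.
params = build_event_source_mapping(
    "arn:aws:kinesis:us-east-1:123456789012:stream/example/consumer/example:1",
    "example-destination-consumer",
    "arn:aws:sqs:us-east-1:123456789012:example-dlq",
)
# With boto3 installed and credentials configured, this could be applied with:
#   import boto3
#   boto3.client("lambda").create_event_source_mapping(**params)
print(params)
```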

Figure 9: Lambda fan-out consumers with parallel invocations, and error handling

Messaging with Lambda fan-out consumers at scale

In the 1,000 hotel and 100 destination scenario, we will scale by shard and stream. Enhanced fan-out has a hard quota of 20 consumers per stream. With one consumer per destination, 100 fan-out consumers will need five streams. The producer will write to one stream, which has a consumer that writes to the five streams that our destination consumers are reading from.

Hotels/Destinations | Streams | Shards | Consumers (fan-out) | Input | Output
1,000 hotels, 100 destinations (4-KB average record size) | 5 | 10 (2 each stream) | 100 | 380 records per second, or 74.3 MB per second | 38,051 records per second, or 148.6 MB per second
Maximum stream capacity | 5 | 10 (2 each stream) | 100 | 500 (4-KB records) per second per stream; 2 MB per second per stream | 40,000 (4-KB records) per second per stream; 200,000 (4-KB records) per second with 5 streams; 40 MB per second per stream; 200 MB per second with 5 streams

Figure 10: Architecture 1,000 hotels, 100 destinations, multiple streams, and fan-out consumers

This architecture increases throughput to 200 MB per second, with only 12 shards, compared to 78 shards with standard consumers.

Conclusion

In this blog post, we explained how to use Kinesis Data Streams to process low latency ordered messages at scale with OpenTravel data. We reviewed the aspects of efficient processing and message consumption, scaling considerations, and error handling scenarios.  We explored multiple architectures, one dimension of scale at a time, to demonstrate several considerations for scaling the OpenTravel messaging system.


Building well-architected serverless applications: Optimizing application performance – part 2

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-optimizing-application-performance-part-2/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

PERF 1. Optimizing your serverless application’s performance

This post continues part 1 of this performance question. Previously, I cover measuring and optimizing function startup time. I explain cold and warm starts and how to reuse the Lambda execution environment to improve performance. I show a number of ways to analyze and optimize the initialization startup time. I explain how only importing necessary libraries and dependencies increases application performance.

Good practice: Design your function to take advantage of concurrency via asynchronous and stream-based invocations

AWS Lambda functions can be invoked synchronously and asynchronously.

Favor asynchronous over synchronous request-response processing.

Consider using asynchronous event processing rather than synchronous request-response processing. You can use asynchronous processing to aggregate queues, streams, or events for more efficient processing time per invocation. This reduces wait times and latency from requesting apps and functions.

When you invoke a Lambda function with a synchronous invocation, you wait for the function to process the event and return a response.

Synchronous invocation


As synchronous processing involves a request-response pattern, the client caller also needs to wait for a response from a downstream service. If the downstream service then needs to call another service, you end up chaining calls that can impact service reliability, in addition to response times. For example, this POST /order request must wait for the response to the POST /invoice request before responding to the client caller.

Example synchronous processing


The more services you integrate, the longer the response time, and you can no longer sustain complex workflows using synchronous transactions.

Asynchronous processing allows you to decouple the request-response using events without waiting for a response from the function code. This allows you to perform background processing without requiring the client to wait for a response, improving client performance. You pass the event to an internal Lambda queue for processing and Lambda handles the rest. An external process, separate from the function, manages polling and retries. Using this asynchronous approach can also make it easier to handle unpredictable traffic with significant volumes.

Asynchronous invocation


For example, the client makes a POST /order request to the order service. The order service accepts the request and returns that it has been received, without waiting for the invoice service. The order service then makes an asynchronous POST /invoice request to the invoice service, which can then process independently of the order service. If the client must receive data from the invoice service, it can handle this separately via a GET /invoice request.

Example asynchronous processing
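As a minimal sketch of the asynchronous pattern, the function below invokes a Lambda function with the Event invocation type, which queues the event and returns without waiting for the function's result. The function name invoice-service and the order payload are hypothetical; lambda_client is assumed to be a boto3 Lambda client created by the caller.

```python
import json


def submit_invoice_async(lambda_client, order: dict) -> int:
    """Queue an invoice event without waiting for the function's result."""
    response = lambda_client.invoke(
        FunctionName="invoice-service",  # hypothetical function name
        InvocationType="Event",          # asynchronous invocation
        Payload=json.dumps(order).encode("utf-8"),
    )
    # Lambda returns HTTP 202 once the event has been queued.
    return response["StatusCode"]
```

A caller could pass boto3.client("lambda") as lambda_client and treat a 202 status code as confirmation that the request was accepted, not that it was processed.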


You can configure Lambda to send records of asynchronous invocations to another destination service. This helps you to troubleshoot your invocations. You can also send messages or events that can’t be processed correctly into a dedicated Amazon Simple Queue Service (SQS) dead-letter queue for investigation.

You can add triggers to a function to process data automatically. For more information on which processing model Lambda uses for triggers, see “Using AWS Lambda with other services”.

Asynchronous workflows handle a variety of use cases including data ingestion, ETL operations, and order/request fulfillment. In these use cases, data is processed as it arrives and is retrieved as it changes. For example asynchronous patterns, see “Serverless Data Processing” and “Serverless Event Submission with Status Updates”.

For more information on Lambda synchronous and asynchronous invocations, see the AWS re:Invent presentation “Optimizing your serverless applications”.

Tune batch size, batch window, and compress payloads for high throughput

When using Lambda to process records using Amazon Kinesis Data Streams or SQS, there are a number of tuning parameters to consider for performance.

You can configure a batch window to buffer messages or records for up to 5 minutes. You can limit the maximum number of records Lambda processes per invocation by setting a batch size. Lambda invokes your function when the batch window expires or the batch size is reached, whichever comes first.

For high volume SQS standard queue throughput, Lambda can process up to 1000 concurrent batches of records per second. For more information, see “Using AWS Lambda with Amazon SQS”.

For high volume Kinesis Data Streams throughput, there are a number of options. Configure the ParallelizationFactor setting to process one shard of a Kinesis Data Stream with more than one Lambda invocation simultaneously. Lambda can process up to 10 batches in each shard simultaneously. For more information, see “New AWS Lambda scaling controls for Kinesis and DynamoDB event sources.” You can also add more shards to your data stream to increase the speed at which your function can process records. This increases the function concurrency at the expense of ordering per shard. For more details on using Kinesis and Lambda, see “Monitoring and troubleshooting serverless data analytics applications”.

Kinesis enhanced fan-out can maximize throughput by dedicating 2 MB/second of read throughput per shard to each consumer, instead of sharing 2 MB/second per shard across all consumers. For more information, see “Increasing stream processing performance with Enhanced Fan-Out and Lambda”.

Kinesis stream producers can also compress records. This is at the expense of additional CPU cycles for decompressing the records in your Lambda function code.
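The trade-off can be sketched with the standard library: the producer compresses the serialized record before writing it, and the Lambda function decompresses it after base64-decoding the Kinesis payload. The function names are illustrative, and gzip with JSON is an assumed choice of codec and serialization.

```python
import base64
import gzip
import json


def compress_record(message: dict) -> bytes:
    """Producer side: serialize and gzip a record before PutRecord."""
    return gzip.compress(json.dumps(message).encode("utf-8"))


def decompress_record(kinesis_data_b64: str) -> dict:
    """Consumer side: decode the base64 Kinesis payload, then gunzip it."""
    return json.loads(gzip.decompress(base64.b64decode(kinesis_data_b64)))
```

In a Lambda function, decompress_record would be called with record["kinesis"]["data"] for each record in the event.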

Required practice: Measure, evaluate, and select optimal capacity units

Capacity units are a unit of consumption for a service. They can include function memory size, number of stream shards, number of database reads/writes, request units, or type of API endpoint. Measure, evaluate and select capacity units to enable optimal configuration of performance, throughput, and cost.

Identify and implement optimal capacity units.

For Lambda functions, memory is the capacity unit for controlling the performance of a function. You can configure the amount of memory allocated to a Lambda function, between 128 MB and 10,240 MB. The amount of memory also determines the amount of virtual CPU available to a function. Adding more memory proportionally increases the amount of CPU, increasing the overall computational power available. If a function is CPU-, network- or memory-bound, then changing the memory setting can dramatically improve its performance.

Choosing the memory allocated to Lambda functions is an optimization process that balances performance (duration) and cost. You can manually run tests on functions by selecting different memory allocations and measuring the time taken to complete. Alternatively, use the AWS Lambda Power Tuning tool to automate the process.

The tool allows you to systematically test different memory size configurations. Depending on your performance strategy (cost, performance, or balanced), it identifies the optimal memory size to use. For more information, see “Operating Lambda: Performance optimization – Part 2”.

AWS Lambda Power Tuning report


Amazon DynamoDB manages table processing throughput using read and write capacity units. There are two different capacity modes, on-demand and provisioned.

On-demand capacity mode supports up to 40K read/write request units per second. This is recommended for unpredictable application traffic and new tables with unknown workloads. For higher and predictable throughputs, provisioned capacity mode along with DynamoDB auto scaling is recommended. For more information, see “Read/Write Capacity Mode”.

For high throughput Amazon Kinesis Data Streams with multiple consumers, consider using enhanced fan-out for dedicated 2 MB/second throughput per consumer. When possible, use Kinesis Producer Library and Kinesis Client Library for effective record aggregation and de-aggregation.

Amazon API Gateway supports multiple endpoint types. Edge-optimized APIs provide a fully managed Amazon CloudFront distribution. These are better for geographically distributed clients. API requests are routed to the nearest CloudFront Point of Presence (POP), which typically improves connection time.

Edge-optimized API Gateway deployment


Regional API endpoints are intended when clients are in the same Region. This helps you to reduce request latency and allows you to add your own content delivery network if necessary.

Regional endpoint API Gateway deployment


Private API endpoints are API endpoints that can only be accessed from your Amazon Virtual Private Cloud (VPC) using an interface VPC endpoint. For more information, see “Creating a private API in Amazon API Gateway”.

For more information on endpoint types, see “Choose an endpoint type to set up for an API Gateway API”. For more general information on API Gateway, see the AWS re:Invent presentation “I didn’t know Amazon API Gateway could do that”.

AWS Step Functions has two workflow types, standard and express. Standard Workflows have exactly once workflow execution and can run for up to one year. Express Workflows have at-least-once workflow execution and can run for up to five minutes. Consider the per-second rates you require for both execution start rate and the state transition rate. For more information, see “Standard vs. Express Workflows”.

Performance load testing is recommended at both sustained and burst rates to evaluate the effect of tuning capacity units. Use Amazon CloudWatch service dashboards to analyze key performance metrics including load testing results. I cover performance testing in more detail in “Regulating inbound request rates – part 1”.

For general serverless optimization information, see the AWS re:Invent presentation “Serverless at scale: Design patterns and optimizations”.

Conclusion

Evaluate and optimize your serverless application’s performance based on access patterns, scaling mechanisms, and native integrations. You can improve your overall experience and make more efficient use of the platform in terms of both value and resources.

This post continues from part 1 and looks at designing your function to take advantage of concurrency via asynchronous and stream-based invocations. I cover measuring, evaluating, and selecting optimal capacity units.

This well-architected question will continue in part 3 where I look at integrating with managed services directly over functions when possible. I cover optimizing access patterns and applying caching where applicable.

For more serverless learning resources, visit Serverless Land.

How MEDHOST’s cardiac risk prediction successfully leveraged AWS analytic services

Post Syndicated from Pandian Velayutham original https://aws.amazon.com/blogs/big-data/how-medhosts-cardiac-risk-prediction-successfully-leveraged-aws-analytic-services/

MEDHOST has been providing products and services to healthcare facilities of all types and sizes for over 35 years. Today, more than 1,000 healthcare facilities are partnering with MEDHOST and enhancing their patient care and operational excellence with its integrated clinical and financial EHR solutions. MEDHOST also offers a comprehensive Emergency Department Information System with business and reporting tools. Since 2013, MEDHOST’s cloud solutions have been utilizing Amazon Web Services (AWS) infrastructure, data source, and computing power to solve complex healthcare business cases.

MEDHOST can utilize the data available in the cloud to provide value-added solutions for hospitals solving complex problems, like predicting sepsis, cardiac risk, and length of stay (LOS) as well as reducing re-admission rates. This requires a solid foundation of data lake and elastic data pipeline to keep up with multi-terabyte data from thousands of hospitals. MEDHOST has invested a significant amount of time evaluating numerous vendors to determine the best solution for its data needs. Ultimately, MEDHOST designed and implemented machine learning/artificial intelligence capabilities by leveraging AWS Data Lab and an end-to-end data lake platform that enables a variety of use cases such as data warehousing for analytics and reporting.


Getting started

MEDHOST’s initial objectives in evaluating vendors were to:

  • Build a low-cost data lake solution to provide cardiac risk prediction for patients based on health records
  • Provide an analytical solution for hospital staff to improve operational efficiency
  • Implement a proof of concept to extend to other machine learning/artificial intelligence solutions

The AWS team proposed AWS Data Lab to architect, develop, and test a solution to meet these objectives. The collaborative relationship between AWS and MEDHOST, AWS’s continuous innovation, excellent support, and technical solution architects helped MEDHOST select AWS over other vendors and products. AWS Data Lab’s well-structured engagement helped MEDHOST define clear, measurable success criteria that drove the implementation of the cardiac risk prediction and analytical solution platform. The MEDHOST team consisted of architects, builders, and subject matter experts (SMEs). By connecting MEDHOST experts directly to AWS technical experts, the MEDHOST team gained a quick understanding of industry best practices and available services, which allowed the team to achieve most of the success criteria by the end of a four-day design session. MEDHOST is now in the process of moving this work from its lower to upper environment to make the solution available for its customers.

Solution

For this solution, MEDHOST and AWS built a layered pipeline consisting of ingestion, processing, storage, analytics, machine learning, and reinforcement components. The following diagram illustrates the Proof of Concept (POC) that was implemented during the four-day AWS Data Lab engagement.

Ingestion layer

The ingestion layer is responsible for moving data from hospital production databases to the landing zone of the pipeline.

The hospital data was stored in an Amazon RDS for PostgreSQL instance and moved to the landing zone of the data lake using AWS Database Migration Service (DMS). DMS made migrating databases to the cloud simple and secure. Using its ongoing replication feature, MEDHOST and AWS implemented change data capture (CDC) quickly and efficiently so the MEDHOST team could spend more time focusing on the most interesting parts of the pipeline.

Processing layer

The processing layer was responsible for performing extract, transform, load (ETL) on the data to curate it for subsequent uses.

MEDHOST used AWS Glue within its data pipeline for crawling its data layers and performing ETL tasks. The hospital data copied from RDS to Amazon S3 was cleaned, curated, enriched, denormalized, and stored in parquet format to act as the heart of the MEDHOST data lake and a single source of truth to serve any further data needs. During the four-day Data Lab, MEDHOST and AWS targeted two needs: powering MEDHOST’s data warehouse used for analytics and feeding training data to the machine learning prediction model. Data curation posed multiple challenges and is a critical task that requires an SME. AWS Glue’s serverless nature, along with the SME’s support during the Data Lab, made developing the required transformations cost efficient and uncomplicated. Scaling and cluster management were handled by the service, which allowed the developers to focus on cleaning data coming from homogenous hospital sources and translating the business logic to code.

Storage layer

The storage layer provided low-cost, secure, and efficient storage infrastructure.

MEDHOST used Amazon S3 as a core component of its data lake. AWS DMS migration tasks saved data to S3 in .CSV format. Crawling data with AWS Glue made this landing zone data queryable and available for further processing. The initial AWS Glue ETL job stored the parquet formatted data to the data lake and its curated zone bucket. MEDHOST also used S3 to store the .CSV formatted data set that will be used to train, test, and validate its machine learning prediction model.

Analytics layer

The analytics layer gave MEDHOST pipeline reporting and dashboarding capabilities.

The data was in parquet format and partitioned in the curation zone bucket populated by the processing layer. This made querying with Amazon Athena or Amazon Redshift Spectrum fast and cost efficient.

From the Amazon Redshift cluster, MEDHOST created external tables that were used as staging tables for the MEDHOST data warehouse and implemented UPSERT logic to merge new data into its production tables. To showcase the reporting potential that was unlocked by the MEDHOST analytics layer, the Redshift cluster was connected to Amazon QuickSight. Within minutes MEDHOST was able to create interactive analytics dashboards with filtering and drill-down capabilities such as a chart that showed the number of confirmed disease cases per US state.

Machine learning layer

The machine learning layer used MEDHOST’s existing data sets to train its cardiac risk prediction model and make it accessible via an endpoint.

Before getting into Data Lab, the MEDHOST team was not intimately familiar with machine learning. AWS Data Lab architects helped MEDHOST quickly understand concepts of machine learning and select a model appropriate for its use case. MEDHOST selected XGBoost as its model since cardiac prediction falls within regression techniques. MEDHOST’s well-architected data lake enabled it to quickly generate training, testing, and validation data sets using AWS Glue.

Amazon SageMaker abstracted the underlying complexity of setting up infrastructure for machine learning. With a few clicks, MEDHOST started a Jupyter notebook and coded the components leading to fitting and deploying its machine learning prediction model. Finally, MEDHOST created the endpoint for the model and ran REST calls to validate the endpoint and trained model. As a result, MEDHOST achieved the goal of predicting cardiac risk. Additionally, with Amazon QuickSight’s SageMaker integration, AWS made it easy to use SageMaker models directly in visualizations. QuickSight can call the model’s endpoint, send the input data to it, and put the inference results into the existing QuickSight data sets. This capability made it easy to display the results of the models directly in the dashboards. Read more about QuickSight’s SageMaker integration here.

Reinforcement layer

Finally, the reinforcement layer guaranteed that the results of the MEDHOST model were captured and processed to improve performance of the model.

The MEDHOST team went beyond the original goal and created an inference microservice to interact with the endpoint for prediction, enabled abstracting of the machine learning endpoint with the well-defined domain REST endpoint, and added a standard security layer to the MEDHOST application.

When there is a real-time call from the facility, the inference microservice gets inference from the SageMaker endpoint. Records containing input and inference data are fed to the data pipeline again. MEDHOST used Amazon Kinesis Data Streams to push records in real time. However, since retraining the machine learning model does not need to happen in real time, Amazon Kinesis Data Firehose enabled MEDHOST to micro-batch records and efficiently save them to the landing zone bucket so that the data could be reprocessed.
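Pushing a record that pairs the model input with its inference could look like the sketch below. This is not MEDHOST's implementation: the field names, the use of a patient identifier as the partition key, and the function name are all assumptions for illustration. kinesis_client is assumed to be a boto3 Kinesis client created by the caller.

```python
import json


def publish_inference(kinesis_client, stream_name: str, patient_id: str,
                      features: dict, prediction: float) -> dict:
    """Push one input-plus-inference record to a Kinesis data stream."""
    record = {
        "patient_id": patient_id,  # hypothetical field names
        "input": features,
        "inference": prediction,
    }
    return kinesis_client.put_record(
        StreamName=stream_name,
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=str(patient_id),
    )
```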

Conclusion

Collaborating with AWS Data Lab enabled MEDHOST to:

  • Store a single source of truth in a low-cost storage solution (data lake)
  • Complete a data pipeline for a low-cost data analytics solution
  • Create almost production-ready code for cardiac risk prediction

The MEDHOST team learned many concepts related to data analytics and machine learning within four days. AWS Data Lab truly helped MEDHOST deliver results in an accelerated manner.


About the Authors

Pandian Velayutham is the Director of Engineering at MEDHOST. His team is responsible for delivering cloud solutions, integration and interoperability, and business analytics solutions. MEDHOST utilizes a modern technology stack to provide innovative solutions to its customers. Pandian Velayutham is a technology evangelist and public cloud technology speaker.

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Building well-architected serverless applications: Building in resiliency – part 2

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-building-in-resiliency-part-2/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

Reliability question REL2: How do you build resiliency into your serverless application?

This post continues part 1 of this reliability question. Previously, I cover managing failures using retries, exponential backoff, and jitter. I explain how DLQs can isolate failed messages. I show how to use state machines to orchestrate long running transactions rather than handling these in application code.

Required practice: Manage duplicate and unwanted events

Duplicate events can occur when a request is retried or multiple consumers process the same message from a queue or stream. A duplicate can also happen when a request is sent twice at different time intervals with the same parameters. Design your applications to process multiple identical requests to have the same effect as making a single request.

Idempotency refers to the capacity of an application or component to identify repeated events and prevent duplicated, inconsistent, or lost data. This means that receiving the same event multiple times does not change the result beyond the first time the event was received. An idempotent application can, for example, handle multiple identical refund operations. The first refund operation is processed. Any further refund requests to the same customer with the same payment reference should not be processed again.

When using AWS Lambda, you can make your function idempotent. The function’s code must properly validate input events and identify if the events were processed before. For more information, see “How do I make my Lambda function idempotent?”.

When processing streaming data, your application must anticipate and appropriately handle processing individual records multiple times. There are two primary reasons why records may be delivered more than once to your Amazon Kinesis Data Streams application: producer retries and consumer retries. For more information, see “Handling Duplicate Records”.

Generate unique attributes to manage duplicate events at the beginning of the transaction

Create a unique identifier, or use an existing one, at the beginning of a transaction to ensure idempotency. These identifiers are also known as idempotency tokens. A number of Lambda triggers include a unique identifier as part of the event.

You can also create your own identifiers. These can be business-specific, such as transaction ID, payment ID, or booking ID. You can use an opaque random alphanumeric string, unique correlation identifiers, or the hash of the content.

A Lambda function, for example, can use these identifiers to check whether the event has been previously processed.
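When no business identifier is available, the hash of the content can serve as the token. The following is a minimal sketch, assuming the event is JSON-serializable; the function name is illustrative. Sorting the keys makes the token independent of key order.

```python
import hashlib
import json


def idempotency_token(event: dict) -> str:
    """Derive a stable token from the content of an event."""
    # Canonical form: sorted keys and no insignificant whitespace.
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Two events with identical content produce the same token, so a function can look the token up before processing the event.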

Depending on the final destination, duplicate events might write to the same record with the same content instead of generating a duplicate entry. This may therefore not require additional safeguards.

Use an external system to store unique transaction attributes and verify for duplicates

Lambda functions can use Amazon DynamoDB to store and track transactions and idempotency tokens to determine if the transaction has been handled previously. DynamoDB Time to Live (TTL) allows you to define a per-item timestamp to determine when an item is no longer needed. This helps to limit the storage space used. Base the TTL on the event source, for example, the message retention period for SQS.

Using DynamoDB to store idempotent tokens


You can also use DynamoDB conditional writes to ensure a write operation only succeeds if an item attribute meets one or more expected conditions. For example, you can use this to fail a refund operation if a payment reference has already been refunded. This signals to the application that it is a duplicate transaction. The application can then catch this exception and return the same result to the customer as if the refund was processed successfully.
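A conditional write for the refund example could be sketched as follows. The table is assumed to be a boto3 DynamoDB Table resource whose partition key is named payment_reference, and expires_at is assumed to be the attribute configured for TTL; both attribute names and the function name are hypothetical.

```python
import time


def record_refund_once(table, payment_reference: str,
                       ttl_seconds: int = 86400) -> bool:
    """Return True for the first refund of a reference, False for a duplicate."""
    try:
        table.put_item(
            Item={
                "payment_reference": payment_reference,
                # Epoch timestamp after which DynamoDB TTL may remove the item.
                "expires_at": int(time.time()) + ttl_seconds,
            },
            # The write succeeds only if no item with this key exists yet.
            ConditionExpression="attribute_not_exists(payment_reference)",
        )
        return True
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        # Duplicate transaction: the caller can return the original result.
        return False
```

With boto3, the table could be obtained with boto3.resource("dynamodb").Table("example-refunds"), where the table name is a placeholder.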

Third-party APIs can also support idempotency directly. For example, Stripe allows you to add an Idempotency-Key: <key> header to the request. Stripe saves the resulting status code and body of the first request made for any given idempotency key, regardless of whether it succeeded or failed. Subsequent requests with the same key return the same result.

Validate events using a pre-defined and agreed upon schema

Implicitly trusting data from clients, external sources, or machines could lead to malformed data being processed. Use a schema to validate that your event conforms to what you are expecting. Validate the event against the schema within your application code or at the event source when applicable. Events not adhering to your schema should be discarded.

For API Gateway, I cover validating incoming HTTP requests against a schema in “Implementing application workload security – part 1”.

Amazon EventBridge rules match event patterns. EventBridge provides schemas for all events that are generated by AWS services. You can create or upload custom schemas or infer schemas directly from events on an event bus. You can also generate code bindings for event schemas.

SNS supports message filtering. This allows a subscriber to receive a subset of the messages sent to the topic using a filter policy. For more information, see the documentation.

JSON Schema is a tool for validating the structure of JSON documents. There are a number of implementations available.
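To illustrate validating within application code, the sketch below checks an event for required fields and types using only the standard library. It is not a JSON Schema implementation, and the ORDER_SCHEMA fields are hypothetical; a JSON Schema library would replace validate_event in practice.

```python
# Hypothetical schema: required field name -> accepted Python type(s).
ORDER_SCHEMA = {
    "order_id": str,
    "amount": (int, float),
    "currency": str,
}


def validate_event(event: dict, schema: dict = ORDER_SCHEMA) -> list:
    """Return a list of validation errors; an empty list means the event is valid."""
    errors = []
    for field, expected_type in schema.items():
        if field not in event:
            errors.append(f"missing field: {field}")
        elif not isinstance(event[field], expected_type):
            errors.append(f"wrong type for field: {field}")
    return errors
```

A function can discard any event for which validate_event returns a non-empty list.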

Best practice: Consider scaling patterns at burst rates

Load testing your serverless application allows you to monitor the performance of an application before it is deployed to production. Serverless applications can be simpler to load test, thanks to the automatic scaling built into many of the services. For more information, see “How to design Serverless Applications for massive scale”.

In addition to your baseline performance, consider evaluating how your workload handles initial burst rates. This ensures that your workload can sustain burst rates while scaling to meet possibly unexpected demand.

Perform load tests using a burst strategy with random intervals of idleness

Perform load tests using a burst of requests for a short period of time. Also introduce burst delays to allow your components to recover from unexpected load. This allows you to future-proof the workload for key events when you do not know peak traffic levels.
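A burst plan with random idle intervals can be generated before handing it to a load testing tool. The following is a minimal sketch; the function name and parameters are illustrative, and sending the requests is left to the tool.

```python
import random


def burst_schedule(bursts: int, max_requests: int,
                   max_idle_seconds: float, seed=None) -> list:
    """Return (request_count, idle_seconds) pairs for a burst load test."""
    rng = random.Random(seed)  # seed makes a test run reproducible
    return [
        (rng.randint(1, max_requests), rng.uniform(0, max_idle_seconds))
        for _ in range(bursts)
    ]


plan = burst_schedule(bursts=5, max_requests=1000, max_idle_seconds=30, seed=7)
print(plan)
```

Each pair describes one burst followed by a period of idleness that lets components recover before the next burst.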

There are a number of AWS Marketplace and AWS Partner Network (APN) solutions available for performance testing, including Gatling FrontLine, BlazeMeter, and Apica.

In regulating inbound request rates – part 1, I cover running a performance test suite using Gatling, an open source tool.

Gatling performance results

Amazon has a network stress testing policy that defines which high-volume network tests are allowed. Tests that purposefully attempt to overwhelm the target or infrastructure are considered distributed denial of service (DDoS) tests and are prohibited. For more information, see “Amazon EC2 Testing Policy”.

Review service account limits with combined utilization across resources

AWS accounts have default quotas, also referred to as limits, for each AWS service. These are generally Region-specific. You can request increases for some limits while other limits cannot be increased. Service Quotas is an AWS service that helps you manage your limits for many AWS services. Along with looking up the values, you can also request a limit increase from the Service Quotas console.

Service Quotas dashboard

As these limits are shared within an account, review the combined utilization across resources including the following:

  • Amazon API Gateway: number of requests per second across all APIs. (link)
  • AWS AppSync: throttle rate limits. (link)
  • AWS Lambda: function concurrency reservations and pool capacity to allow other functions to scale. (link)
  • Amazon CloudFront: requests per second per distribution. (link)
  • AWS IoT Core message broker: concurrent requests per second. (link)
  • Amazon EventBridge: API requests and target invocations limit. (link)
  • Amazon Cognito: API limits. (link)
  • Amazon DynamoDB: throughput, indexes, and request rates limits. (link)

Evaluate key metrics to understand how workloads recover from bursts

There are a number of key Amazon CloudWatch metrics to evaluate and alert on to understand whether your workload recovers from bursts.

  • AWS Lambda: Duration, Errors, Throttles, ConcurrentExecutions, UnreservedConcurrentExecutions. (link)
  • Amazon API Gateway: Latency, IntegrationLatency, 5xxError, 4xxError. (link)
  • Application Load Balancer: HTTPCode_ELB_5XX_Count, RejectedConnectionCount, HTTPCode_Target_5XX_Count, UnHealthyHostCount, LambdaInternalError, LambdaUserError. (link)
  • AWS AppSync: 5XX, Latency. (link)
  • Amazon SQS: ApproximateAgeOfOldestMessage. (link)
  • Amazon Kinesis Data Streams: ReadProvisionedThroughputExceeded, WriteProvisionedThroughputExceeded, GetRecords.IteratorAgeMilliseconds, PutRecord.Success, PutRecords.Success (if using Kinesis Producer Library), GetRecords.Success. (link)
  • Amazon SNS: NumberOfNotificationsFailed, NumberOfNotificationsFilteredOut-InvalidAttributes. (link)
  • Amazon Simple Email Service (SES): Rejects, Bounces, Complaints, Rendering Failures. (link)
  • AWS Step Functions: ExecutionThrottled, ExecutionsFailed, ExecutionsTimedOut. (link)
  • Amazon EventBridge: FailedInvocations, ThrottledRules. (link)
  • Amazon S3: 5xxErrors, TotalRequestLatency. (link)
  • Amazon DynamoDB: ReadThrottleEvents, WriteThrottleEvents, SystemErrors, ThrottledRequests, UserErrors. (link)

Conclusion

This post continues from part 1 and looks at managing duplicate and unwanted events with idempotency and an event schema. I cover how to consider scaling patterns at burst rates by managing account limits and show relevant metrics to evaluate.

Build resiliency into your workloads. Ensure that applications can withstand partial and intermittent failures across components that may only surface in production. In the next post in the series, I cover the performance efficiency pillar from the Well-Architected Serverless Lens.

For more serverless learning resources, visit Serverless Land.

Choosing between AWS services for streaming data workloads

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/choosing-between-aws-services-for-streaming-data-workloads/

Traditionally, streaming data can be complex to manage due to the large amounts of data arriving from many separate sources. Managing fluctuations in traffic and durably persisting messages as they arrive is a non-trivial task. Using a serverless approach, AWS provides a number of services that help manage large numbers of messages, alleviating much of the infrastructure burden.

In this blog post, I compare several AWS services and how to choose between these options in streaming data workloads.

Comparing Amazon Kinesis Data Streams with Amazon SQS queues

While you can use both services to decouple data producers and consumers, each is suited to different types of workload. Amazon SQS is primarily used as a message queue to store messages durably between distributed services. Amazon Kinesis is primarily intended to manage streaming big data.

Kinesis supports ordering of records and the ability for multiple consumers to read messages from the same stream concurrently. It also allows consumers to replay messages for as long as the stream retains them, from the 24-hour default up to 365 days. Scaling in Kinesis is based upon shards and you must reshard to scale a data stream up or down.

With SQS, consumers pull data from a queue, and a received message is hidden from other consumers while it is being processed (for a period known as the visibility timeout). Once a message is processed, it’s deleted from the queue. A queue may have multiple consumers but they all receive separate batches of messages. Standard queues do not provide an ordering guarantee but scaling in SQS is automatic.

| Feature | Amazon Kinesis Data Streams | Amazon SQS |
| --- | --- | --- |
| Ordering guarantee | Yes, by shard | No for standard queues; FIFO queues support ordering by group ID. |
| Scaling | Resharding required to provision throughput | Automatic for standard queues; up to 30,000 messages per second for FIFO queues (more details). |
| Exactly-once delivery | No | No for standard queues; Yes for FIFO queues. |
| Consumer model | Multiple concurrent | Single consumer |
| Configurable message delay | No | Up to 15 minutes |
| Ability to replay messages | Yes | No |
| Encryption | Yes | Yes |
| Payload maximum | 1 MB per record | 256 KB per message |
| Message retention period | 24 hours (default) to 365 days (additional charges apply) | 1 minute to 14 days. 4 days is the default |
| Pricing model | Per shard hour plus PUT payload unit per million. Additional charges for some features | No minimum; $0.24-$0.595 per million messages, depending on Region and queue type |
| AWS Free Tier included | No | Yes, 1 million messages per month – see details |
| Typical use cases | Real-time metrics/reporting; real-time data analytics; log and data feed processing; stream processing | Application integration; asynchronous processing; batching messages/smoothing throughput |
| Integration with Kinesis Data Analytics | Yes | No |
| Integration with Kinesis Data Firehose | Yes | No |

While some functionality of both services is similar, Kinesis is often a better fit for many streaming workloads. Kinesis has a broader range of options for ingesting large amounts of data, such as the Kinesis Producer Library and Kinesis Aggregation Library. You can also use the PutRecords API to send up to 500 records (up to a maximum of 5 MiB) per request.
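To stay within the PutRecords limits mentioned above, a producer can split its records into batches before sending them. The following is a minimal sketch of that batching step only. The `chunkForPutRecords` helper is hypothetical, the entries use the `Data` and `PartitionKey` fields of a PutRecords request entry, and the size accounting (data plus partition key) is an approximation. Each returned batch would then be passed as the `Records` parameter of one PutRecords call.

```javascript
const MAX_RECORDS_PER_REQUEST = 500
const MAX_BYTES_PER_REQUEST = 5 * 1024 * 1024

// Approximate size of one entry: data blob plus partition key, in bytes.
function entrySize (entry) {
  return Buffer.byteLength(entry.Data) + Buffer.byteLength(entry.PartitionKey)
}

// Splits entries into batches that respect both the record-count and size limits.
function chunkForPutRecords (entries) {
  const batches = []
  let current = []
  let currentBytes = 0
  for (const entry of entries) {
    const size = entrySize(entry)
    const full = current.length === MAX_RECORDS_PER_REQUEST ||
      currentBytes + size > MAX_BYTES_PER_REQUEST
    if (full && current.length > 0) {
      batches.push(current)
      current = []
      currentBytes = 0
    }
    current.push(entry)
    currentBytes += size
  }
  if (current.length > 0) batches.push(current)
  return batches
}

// 1,200 small readings are limited by record count, not by size.
const small = Array.from({ length: 1200 }, (_, i) => ({
  Data: `reading-${i}`,
  PartitionKey: `sensor-${i % 10}`
}))
console.log(chunkForPutRecords(small).map((batch) => batch.length)) // [ 500, 500, 200 ]
```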

Additionally, it has powerful integrations not available to SQS. Amazon Kinesis Data Analytics allows you to transform and analyze streaming data with Apache Flink. You can also use streaming SQL to run windowed queries on live data in near-real time. You can also use Amazon Kinesis Data Firehose as a consumer for Amazon Kinesis Data Streams, which is also not available to SQS queues.

Choosing between Kinesis Data Streams and Kinesis Data Firehose

Both of these services are part of Kinesis but they have different capabilities and target use-cases. Kinesis Data Firehose is a fully managed service that can ingest gigabytes of data from a variety of producers. When Kinesis Data Streams is the source, it automatically scales up or down to match the volume of data. It can optionally process records as they arrive with AWS Lambda and deliver batches of records to services like Amazon S3 or Amazon Redshift. Here’s how the service compares with Kinesis Data Streams:

| Feature | Kinesis Data Streams | Kinesis Data Firehose |
| --- | --- | --- |
| Scaling | Resharding required | Automatic |
| Supports compression | No | Yes (GZIP, ZIP, and SNAPPY) |
| Latency | ~200 ms per consumer (~70 ms if using enhanced fan-out) | Seconds (depends on buffer size configuration); minimum buffer window is 60 seconds |
| Retention | 1–365 days | None |
| Message replay | Yes | No |
| Quotas | See quotas | See quotas |
| Ingestion capacity | Determined by number of shards (1,000 records or 1 MB/s per shard) | No limit if source is Kinesis Data Streams; otherwise see quota page |
| Producer types | AWS SDK or AWS CLI; Kinesis Producer Library; Kinesis Agent; Amazon CloudWatch; Amazon EventBridge; AWS IoT Core | AWS SDK or AWS CLI; Kinesis Producer Library; Kinesis Agent; Amazon CloudWatch; Amazon EventBridge; AWS IoT Core; Kinesis Data Streams |
| Number of consumers | Multiple, sharing 2 MB per second per shard throughput | One per delivery stream |
| Consumers | AWS Lambda; Kinesis Data Analytics; Kinesis Data Firehose; Kinesis Client Library | Amazon S3; Amazon Redshift; Amazon Elasticsearch Service; third-party providers; HTTP endpoints |
| Pricing | Hourly charge plus data volume. Some features have additional charges – see pricing | Based on data volume, format conversion and VPC delivery – see pricing |

The needs of your workload determine the choice between the two services. To prepare and load data into a data lake or data store, Kinesis Data Firehose is usually the better choice. If you need low latency delivery of records and the ability to replay data, choose Kinesis Data Streams.

Using Kinesis Data Firehose to prepare and load data

Kinesis Data Firehose buffers data based on two buffer hints. You can configure a time-based buffer from 1-15 minutes and a volume-based buffer from 1-128 MB. Whichever limit is reached first causes the service to flush the buffer. These are called hints because the service can adjust the settings if data delivery falls behind writing to the stream. The service raises the buffer settings dynamically to allow the service to catch up.
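The "whichever limit is reached first" rule can be sketched with a small in-memory model. This is only an illustration of the flush logic, not how the service is implemented. The `createDeliveryBuffer` function and its option names are hypothetical, the thresholds are far smaller than the real 1-128 MB range so the example stays readable, and the current time is passed in explicitly to keep the run deterministic.

```javascript
// Hypothetical buffer model: flush when either hint is reached first.
function createDeliveryBuffer ({ intervalSeconds, sizeBytes }) {
  let records = []
  let bufferedBytes = 0
  let windowStart = null
  const flushed = []

  function flush (reason) {
    flushed.push({ reason, count: records.length, bytes: bufferedBytes })
    records = []
    bufferedBytes = 0
    windowStart = null
  }

  return {
    // Adds a record; flushes if the volume-based hint is reached.
    add (record, nowSeconds) {
      if (windowStart === null) windowStart = nowSeconds
      records.push(record)
      bufferedBytes += Buffer.byteLength(record)
      if (bufferedBytes >= sizeBytes) flush('size')
    },
    // Called periodically; flushes if the time-based hint is reached.
    tick (nowSeconds) {
      if (windowStart !== null && nowSeconds - windowStart >= intervalSeconds) {
        flush('interval')
      }
    },
    flushed
  }
}

const buffer = createDeliveryBuffer({ intervalSeconds: 60, sizeBytes: 1024 })
buffer.add('a'.repeat(600), 0)
buffer.add('b'.repeat(600), 5) // 1,200 bytes buffered: the size hint is reached first
buffer.add('c'.repeat(100), 10)
buffer.tick(70) // 60 seconds since the window opened: the interval hint is reached
console.log(buffer.flushed)
```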

This is the flow of data in Kinesis Data Firehose from a data source through to a destination, including optional settings for a delivery stream:

Kinesis Data Firehose flow

  1. The service continuously loads data from the data source as it arrives.
  2. The data transformation Lambda function processes individual records and returns these to the service.
  3. Transformed records are delivered to the destination once the buffer size or buffer window is reached.
  4. Any records that could not be delivered to the destination are written to an intermediate S3 bucket.
  5. Any records that cannot be transformed by the Lambda function are written to an intermediate S3 bucket.
  6. Optionally, the original, untransformed records are written to an S3 bucket.

Data transformation using a Lambda function

The data transformation process enables you to modify the contents of individual records. Kinesis Data Firehose synchronously invokes the Lambda function with a batch of records. Your custom code modifies the records and then returns an array of transformed records.

Transformed records

The incoming payload provides the data attribute in base64 encoded format. Once the transformation is complete, the returned array must include the following attributes per record:

  • recordId: This must match the incoming recordId to enable the service to map the new data to the record.
  • result: “Ok”, “Dropped”, or “ProcessingFailed”. Dropped means that your logic has intentionally removed the record whereas ProcessingFailed indicates that an error has occurred.
  • data: The transformed data must be base64 encoded.

The returned array must be the same length as the incoming array. The Alleycat example application uses the following code in the data transformation function to add a calculated field to the record:

exports.handler = async (event) => {
    const output = event.records.map((record) => {
        // Extract JSON record from base64 data
        const buffer = Buffer.from(record.data, 'base64').toString()
        const jsonRecord = JSON.parse(buffer)

        // Add the calculated field
        jsonRecord.output = ((jsonRecord.cadence + 35) * (jsonRecord.resistance + 65)) / 100

        // Convert back to base64 + add a newline
        const dataBuffer = Buffer.from(JSON.stringify(jsonRecord) + '\n', 'utf8').toString('base64')

        return {
            recordId: record.recordId,
            result: 'Ok',
            data: dataBuffer
        }
    })

    console.log(`Output records: ${output.length}`)
    return { records: output }
}
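The handler above always returns Ok. As a complementary sketch, the following function shows one way the other two result values could be used. It is not taken from the Alleycat application: the `transform` name, the rule that drops records without a numeric `cadence`, and the added `processed` field are assumptions made for illustration.

```javascript
// Sketch of a transformation function that uses all three result values.
// The "cadence" filter rule and the "processed" field are hypothetical.
const transform = (event) => {
  const records = event.records.map((record) => {
    try {
      const jsonRecord = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'))

      // Intentionally remove records that carry no cadence reading
      if (typeof jsonRecord.cadence !== 'number') {
        return { recordId: record.recordId, result: 'Dropped', data: record.data }
      }

      jsonRecord.processed = true
      const data = Buffer.from(JSON.stringify(jsonRecord) + '\n', 'utf8').toString('base64')
      return { recordId: record.recordId, result: 'Ok', data }
    } catch (err) {
      // Malformed input: return the original data and flag the failure
      return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data }
    }
  })
  return { records }
}

const encode = (value) => Buffer.from(value, 'utf8').toString('base64')
const result = transform({
  records: [
    { recordId: '1', data: encode(JSON.stringify({ cadence: 80, resistance: 40 })) },
    { recordId: '2', data: encode(JSON.stringify({ resistance: 40 })) },
    { recordId: '3', data: encode('not json') }
  ]
})
console.log(result.records.map((r) => `${r.recordId}: ${r.result}`))
```

The returned array keeps one entry per incoming record and reuses each incoming recordId, as the service requires.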

Comparing scaling and throughput with Kinesis Data Streams and Kinesis Data Firehose

Kinesis Data Firehose manages scaling automatically. If the data source is a Kinesis Data Stream, there is no limit to the amount of data the service can ingest. If the data source is a direct put using the PutRecordBatch API, there are soft limits of up to 500,000 records per second, depending upon the Region. See the Kinesis Data Firehose quota page for more information.

Kinesis Data Firehose invokes a Lambda transformation function synchronously and scales up the function as the number of records in the stream grows. When the destination is S3, Amazon Redshift, or the Amazon Elasticsearch Service, Kinesis Data Firehose allows up to five outstanding Lambda invocations per shard. When the destination is Splunk, the quota is 10 outstanding Lambda invocations per shard.

With Kinesis Data Firehose, the buffer hints are the main controls for influencing the rate of data delivery. You can decide between more frequent delivery of small batches of messages or less frequent delivery of larger batches. This can impact the PUT cost when using a destination like S3. However, this service is not intended to provide real-time data delivery due to the transformation and batching processes.

With Kinesis Data Streams, the number of shards in a stream determines the ingestion capacity. Each shard supports ingesting up to 1,000 messages or 1 MB per second of data. Unlike Kinesis Data Firehose, this service does not allow you to transform records before delivery to a consumer.
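The per-shard limits translate into a simple sizing calculation: the stream needs enough shards for whichever limit, record count or data volume, is reached first. The sketch below is an estimate only. The `shardsRequired` helper is hypothetical, it treats 1 MB as 1,048,576 bytes, and it ignores uneven partition key distribution, which can make individual shards hotter than the average.

```javascript
const RECORDS_PER_SECOND_PER_SHARD = 1000
const BYTES_PER_SECOND_PER_SHARD = 1024 * 1024 // 1 MB, treated here as 1 MiB

// Estimates the shard count needed to ingest a steady write workload.
function shardsRequired ({ recordsPerSecond, averageRecordBytes }) {
  const byCount = recordsPerSecond / RECORDS_PER_SECOND_PER_SHARD
  const byVolume = (recordsPerSecond * averageRecordBytes) / BYTES_PER_SECOND_PER_SHARD
  return Math.max(1, Math.ceil(Math.max(byCount, byVolume)))
}

// Many small records: limited by record count.
console.log(shardsRequired({ recordsPerSecond: 4500, averageRecordBytes: 200 })) // 5
// Fewer, larger records: limited by data volume.
console.log(shardsRequired({ recordsPerSecond: 500, averageRecordBytes: 8192 })) // 4
```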

Data Streams has additional capabilities for increasing throughput and reducing the latency of data delivery. The service invokes Lambda consumers every second with a configurable batch size of messages. If the consumers are falling behind data production in the stream, you can increase the parallelization factor. By default, this is set to 1, meaning that each shard has a single instance of a Lambda function it invokes. You can increase this up to 10 so that multiple instances of the consumer function process additional batches of messages.

Increase the parallelization factor

Data Streams consumers use a pull model over HTTP to fetch batches of records, operating in serial. A stream with five standard consumers averages 200 ms of latency each, taking up to 1 second in total. You can improve the overall latency by using enhanced fan-out (EFO). EFO consumers use a push model over HTTP/2 and are independent of each other.

With EFO, all five consumers in the previous example receive batches of messages in parallel using dedicated throughput. The overall latency averages 70 ms and typically data delivery speed is improved by up to 65%. Note that there is an additional charge for this feature.

Kinesis Data Streams EFO consumers

Conclusion

This blog post compares different AWS services for handling streaming data. I compare the features of SQS and Kinesis Data Streams, showing how ordering, ingestion throughput, and multiple consumers often make Kinesis the better choice for streaming workloads.

I compare Data Streams and Kinesis Data Firehose and show how Kinesis Data Firehose is the better option for many data loading operations. I show how the data transformation process works and the overall workflow of a Kinesis Data Firehose stream. Finally, I compare the scaling and throughput options for these two services.

For more serverless learning resources, visit Serverless Land.

Building well-architected serverless applications: Building in resiliency – part 1

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-building-in-resiliency-part-1/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

Reliability question REL2: How do you build resiliency into your serverless application?

Evaluate scaling mechanisms for serverless and non-serverless resources to meet customer demand. Build resiliency into your workload so that your serverless application can withstand partial and intermittent failures across components that may only surface in production.

Required practice: Manage transaction, partial, and intermittent failures

Whenever one service or system calls another, there is a chance that failures can happen. Services or systems often don’t fail as a single unit, but rather suffer partial or transient failures. Applications should be designed to handle component failures as part of the architecture. The system should be designed to detect failure and, ideally, automatically heal itself.

Transaction failures can occur when a component is unavailable or under high load. Partial failures can occur when a percentage of requests succeeds, including during batch processing. Intermittent failures might occur when a request fails for a short period of time due to network or other transient issues.

AWS serverless services, including AWS Lambda, are fault-tolerant and designed to handle failures. If a service invokes a Lambda function and there is a service disruption, Lambda invokes the function in a different Availability Zone.

When you invoke a function directly, you determine the strategy for handling errors. You can retry, send the event to a destination or queue for debugging, or ignore the error. Clients such as the AWS Command Line Interface (CLI) and the AWS SDK retry on client timeouts, throttling errors (429), and other errors that are not caused by a bad request.

When you invoke a function indirectly, you must be aware of the retry behavior of the invoker and any service that the request encounters along the way. For more information, see “Error handling and automatic retries in AWS Lambda”. You can configure Maximum Retry Attempts and Maximum Event Age for asynchronous invocations.

When reading from Amazon Kinesis Data Streams and Amazon DynamoDB Streams, Lambda retries the entire batch of items. Retries continue until the records expire or exceed the maximum age that you configure on the event source mapping. You can also configure the event source mapping to split a failed batch into two batches. Retrying with smaller batches isolates bad records and works around timeout issues.
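The effect of splitting a failed batch can be simulated locally. The sketch below is a simplified model rather than the Lambda service's implementation: `processWithBisect` and `processBatch` are hypothetical names, the processor fails any batch that contains a record flagged as malformed, and the isolated record is collected instead of being retried until it expires.

```javascript
// Simulates splitting a failed batch in two and retrying each half,
// until the failing record is isolated in a batch of one.
function processWithBisect (batch, processBatch, failed = []) {
  try {
    processBatch(batch)
  } catch (err) {
    if (batch.length === 1) {
      failed.push(batch[0]) // isolated bad record
    } else {
      const middle = Math.ceil(batch.length / 2)
      processWithBisect(batch.slice(0, middle), processBatch, failed)
      processWithBisect(batch.slice(middle), processBatch, failed)
    }
  }
  return failed
}

// Hypothetical processor that rejects any batch containing a malformed record.
const processed = []
function processBatch (batch) {
  if (batch.some((record) => record.malformed)) throw new Error('bad record in batch')
  processed.push(...batch)
}

const batch = [{ id: 1 }, { id: 2 }, { id: 3, malformed: true }, { id: 4 }, { id: 5 }]
const failed = processWithBisect(batch, processBatch)
console.log(processed.map((r) => r.id), failed.map((r) => r.id))
```

In this run, the four good records are processed and only record 3 is left as a failure, which is the isolation effect that smaller retry batches provide.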

Partial failures can occur in non-atomic operations. PutRecords for Kinesis and BatchWriteItem for DynamoDB return a successful response if at least one record is ingested successfully. Always inspect the response when using such operations and programmatically deal with partial failures.
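For PutRecords, inspecting the response means checking `FailedRecordCount` and then finding the entries in the response `Records` array that carry an `ErrorCode`. Response entries are returned in the same order as the request entries. The sketch below shows that check against a hand-written sample response. The `failedEntries` helper and the sample values are illustrative, and no call is made to the service.

```javascript
// Given the entries that were sent and the PutRecords response,
// returns the entries that were not ingested and should be retried.
function failedEntries (entries, response) {
  if (!response.FailedRecordCount) return []
  return entries.filter((entry, index) => response.Records[index].ErrorCode !== undefined)
}

const entries = [
  { Data: 'a', PartitionKey: 'k1' },
  { Data: 'b', PartitionKey: 'k2' },
  { Data: 'c', PartitionKey: 'k3' }
]

// Hand-written sample: the call succeeded overall, but the second record failed.
const response = {
  FailedRecordCount: 1,
  Records: [
    { SequenceNumber: '1', ShardId: 'shardId-000000000000' },
    { ErrorCode: 'ProvisionedThroughputExceededException', ErrorMessage: 'Rate exceeded' },
    { SequenceNumber: '2', ShardId: 'shardId-000000000000' }
  ]
}

const toRetry = failedEntries(entries, response)
console.log(`Retrying ${toRetry.length} of ${entries.length} records`)
```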

Use exponential backoff with jitter

The simplest technique for dealing with failures in a networked environment is to retry calls until they succeed. This technique increases the reliability of the application and reduces operational costs for the developer.

However, it is not always safe to retry. A retry can further increase the load on the system being called if the system is already failing due to an overload. To avoid this problem, use backoff. Instead of retrying immediately and aggressively, the client waits some amount of time between tries. The most common pattern is an exponential backoff, which uses exponentially longer wait times between retries. This is typically capped to a maximum delay and number of retries.

If all backoff retries are still happening at the same time, this can still overload a system or cause contention. To avoid this problem, use jitter. Jitter adds some amount of randomness to the backoff to spread the retries around in time. This can help prevent large bursts by spreading out the rate when clients connect. For more information see the Amazon Builders’ Library article “Timeouts, retries, and backoff with jitter” and AWS Architecture blog post “Exponential Backoff And Jitter”.
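A minimal sketch of capped exponential backoff with jitter follows. It uses the "full jitter" variant, where each wait is a random value between zero and the capped exponential delay. The function names, the default base delay, cap, and attempt count are assumptions chosen for illustration, and the random source is injectable so the delay calculation can be checked deterministically.

```javascript
// Full jitter: wait a random time between 0 and the capped exponential delay.
function backoffDelayMs (attempt, { baseMs = 100, capMs = 5000, random = Math.random } = {}) {
  const exponential = Math.min(capMs, baseMs * 2 ** attempt)
  return Math.floor(random() * exponential)
}

// Retries an async operation, sleeping for a jittered delay between attempts.
async function retryWithBackoff (operation, { maxAttempts = 5, ...delayOptions } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation()
    } catch (err) {
      if (attempt + 1 >= maxAttempts) throw err // give up after the last attempt
      const delay = backoffDelayMs(attempt, delayOptions)
      await new Promise((resolve) => setTimeout(resolve, delay))
    }
  }
}

// Upper bounds of the delay window grow exponentially until they hit the cap.
const fixedRandom = { random: () => 0.5 }
console.log([0, 1, 2, 3, 10].map((attempt) => backoffDelayMs(attempt, fixedRandom)))
```

The AWS SDKs already implement retries with backoff, so a helper like this is mainly relevant for calls to services that are not wrapped by an SDK.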

Exponential backoff and jitter

When your application responds to callers in fail-fast scenarios and when performance is degraded, inform the caller via headers or metadata when they can retry.

Each AWS SDK implements automatic retry logic including exponential backoff. For downstream calls, you can adjust AWS and third-party SDK retries, backoffs, TCP, and HTTP timeouts. This helps you decide when to stop retrying. For more information, see the documentation and troubleshooting steps for Lambda and the AWS SDK.

Use a dead-letter queue mechanism to retain, investigate and retry failed transactions

There are a number of ways to handle message failures including destinations and dead-letter queues.

You can configure Lambda to send records of asynchronous invocations to another destination service. These include Amazon Simple Queue Service (SQS), Amazon Simple Notification Service (SNS), Lambda, and Amazon EventBridge. You can configure separate destinations for events that fail processing and events that are successfully processed. The invocation record contains details about the event, the response, and the reason that the record was sent.

The following example shows a function that sends a record of a successful invocation to an EventBridge event bus. When an event fails all processing attempts, Lambda sends an invocation record to an SQS queue. It includes the function’s response in the invocation record.

AWS Lambda destinations for asynchronous invocation

SNS, SQS, Lambda, and EventBridge support dead-letter queues (DLQs). DLQs make your applications more resilient and durable by storing messages or events that can’t be processed correctly into a dedicated SQS queue. This helps you debug your application by isolating the problematic messages to determine why their processing failed. Once you have resolved the issue, re-process the failed message. For more information, see “When should I use a dead-letter queue?” There is an example serverless application to redrive the messages from an SQS DLQ back to its source SQS queue.

For Lambda, DLQs provide an alternative to a failure destination. Lambda destinations are preferable for asynchronous invocations.

Good practice: Orchestrate long-running transactions

Long-running transactions can be processed by one or multiple components. Consider implementing the saga pattern using state machines for these types of transactions.

The saga pattern coordinates transactions between multiple microservices as part of a state machine. Each service that performs a transaction publishes an event to trigger the next transaction in the saga. This continues until the transaction chain is complete. If a transaction fails, the saga orchestrates a series of compensating transactions that undo the changes that were made by the preceding transactions.

This is preferable to handling complex or long-running transactions within application code. State machines prevent cascading failures and avoid tightly coupling components with orchestrating logic and business logic.
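To make the compensation flow concrete, the sketch below runs a saga in plain JavaScript. It is intended only to show the control flow that a state machine would express declaratively, not as a recommendation to orchestrate in application code. The `runSaga` function and the step names are hypothetical and only loosely modeled on the airline example.

```javascript
// Runs steps in order. If one fails, runs the compensations of the
// completed steps in reverse order to undo their changes.
function runSaga (steps, context) {
  const completed = []
  for (const step of steps) {
    try {
      step.action(context)
      completed.push(step)
    } catch (err) {
      for (const done of completed.reverse()) {
        done.compensate(context)
      }
      return { status: 'FAILED', failedStep: step.name, error: err.message }
    }
  }
  return { status: 'SUCCEEDED' }
}

// Hypothetical booking steps; the payment step fails.
const log = []
const steps = [
  {
    name: 'ReserveFlight',
    action: () => log.push('seat reserved'),
    compensate: () => log.push('seat released')
  },
  {
    name: 'ReserveBooking',
    action: () => log.push('booking reserved'),
    compensate: () => log.push('booking cancelled')
  },
  {
    name: 'CollectPayment',
    action: () => { throw new Error('card declined') },
    compensate: () => log.push('payment refunded')
  }
]

const result = runSaga(steps, {})
console.log(result.status, log)
```

Only the two completed steps are compensated, in reverse order. The failed payment step has nothing to undo.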

Use a state machine to visualize distributed transactions, and to separate business logic from orchestration logic.

AWS Step Functions lets you coordinate multiple AWS services into serverless workflows via state machines. Within Step Functions, you can set separate retries, backoff rates, max attempts, intervals, and timeouts. These are set for every step of your state machine using a declarative language.

In the serverless airline example used in this series, Step Functions is used to orchestrate the Booking microservice. The ProcessBooking state machine handles all the necessary steps to create bookings, including payment.

Booking service Step Functions state machine

The state machine uses a combination of service integrations using DynamoDB, SQS, and Lambda functions to coordinate transactions and handle failures.

For example, the Reserve Booking task invokes a Lambda function. The task has retry and error handling configured as part of the task definition.

"Reserve Booking": {
	"Type": "Task",
	"Resource": "${ReserveBooking.Arn}",
	"TimeoutSeconds": 5,
	"Retry": [
		{
			"ErrorEquals": [
				"BookingReservationException"
			],
			"IntervalSeconds": 1,
			"BackoffRate": 2,
			"MaxAttempts": 2
		}
	],
	"Catch": [
		{
			"ErrorEquals": [
				"States.ALL"
			],
			"ResultPath": "$.bookingError",
			"Next": "Cancel Booking"
		}
	],
	"ResultPath": "$.bookingId",
	"Next": "Collect Payment"
},
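The Retry fields determine how long Step Functions waits before each retry: the first retry waits IntervalSeconds, and each later wait is multiplied by BackoffRate, up to MaxAttempts retries. The sketch below computes that schedule for the settings shown above. The `retryWaits` helper is hypothetical, and it ignores optional retrier fields such as a maximum delay or jitter.

```javascript
// Computes the wait, in seconds, before each retry for a Retry block.
// Defaults follow the Amazon States Language defaults for these fields.
function retryWaits ({ IntervalSeconds = 1, BackoffRate = 2, MaxAttempts = 3 }) {
  const waits = []
  for (let retry = 0; retry < MaxAttempts; retry++) {
    waits.push(IntervalSeconds * BackoffRate ** retry)
  }
  return waits
}

// Settings from the Reserve Booking task
console.log(retryWaits({ IntervalSeconds: 1, BackoffRate: 2, MaxAttempts: 2 })) // [ 1, 2 ]
```

With these settings, a BookingReservationException is retried after 1 second and again after 2 seconds. If the task still fails, the Catch block routes the execution to Cancel Booking.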

Step Functions supports direct service integrations, including DynamoDB. The Reserve Flight task directly updates the flightTable without requiring a Lambda function.

"Reserve Flight": {
	"Type": "Task",
	"Resource": "arn:aws:states:::dynamodb:updateItem",
	"Parameters": {
		"TableName.$": "$.flightTable",
		"Key": {
			"id": {
				"S.$": "$.outboundFlightId"
			}
		},
		"UpdateExpression": "SET seatCapacity = seatCapacity - :dec",
		"ExpressionAttributeValues": {
			":dec": {
				"N": "1"
			},
			":noSeat": {
				"N": "0"
			}
		},
		"ConditionExpression": "seatCapacity > :noSeat"
	},

By default, when a state reports an error, Step Functions causes the execution to fail entirely.

Utilize dead-letter queues in response to failed state machine executions

Any state within the Step Functions workflow can encounter runtime errors. These include state machine definition issues, task failures such as Lambda function exceptions, or transient issues such as network connectivity issues. For more information, see “Error handling in Step Functions”.

Use the Step Functions service integration with SQS to send failed transactions to a DLQ as the final step. This adds a higher level of durability within your state machines.

For example, the airline Notify Failed Booking final task catches failed states from four previous steps. It sends the results to the Booking DLQ.

Booking service Step Functions DLQ

The message includes the output of the previous failed states for further troubleshooting.

"Booking DLQ": {
	"Type": "Task",
	"Resource": "arn:aws:states:::sqs:sendMessage",
	"Parameters": {
		"QueueUrl": "${BookingsDLQ}",
		"MessageBody.$": "$"
	},
	"ResultPath": "$.deadLetterQueue",
	"Next": "Booking Failed"
},

The Step Functions documentation has more information on calling SQS.

Conclusion

Build resiliency into your workloads. This makes sure that your application can withstand partial and intermittent failures across components that may only surface in production.

In this post, I cover managing failures using retries, exponential backoff, and jitter. I explain how DLQs can isolate failed messages. I show how to use state machines to orchestrate long running transactions rather than handling these in application code.

This well-architected question continues in part 2 where I look at managing duplicate and unwanted events with idempotency and an event schema. I cover how to consider scaling patterns at burst rates by managing account limits and show relevant metrics to evaluate.

For more serverless learning resources, visit Serverless Land.

Building well-architected serverless applications: Regulating inbound request rates – part 2

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-regulating-inbound-request-rates-part-2/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

Reliability question REL1: How do you regulate inbound request rates?

This post continues part 1 of this reliability question. In part 1, I cover controlling inbound request rates using throttling. I go through how to use throttling to control steady-rate and burst rate requests. I show some solutions for performance testing to identify the request rates that your workload can sustain before impacting performance.

Good practice: Use, analyze, and enforce API quotas

API quotas limit the maximum number of requests a given API key can submit within a specified time interval. Metering API consumers provides a better understanding of how different consumers use your workload at sustained and burst rates at any point in time. With this information, you can determine fine-grained rate limiting for multiple quota limits. You can set these limits according to the needs of a group of consumers and adjust them on a regular basis.

Segregate API consumers’ steady-rate requests and their quota into multiple buckets or tiers

Amazon API Gateway usage plans allow your API consumer to access selected APIs at agreed-upon request rates and quotas. These help your consumers meet their business requirements and budget constraints. Create and attach API keys to usage plans to control access to certain API stages. I show how to create usage plans and how to associate them with API keys in “Building well-architected serverless applications: Controlling serverless API access – part 2”.

API key associated with usage plan

You can extract utilization data from usage plans to analyze API usage on a per-API key basis. In the example, I show how to use usage plans to see how many requests are made.

View API key usage

This allows you to generate billing documents and determine whether your customers need higher or lower limits. Have a mechanism to allow customers to request higher limits preemptively. When customers anticipate greater API usage, they can take action proactively.

API Gateway Lambda authorizers can dynamically associate API keys to a given request. This can be used where you do not control API consumers, or want to associate API keys based on your own criteria. For more information, see the documentation.

You can also visualize usage plans with Amazon QuickSight using enriched API Gateway access logs.

Visualize usage plans with Amazon QuickSight

Define whether your API consumers are end users or machines

Understanding your API consumers helps you manage how they connect to your API. This helps you define a request access pattern strategy, which can distinguish between end users and machines.

Machine consumers make automated connections to your API, which may require a different access pattern to end users. You may decide to prioritize end user consumers to provide a better experience. Machine consumers may be able to handle request throttling automatically.

Best practice: Use mechanisms to protect non-scalable resources

Limit component throughput by enforcing how many transactions it can accept

AWS Lambda functions can scale faster than traditional resources, such as relational databases and cache systems. Protect your non-scalable resources by ensuring that components that scale quickly do not exceed the throughput of downstream systems. This can prevent system performance from degrading. There are a number of ways to achieve this, either directly or via buffer mechanisms such as queues and streams.

For relational databases such as Amazon RDS, you can limit the number of connections per user, in addition to the global maximum number of connections. With Amazon RDS Proxy, your applications can pool and share database connections to improve their ability to scale.

Amazon RDS Proxy

For additional options for using RDS with Lambda, see the AWS Serverless Hero blog post “How To: Manage RDS Connections from AWS Lambda Serverless Functions”.

Cache results, and only connect to and fetch data from databases when needed. This reduces the load on the downstream database. Adjust the maximum number of connections for caching systems. Include a caching expiration mechanism to prevent serving stale records. For more information on caching implementation patterns and considerations, see “Caching Best Practices”.

Lambda provides managed scaling. When a function is first invoked, the Lambda service creates an instance of the function to process the event. This is called a cold start. After completion, the function remains available for a period of time to process subsequent events. These are called warm starts. If other events arrive while the function is busy, Lambda creates more instances of the function to handle these requests concurrently as cold starts. The following example shows 10 events processed in six concurrent requests.

Lambda concurrency
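
The scaling behavior described above can be approximated with a short simulation. This is a minimal sketch, not how the Lambda service is implemented: the class name, the event timings, and the rule that an idle instance is always reused before a new one is created are assumptions for illustration.

```java
import java.util.PriorityQueue;

final class ConcurrencySketch {

    /**
     * Counts cold and warm starts for a sequence of events.
     * Each event is {arrivalMs, durationMs}; events must be sorted by arrival time.
     * Returns {coldStarts, warmStarts}. The number of cold starts equals the
     * number of function instances that had to be created.
     */
    static int[] simulate(long[][] events) {
        // Min-heap of the times at which existing instances become free.
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        int cold = 0;
        int warm = 0;
        for (long[] event : events) {
            long arrival = event[0];
            long duration = event[1];
            Long earliestFree = freeAt.peek();
            if (earliestFree != null && earliestFree <= arrival) {
                // An existing instance is idle: reuse it (warm start).
                freeAt.poll();
                warm++;
            } else {
                // Every instance is busy: create a new one (cold start).
                cold++;
            }
            freeAt.add(arrival + duration);
        }
        return new int[] {cold, warm};
    }
}
```

With six events arriving 10 ms apart that each run for 100 ms, followed by four events that arrive as the first instances become free, simulate reports six cold starts and four warm starts, so 10 events are served by six concurrent instances.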

You can control the number of concurrent function invocations to both reserve and limit the maximum concurrency your function can achieve. You can configure reserved concurrency to set the maximum number of concurrent instances for the function. This can protect downstream resources such as a database by ensuring Lambda can only scale up to the number of connections the database can support.

For example, you may have a traditional database or external API that can only support a maximum of 50 concurrent connections. You can set the maximum number of concurrent Lambda functions using the function concurrency settings. Setting the value to 50 ensures that the traditional database or external API is not overwhelmed.

Edit Lambda concurrency
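
When choosing a concurrency value, it can help to relate concurrency to request rate: concurrency is approximately the request rate multiplied by the average function duration. The following sketch applies that relationship. The class name and the 200 ms average duration used in the note below are hypothetical.

```java
final class ReservedConcurrencySketch {

    /** Upper bound on requests per second for a given concurrency cap and average duration. */
    static double maxRequestsPerSecond(int reservedConcurrency, double avgDurationMs) {
        if (reservedConcurrency < 0 || avgDurationMs <= 0) {
            throw new IllegalArgumentException("concurrency must be >= 0 and duration > 0");
        }
        return reservedConcurrency * 1000.0 / avgDurationMs;
    }

    /** Concurrency needed to serve a steady request rate without throttling. */
    static int requiredConcurrency(double requestsPerSecond, double avgDurationMs) {
        if (requestsPerSecond < 0 || avgDurationMs <= 0) {
            throw new IllegalArgumentException("rate must be >= 0 and duration > 0");
        }
        return (int) Math.ceil(requestsPerSecond * avgDurationMs / 1000.0);
    }
}
```

With a limit of 50 and an average duration of 200 ms, the function can sustain at most about 250 requests per second. A steady 300 requests per second would need a concurrency of 60, so Lambda would throttle the excess rather than open more connections to the database.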

You can also set the Lambda function concurrency to 0, which throttles all invocations. This lets you disable the function in the event of anomalies.

Another solution to protect downstream resources is to use an intermediate buffer. A buffer can persistently store messages in a stream or queue until a receiver processes them. This helps you control how fast messages are processed, which can reduce the load on downstream resources.
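
The effect of a buffer can be sketched with a small, deterministic simulation. It is an illustration only: the class name, the notion of a "tick", and the per-tick processing limit are hypothetical and do not correspond to a specific AWS setting.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferSketch {

    /**
     * Simulates a queue that absorbs bursts. arrivalsPerTick[i] messages arrive at
     * tick i, and the receiver processes at most maxPerTick messages per tick.
     * Returns the number of messages delivered downstream at each tick until the
     * queue is empty.
     */
    static int[] deliveredPerTick(int[] arrivalsPerTick, int maxPerTick) {
        if (maxPerTick <= 0) {
            throw new IllegalArgumentException("maxPerTick must be positive");
        }
        List<Integer> delivered = new ArrayList<>();
        long queued = 0;
        int tick = 0;
        while (tick < arrivalsPerTick.length || queued > 0) {
            if (tick < arrivalsPerTick.length) {
                queued += arrivalsPerTick[tick]; // the burst lands in the buffer
            }
            int out = (int) Math.min(queued, maxPerTick); // downstream never sees more than the cap
            queued -= out;
            delivered.add(out);
            tick++;
        }
        return delivered.stream().mapToInt(Integer::intValue).toArray();
    }
}
```

A burst of 10 messages with a limit of 4 per tick is delivered as 4, 4, and 2 messages over three ticks: the burst is stretched over time instead of reaching the downstream resource all at once.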

Amazon Kinesis Data Streams allows you to collect and process large streams of data records in real time, and can act as a buffer. Streams consist of a set of shards that contain a sequence of data records. When using Lambda to process records, it processes one batch of records at a time from each shard.

Kinesis Data Streams controls concurrency at the shard level, meaning that, by default, a single shard has a single concurrent invocation. This can reduce downstream calls to non-scalable resources such as a traditional database. Kinesis Data Streams also supports batch windows of up to 5 minutes and configurable batch sizes. You can also use these to control how frequently invocations occur.

To learn how to manage scaling with Kinesis, see the documentation. To learn more about how Lambda works with Kinesis, read the blog series “Building serverless applications with streaming data”.

Lambda and Kinesis shards

Amazon Simple Queue Service (SQS) is a fully managed serverless message queuing service that enables you to decouple and scale microservices. You can offload tasks from one component of your application by sending them to a queue and processing them asynchronously.

SQS can act as a buffer, using a Lambda function to process the messages. Lambda polls the queue and invokes your Lambda function synchronously with an event that contains queue messages. Lambda reads messages in batches and invokes your function once for each batch. When your function successfully processes a batch, Lambda deletes its messages from the queue.

You can protect downstream resources using the Lambda concurrency controls. This limits the number of concurrent Lambda functions that pull messages off the queue. The messages persist in the queue until Lambda can process them. For more information, see “Using AWS Lambda with Amazon SQS”.

Lambda and SQS

Conclusion

Regulating inbound requests helps you adapt different scaling mechanisms based on customer demand. You can achieve better throughput for your workloads and make them more reliable by controlling requests to a rate that your workload can support.

In this post, I cover using, analyzing, and enforcing API quotas using usage plans and API keys. I show mechanisms to protect non-scalable resources such as using RDS Proxy to protect downstream databases. I show how to control the number of Lambda invocations using concurrency controls to protect downstream resources. I explain how you can use streams and queues as an intermediate buffer to store messages persistently until a receiver processes them.

In the next post in the series, I cover the second reliability question from the Well-Architected Serverless Lens, building resiliency into serverless applications.

For more serverless learning resources, visit Serverless Land.

Building well-architected serverless applications: Regulating inbound request rates – part 1

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-regulating-inbound-request-rates-part-1/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

Reliability question REL1: How do you regulate inbound request rates?

Defining, analyzing, and enforcing inbound request rates helps achieve better throughput. Regulation helps you adapt different scaling mechanisms based on customer demand. By regulating inbound request rates, you can achieve better throughput, and adapt client request submissions to a request rate that your workload can support.

Required practice: Control inbound request rates using throttling

Throttle inbound request rates using steady-rate and burst rate requests

Throttling requests limits the number of requests a client can make during a certain period of time. Throttling allows you to control your API traffic. This helps your backend services maintain their performance and availability levels by limiting the number of requests to actual system throughput.

To prevent your API from being overwhelmed by too many requests, Amazon API Gateway throttles requests to your API. These limits are applied across all clients using the token bucket algorithm. API Gateway sets a limit on a steady-state rate and a burst of request submissions. The algorithm is based on an analogy of filling and emptying a bucket of tokens representing the number of available requests that can be processed.

Each API request removes a token from the bucket. The throttle rate then determines how many requests are allowed per second. The throttle burst determines how many concurrent requests are allowed. I explain the token bucket algorithm in more detail in “Building well-architected serverless applications: Controlling serverless API access – part 2”.

Token bucket algorithm
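
The following is a minimal, single-process sketch of a token bucket, assuming a caller-supplied clock so that the behavior is deterministic. It illustrates the algorithm only and is not API Gateway's implementation; the class and method names are hypothetical.

```java
final class TokenBucketSketch {
    private final double ratePerSecond; // steady-state rate: tokens added per second
    private final double burst;         // bucket capacity: largest burst allowed
    private double tokens;
    private long lastRefillNanos;

    TokenBucketSketch(double ratePerSecond, double burst, long nowNanos) {
        if (ratePerSecond <= 0 || burst < 1) {
            throw new IllegalArgumentException("rate must be > 0 and burst >= 1");
        }
        this.ratePerSecond = ratePerSecond;
        this.burst = burst;
        this.tokens = burst; // the bucket starts full
        this.lastRefillNanos = nowNanos;
    }

    /** Returns true if a request arriving at nowNanos is allowed, false if it is throttled. */
    synchronized boolean tryAcquire(long nowNanos) {
        // Refill for the time elapsed since the last call, never above capacity.
        double elapsedSeconds = Math.max(0L, nowNanos - lastRefillNanos) / 1_000_000_000.0;
        tokens = Math.min(burst, tokens + elapsedSeconds * ratePerSecond);
        lastRefillNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0; // each request removes one token
            return true;
        }
        return false; // bucket empty: the caller would reject the request
    }
}
```

With a rate of 2 tokens per second and a burst of 3, three requests at the same instant succeed and a fourth is throttled. Half a second later one token has been refilled, so exactly one more request succeeds.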

API Gateway limits the steady-state rate and burst requests per second. These are shared across all APIs per Region in an account. For further information on account-level throttling per Region, see the documentation. You can request account-level rate limit increases using the AWS Support Center. For more information, see Amazon API Gateway quotas and important notes.

You can configure your own throttling levels, within the account and Region limits, to improve overall performance across all APIs in your account. This restricts the overall request submissions so that they don’t exceed the account-level throttling limits.

You can also configure per-client throttling limits. Usage plans restrict client request submissions to within specified request rates and quotas. These are applied to clients using API keys that are associated with your usage policy as a client identifier. You can add throttling levels per API route, stage, or method that are applied in a specific order.

For more information on API Gateway throttling, see the AWS re:Invent presentation “I didn’t know Amazon API Gateway could do that”.

API Gateway throttling

You can also throttle requests by introducing a buffering layer using Amazon Kinesis Data Streams or Amazon SQS. Kinesis can limit the number of requests at the shard level while SQS can limit at the consumer level. For more information on using SQS as a buffer with Amazon Simple Notification Service (SNS), read “How To: Use SNS and SQS to Distribute and Throttle Events”.

Identify steady-rate and burst rate requests that your workload can sustain at any point in time before performance degrades

Load testing your serverless application allows you to monitor the performance of an application before it is deployed to production. Serverless applications can be simpler to load test, thanks to the automatic scaling built into many of the services. During a load test, you can identify quotas that may act as a limiting factor for the traffic you expect and take action.

Perform load testing for a sustained period of time. Gradually increase the traffic to your API to determine your steady-state rate of requests. Also use a burst strategy with no ramp up to determine the burst rates that your workload can serve without errors or performance degradation. There are a number of AWS Marketplace and AWS Partner Network (APN) solutions available for performance testing, including Gatling Frontline, BlazeMeter, and Apica.

In the serverless airline example used in this series, you can run a performance test suite using Gatling, an open source tool.

To deploy the test suite, follow the instructions in the GitHub repository perf-tests directory. Uncomment the deploy.perftest line in the repository Makefile.

Perf-test makefile

Once the file is pushed to GitHub, AWS Amplify Console rebuilds the application, and deploys an AWS CloudFormation stack. You can run the load tests locally, or use an AWS Step Functions state machine to run the setup and Gatling load test simulation.

Performance test using Step Functions

The Gatling simulation script uses constantUsersPerSec and rampUsersPerSec to add users for a number of test scenarios. You can use the test to simulate load on the application. Once the tests have run, Gatling generates a downloadable report.

Gatling performance results

Artillery Community Edition is another open-source tool for testing serverless APIs. You configure the number of requests per second and overall test duration, and it uses a headless Chromium browser to run its test flows. For Artillery, the maximum number of concurrent tests is constrained by your local computing resources and network. To achieve higher throughput, you can use Serverless Artillery, which runs the Artillery package on Lambda functions. As a result, this tool can scale up to a significantly higher number of tests.

For more information on how to use Artillery, see “Load testing a web application’s serverless backend”. This runs tests against APIs in a demo application. For example, one of the tests fetches 50,000 questions per hour. This calls an API Gateway endpoint and tests whether the AWS Lambda function, which queries an Amazon DynamoDB table, can handle the load.

Artillery performance test

This is a synchronous API so the performance directly impacts the user’s experience of the application. This test shows that the median response time is 165 ms with a p95 time of 201 ms.

Performance test API results
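
Median and p95 figures like these are summaries of the raw response times collected during a test. The following sketch shows one common way to compute them, the nearest-rank method. It is an assumption that this matches how any particular tool calculates its percentiles, and the class name and sample values are hypothetical.

```java
import java.util.Arrays;

final class LatencyPercentileSketch {

    /**
     * Nearest-rank percentile: the smallest sample such that at least
     * `percent` percent of all samples are less than or equal to it.
     */
    static long percentile(long[] latenciesMs, int percent) {
        if (latenciesMs.length == 0 || percent < 1 || percent > 100) {
            throw new IllegalArgumentException("need samples and a percent in 1..100");
        }
        long[] sorted = latenciesMs.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        // Integer ceiling of percent * n / 100 avoids floating-point rounding.
        int rank = (int) (((long) percent * sorted.length + 99) / 100);
        return sorted[rank - 1];
    }
}
```

Because p95 is driven by the slowest requests, a single slow outlier in a small sample can dominate it, which is one reason to run load tests for a sustained period.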

Another consideration for API load testing is whether the authentication and authorization service can handle the load. For more information on load testing Amazon Cognito and API Gateway using Step Functions, see “Using serverless to load test Amazon API Gateway with authorization”.

API load testing with authentication and authorization

Conclusion

Regulating inbound requests helps you adapt different scaling mechanisms based on customer demand. You can achieve better throughput for your workloads and make them more reliable by controlling requests to a rate that your workload can support.

In this post, I cover controlling inbound request rates using throttling. I show how to use throttling to control steady-rate and burst rate requests. I show some solutions for performance testing to identify the request rates that your workload can sustain before performance degradation.

This Well-Architected question continues in the next post, where I look at using, analyzing, and enforcing API quotas, and cover mechanisms to protect non-scalable resources.

For more serverless learning resources, visit Serverless Land.

Secure multi-tenant data ingestion pipelines with Amazon Kinesis Data Streams and Kinesis Data Analytics for Apache Flink

Post Syndicated from Abhinav Krishna Vadlapatla original https://aws.amazon.com/blogs/big-data/secure-multi-tenant-data-ingestion-pipelines-with-amazon-kinesis-data-streams-and-kinesis-data-analytics-for-apache-flink/

When designing multi-tenant streaming ingestion pipelines, there are myriad ways to design and build your streaming solution, each with its own set of trade-offs. The first decision you have to make is the strategy that determines how you choose to physically or logically separate one tenant’s data from another.

Sharing compute and storage resources helps reduce costs; however, it requires strong security measures to prevent cross-tenant data access. This strategy is known as a pool model. In contrast, a silo model helps reduce security complexity by having each tenant have its own set of isolated resources. However, this increases cost and operational overhead. A more detailed review of tenant isolation models is covered in the SaaS Storage Strategies whitepaper. In this post, we focus on the pool model to optimize for cost when supporting a multi-tenant streaming ingestion architecture.

Consider a retail industry data as a service (DaaS) company that ingests point of sale (POS) data from multiple customers and curates reports that blend sale transactions with third-party data in near-real time. The DaaS company can benefit from sharing compute and storage resources to reduce costs and stay competitive. For security, the DaaS company needs to authenticate each customer request and, to support a pool model, also needs to guarantee that data issues from one tenant don’t affect reports consumed by other customers. Similar scenarios apply to other industries that need to ingest data from semi-trusted servers. For example, in supply chain, a company could be streaming data from multiple suppliers to maintain a near-real-time status of SKUs and purchase orders. In the education industry, a third-party company could ingest data from servers at multiple schools and provide aggregated data to government agencies.

To build a multi-tenant streaming ingestion pipeline with shared resources, we walk you through an architecture that allows semi-trusted servers to use Amazon Kinesis Data Streams using the AWS IoT credentials provider feature for authentication, Amazon API Gateway as a proxy for authorization, and an Amazon Kinesis Data Analytics for Apache Flink application to aggregate and write data partitioned by the tenant in near-real time into an Amazon Simple Storage Service (Amazon S3) data lake. With this architecture, you remove the operational overhead of maintaining multiple Kinesis data streams (one per customer) and allow for cost optimization opportunities by making better use of your provisioned Kinesis data stream shards.

The following architecture diagram illustrates the data ingestion pipeline.

In this architecture, authorized servers from one or multiple third-party companies send messages to an API Gateway endpoint. The endpoint puts messages into the proper partition of a shared Kinesis data stream. Finally, a Kinesis Data Analytics consumer application aggregates, compresses, and writes data into the proper partition of an S3 data lake.

The following sections describe in more detail the multi-tenant architecture considerations and implementation details for this architecture.

Authentication

First, you need to decide on the desired authentication mechanisms. To simplify onboarding new customers and eliminate the need for hardcoded credentials on customers’ servers, we recommend looking into the credentials provider feature of AWS IoT. Each tenant can use a provisioned X.509 certificate to securely retrieve temporary credentials and authenticate against AWS services using an AWS Identity and Access Management (IAM) role. For more information on how this works, see How to Eliminate the Need for Hardcoded AWS Credentials in Devices by Using the AWS IoT Credentials Provider.

For additional authentication mechanisms directly with API Gateway, see Controlling and managing access to a REST API in API Gateway.

Authorization

After you’re authenticated with IAM, the next step is authorization. Simply put, make sure each tenant can only write to their respective data lake partition. One of the key risks to mitigate in a multi-tenant streaming ingestion workflow is the scenario where a tenant server is compromised and attempts to impersonate other tenants by sending bogus data. To guarantee isolation of data ingest and reduce the blast radius of bad data, you could consider the following options:

  • Use a silo model and provision one Kinesis data stream per tenant – Kinesis Data Streams provides access control at the stream level. This approach provides you with complete isolation and the ability to scale your stream capacity up or down on a per-tenant basis. However, there is operational overhead in maintaining multiple streams, and optimizing for cost has limitations. Each data stream is provisioned in increments of one shard, which provides 1 MB/sec of ingestion capacity and up to 1,000 PUT records per second. Pricing is based on shards per hour. One shard could be well beyond your tenant requirements and tenant onboarding costs could scale rapidly.
  • Use AWS IoT Core with one topic per tenant using topic filters and an AWS IoT rule to push data into a shared data stream – AWS IoT Core gives access control at the topic level. Each tenant can send data to only their respective topic (for example, tenantID topic) based on their IAM credentials. We can then use an AWS IoT rule to extract the tenantID from the topic and push data into a shared data stream using tenantID as the partition key.
  • Use API Gateway as a proxy with mapping templates and a shared data stream – Kinesis Data Streams doesn’t provide access control at the data partition level. However, API Gateway provides access control at the method and path level. With API Gateway as a proxy, you can use mapping templates to programmatically fetch the tenant UUID from the path and set it as the partition key before pushing the data to Kinesis Data Streams.
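
To see why a shared stream can cost less than one stream per tenant, the shard limits quoted above can be turned into a rough sizing calculation. This is a sketch under stated assumptions: a hypothetical workload of 1,000 tenants each sending one 8 KB record per second, 1 MB taken as 1,024 KB, and partition keys that spread evenly across shards. The class and method names are hypothetical.

```java
final class ShardSizingSketch {
    static final double SHARD_INGEST_KB_PER_SEC = 1024.0; // 1 MB/sec per shard (assumes 1 MB = 1,024 KB)
    static final double SHARD_RECORDS_PER_SEC = 1000.0;   // 1,000 PUT records per second per shard

    /** Shards needed for a write workload: the larger of the byte-rate and record-rate requirements. */
    static int shardsNeeded(double totalKbPerSec, double totalRecordsPerSec) {
        int byBytes = (int) Math.ceil(totalKbPerSec / SHARD_INGEST_KB_PER_SEC);
        int byRecords = (int) Math.ceil(totalRecordsPerSec / SHARD_RECORDS_PER_SEC);
        return Math.max(1, Math.max(byBytes, byRecords)); // a stream has at least one shard
    }

    /** Silo model: every tenant gets its own stream, so each needs at least one shard. */
    static int siloShards(int tenants, double kbPerSecPerTenant, double recordsPerSecPerTenant) {
        return tenants * shardsNeeded(kbPerSecPerTenant, recordsPerSecPerTenant);
    }

    /** Pool model: all tenants share one stream sized for the combined load. */
    static int poolShards(int tenants, double kbPerSecPerTenant, double recordsPerSecPerTenant) {
        return shardsNeeded(tenants * kbPerSecPerTenant, tenants * recordsPerSecPerTenant);
    }
}
```

For this hypothetical workload, the silo model needs 1,000 shards (one mostly idle shard per tenant), while a shared stream needs 8. In practice a tenant that sends far more data than others can create a hot shard, so the pooled figure is a lower bound rather than a guarantee.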

Optimize for costs

The last two preceding options use a pool model and share a single Kinesis data stream to reduce operational overhead and costs. To optimize costs even further, you need to consider the pricing model of each of these services (API Gateway vs. AWS IoT Core) and three factors in your use case: the average size for each message, the rate at which the data is being ingested, and the data latency requirements.

Consider an example where you have 1,000 tenants (devices) and each produces data at the rate of one request per second with an average payload of 8 KB. AWS IoT Core is priced per million messages and per million rules. Each message is metered at 5 KB increments, so you’re charged for two messages per payload. If you have small payloads and very low latency requirements, AWS IoT Core is likely your best choice. If you can introduce some latency and buffer your messages at each tenant, then API Gateway is your best option because the pricing model for REST API requests is on a per-API call basis and not metered by KB. You can use the AWS Pricing Calculator to quickly decide which option offers the best price for your use case.

For example, with API Gateway, you can optimize your cost even further by reducing the number of API requests. Instead of each tenant sending 8 KB of data per second, each tenant can send 240 KB every 30 seconds, which reduces costs considerably. A sample calculation for API Gateway in this scenario:

  • Average message size: 240 KB
  • REST API requests per tenant per month: 2 requests per minute x 60 min x 24 hrs x 30 days = 86,400 requests
  • REST API requests per month for 1,000 tenants: 86,400 x 1,000 = 86,400,000 requests
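
The request volumes in this comparison can be reproduced with a few lines of arithmetic. The sketch below counts billable units only and deliberately contains no prices, which vary by Region and over time. The class and method names are hypothetical, and a 30-day month is assumed.

```java
final class RequestVolumeSketch {
    static final long SECONDS_PER_30_DAY_MONTH = 60L * 60 * 24 * 30; // 2,592,000

    /** AWS IoT Core metered messages per month, with each payload metered in 5 KB increments. */
    static long iotMeteredMessagesPerMonth(long tenants, int payloadKb, int payloadsPerSecond) {
        long incrementsPerPayload = (payloadKb + 4) / 5; // integer ceiling of payloadKb / 5
        return tenants * payloadsPerSecond * incrementsPerPayload * SECONDS_PER_30_DAY_MONTH;
    }

    /** API Gateway REST API requests per month when each tenant sends one batched call per interval. */
    static long apiRequestsPerMonth(long tenants, int batchIntervalSeconds) {
        if (batchIntervalSeconds <= 0) {
            throw new IllegalArgumentException("batchIntervalSeconds must be positive");
        }
        return tenants * (SECONDS_PER_30_DAY_MONTH / batchIntervalSeconds);
    }
}
```

For 1,000 tenants, sending an 8 KB payload every second through AWS IoT Core produces 5,184,000,000 metered messages per month, sending it every second through API Gateway produces 2,592,000,000 requests, and batching into one call every 30 seconds produces 86,400,000 requests. Multiply each count by the current unit price from the AWS Pricing Calculator to compare costs.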

The following sections walk you through the configuration of API Gateway and Kinesis to prevent cross-data access when you support a multi-tenant streaming ingestion pipeline architecture.

Enable API Gateway as a Kinesis Data Streams proxy

API Gateway is a fully managed service that makes it easy for developers to publish, maintain, monitor, and secure APIs at any scale. You can create an API Gateway endpoint to expose other AWS services, such as Amazon Simple Notification Service (Amazon SNS), Amazon S3, Kinesis, and even AWS Lambda. All AWS services support dedicated APIs to expose their features. However, the application protocols or programming interfaces are likely to differ from service to service. An API Gateway API with the AWS integration has the advantage of providing a consistent application protocol for your client to access different AWS services. In our use case, we use API Gateway as a proxy to Kinesis in order to handle IAM authentication and authorize clients to invoke URL paths with their unique tenant ID. API Gateway has additional features that are beneficial for multi-tenant applications, like rate limiting API calls per tenant, requests and response transformations, logging and monitoring, and more.

When you configure API Gateway with IAM resource-level permissions, you can make sure each tenant can only make requests to a unique URL path. For example, if the tenant invokes the API Gateway URL with their tenant ID in the path (for example, https://api-id.execute-api.us-east-2.amazonaws.com/{tenantId}), IAM validates that the tenant is authorized to invoke this URL only. For more details on how to set up an IAM policy to a specific API Gateway URL path, see Control access for invoking an API.
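
IAM policies for invoking an API identify resources with ARNs of the form arn:aws:execute-api:region:account-id:api-id/stage/METHOD/resource-path. The following sketch builds the ARN for a single tenant's path, which could then be used as the Resource of an Allow statement for the execute-api:Invoke action in that tenant's role. The account ID, stage name, and helper name are hypothetical, and the tenant ID format is an assumption.

```java
final class TenantInvokeArnSketch {

    /**
     * Builds the execute-api ARN that matches exactly one tenant's URL path.
     * Tenant IDs are restricted to letters, digits, and hyphens so that a value
     * such as "*" cannot widen the policy to other tenants' paths.
     */
    static String invokeArn(String region, String accountId, String apiId,
                            String stage, String httpMethod, String tenantId) {
        if (tenantId == null || !tenantId.matches("[A-Za-z0-9-]+")) {
            throw new IllegalArgumentException("unexpected tenant ID: " + tenantId);
        }
        return String.format("arn:aws:execute-api:%s:%s:%s/%s/%s/%s",
                region, accountId, apiId, stage, httpMethod, tenantId);
    }
}
```

For example, invokeArn("us-east-2", "123456789012", "api-id", "prod", "POST", "A01") returns arn:aws:execute-api:us-east-2:123456789012:api-id/prod/POST/A01, so a role holding only this resource cannot call the path of tenant A02.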

Then, to ensure no authorized customer can impersonate another tenant by sending bogus data, API Gateway extracts the tenant ID from the URL path programmatically using the API Gateway mapping template feature. API Gateway allows developers to transform payloads before passing them to backend resources using mapping templates written in Velocity Template Language (VTL) with JSONPath expressions. With this feature, we can extract the tenant ID from the URL and pass it as the partition key of the shared data stream. The following is a sample mapping template:

{
    "StreamName": "$input.params('stream-name')",
    "Data": "$util.base64Encode($input.json('$.Data'))",
    "PartitionKey": "$input.params('partition')"
}

In the preceding code, partition is the parameter name you specify in your API Gateway resource path. The following screenshot shows what the configuration looks like on the API Gateway console.

After messages in the data stream use the proper partition, the next step is to transform, enrich, and aggregate messages before writing them into an S3 data lake. For this workflow, we use Kinesis Data Analytics for Apache Flink to have full control of the data lake partition configuration. The following section describes the approach to ensure data is written in the proper partition.

Use Kinesis Data Analytics for Apache Flink to process and write data into an S3 data lake

After we guarantee that messages within the data stream have the right tenant ID as the partition key, we can use Kinesis Data Analytics for Apache Flink to continuously process messages in near-real time and store them in Amazon S3. Kinesis Data Analytics for Apache Flink is an easy way to transform and analyze streaming data in real time. Apache Flink is an open-source framework and engine for processing data streams. Kinesis Data Analytics reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. Because this solution is also serverless, there are no servers to manage, it scales automatically to match the volume and throughput of your incoming data, and you only pay for the resources your streaming applications consume.

In this scenario, we want to extract the partition key (tenantId) from each Kinesis data stream message, then process all messages within a time window and use the tenant ID as the file prefix of the files we write into the destination S3 bucket. In other words, we write the data into the proper tenant partition. The result writes data in files that look like the following:

s3://mybucket/year=2020/month=1/day=1/tenant=A01/part-0-0
s3://mybucket/year=2020/month=1/day=1/tenant=A02/part-0-1
s3://mybucket/year=2020/month=1/day=1/tenant=A03/part-0-3

To achieve this, we need to implement two custom classes within the Apache Flink application code.

First, we use a custom deserializer class to extract the partition key from the data stream and append it to the body of the message. We can achieve this by implementing the deserialize method of the KinesisDeserializationSchema interface:

class CustomKinesisDeserializer implements KinesisDeserializationSchema<String> {
    private static final Logger log = LogManager.getLogger(CustomKinesisDeserializer.class);

    @Override
    public String deserialize(byte[] bytes, String partitionKey, String seqNum,
                              long approxArrivalTimeStamp, String stream, String shardId) throws IOException {
        log.debug("deserialize - enter");
        // Decode explicitly as UTF-8 rather than relying on the platform default charset
        String s = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
        JSONObject json = new JSONObject(s);
        // Add the Kinesis partition key (the tenant ID) to the message body
        json.put("tenantid", partitionKey);
        return json.toString();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Next, we use a custom BucketAssigner class to use the partition key in the body of the message (in our case, the tenant ID) as the bucket prefix:

private static final BucketAssigner<String, String> assigner = new BucketAssigner<String, String> () {

        @Override
        public String getBucketId(String element, BucketAssigner.Context context) {
            log.debug("getBucketId - enter");
            JSONObject json = new JSONObject(element);
            if (json.has("tenantid")) {
                String tenantId = json.getString("tenantid");
                return "tenantid=" + tenantId;
            }
            return "tenantid=unknown";
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
};
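
The assigner above partitions by tenant ID only, whereas the sample S3 paths also include date fields. The following sketch shows one way to build such a prefix in plain Java. The class name, the choice of UTC, and the allowed tenant ID characters are assumptions. Inside a BucketAssigner, the timestamp could come from the Context argument, for example its currentProcessingTime() method.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

final class PartitionPathSketch {

    /** Builds a prefix such as year=2020/month=1/day=1/tenant=A01 for the given tenant and time. */
    static String bucketId(String tenantId, Instant eventTime) {
        // Fall back to "unknown" for missing or unexpected IDs so that a value
        // containing "/" or ".." cannot redirect data into another tenant's prefix.
        String safeTenant = (tenantId != null && tenantId.matches("[A-Za-z0-9_-]+"))
                ? tenantId
                : "unknown";
        ZonedDateTime utc = eventTime.atZone(ZoneOffset.UTC);
        return "year=" + utc.getYear()
                + "/month=" + utc.getMonthValue()
                + "/day=" + utc.getDayOfMonth()
                + "/tenant=" + safeTenant;
    }
}
```

Partitioning by date before tenant keeps each day's data for all tenants under one prefix. Placing the tenant first instead would make per-tenant access policies on S3 prefixes simpler, so the order is a design choice that depends on how the data lake is queried and secured.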

The following code is the full sample class for the Kinesis Data Analytics for Apache Flink application. The purpose of the sample code is to illustrate how you can obtain the partition key from the data stream and use it as your bucket prefix via the BucketAssigner class. Your implementation might require additional windowing logic to enrich, aggregate, and transform your data before writing it into an S3 bucket. In this post, we write data into a tenantId partition, but your code might require additional partition fields (such as by date). For additional code examples, see Kinesis Data Analytics for Apache Flink: Examples.

package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class S3StreamingSinkWithPartitionsJob {

    private static final Logger log = LogManager.getLogger(S3StreamingSinkWithPartitionsJob.class);
    private static String s3SinkPath;
    private static String inputStreamName;
    private static String region;

    /**
     * Custom BucketAssigner to specify the bucket path/prefix with the Kinesis Stream partitionKey.
     *
     * Sample code. Running application with debug mode with this implementation will expose data into log files
     */
    private static final BucketAssigner<String, String> assigner = new BucketAssigner<String, String> () {

        @Override
        public String getBucketId(String element, BucketAssigner.Context context) {
            log.debug("getBucketId - enter");
            JSONObject json = new JSONObject(element);
            if (json.has("tenantid")) {
                String tenantId = json.getString("tenantid");
                return "tenantid=" + tenantId;
            }
            return "tenantid=unknown";
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    };


    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) throws IOException {
        log.debug("createSourceFromStaticConfig - enter - variables: {region:" + region +
                ", inputStreamName:" + inputStreamName + "}");
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        /*
         * Use a custom deserializer class that implements the KinesisDeserializationSchema interface
         * to get additional values from partition keys.
         */
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                new CustomKinesisDeserializer(),
                inputProperties
        ));
    }

    private static StreamingFileSink<String> createS3SinkFromStaticConfig() {
        log.debug("createS3SinkFromStaticConfig - enter - variables: { s3SinkPath:" + s3SinkPath + "}");
        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(assigner)
                .build();
        return sink;
    }

    public static void main(String[] args) throws Exception {

        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
        region = consumerProperties.getProperty("Region","us-west-2");
        inputStreamName = consumerProperties.getProperty("InputStreamName");
        s3SinkPath = "s3a://" + consumerProperties.getProperty("S3SinkPath") + "/data";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = createSourceFromStaticConfig(env);
        input.addSink(createS3SinkFromStaticConfig());
        env.execute("Flink S3 Streaming with Partitions Sink Job");
    }

}

/**
 * Custom deserializer to pass partitionKey from KDS into the record value. The partition key can be used
 * by the bucket assigner to leverage it as the s3 path/prefix/partition.
 *
 * Sample code. Running application with debug mode with this implementation will expose data into log files
 */

class CustomKinesisDeserializer implements  KinesisDeserializationSchema<String> {

    private static final Logger log = LogManager.getLogger(CustomKinesisDeserializer.class);

    @Override
    public String deserialize(byte[] bytes, String partitionKey, String seqNum,
                              long approxArrivalTimeStamp, String stream, String shardId) throws IOException {
        log.debug("deserialize - enter");
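        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String s = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);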
        JSONObject json = new JSONObject(s);
        json.put("tenantid", partitionKey);
        return json.toString();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

}

To test and build this multi-tenant stream ingestion pipeline, you can deploy an AWS CloudFormation template in your AWS environment. The following section provides step-by-step instructions on how to deploy and test the sample template.

Deploy a sample multi-tenant streaming ingestion pipeline

AWS CloudFormation simplifies provisioning and managing infrastructure and services on AWS via JSON or YAML templates. Follow these instructions to deploy and test the sample workflow described in this post. The instructions assume a basic understanding of AWS Cloud concepts, the AWS Management Console, and working with REST APIs.

  1. Create a destination S3 bucket.
  2. Deploy the CloudFormation template.

The template has only been tested in the us-west-2 Region, and creates IAM roles and users with limited access scope. This template doesn’t register CA certificates or implement the AWS IoT credentials provider feature for authentication. To test the pipeline, the template creates an IAM user for authentication with API Gateway. If you want to test the AWS IoT credentials provider feature with this implementation, follow the instructions in How to Eliminate the Need for Hardcoded AWS Credentials in Devices by Using the AWS IoT Credentials Provider.

  1. For Stack name, enter a name (for example, flinkapp).
  2. For KDAS3DestinationBucket, enter the name of the S3 bucket you created.
  3. Leave the other parameters as default.

  1. Accept all other options, including acknowledging the template will create IAM principals on your behalf.
  2. Wait until the stack shows the status CREATE_COMPLETE.

Now you can start your Kinesis Data Analytics for Apache Flink application.

  1. On the Kinesis Data Analytics console, choose Analytics applications.
  2. Select the application that starts with KinesisAnalyticsFI_*.
  3. Choose Run.

  1. Choose Run without snapshot.
  2. Wait for the application to show the status Running.

Now you can test sending messages to your API Gateway endpoint. Remember that requests must be authenticated. The CloudFormation template created an IAM test user for this purpose. We recommend using an API development tool for this step. For this post, we use Postman.

  1. On the AWS CloudFormation console, navigate to the Outputs tab of your stack.
  2. Note the API Gateway endpoint (InvokeURL) and the name of the IAM test user.

  1. Create and retrieve the access key and secret key of your test user. For instructions, see Programmatic access.

AWS recommends using temporary keys when authenticating requests to AWS services. For testing purposes, we use a long-lived access key from this limited scope test user.

  1. Use your API development tool to build a POST request to your API Gateway endpoint using your IAM test user secrets.

The following screenshot shows the Authorization tab of the request using Postman.

The following screenshot shows the Body tab of the request using Postman.

  1. For the body of the request, you can use the following payload:
{
    "Data": {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3"
    }
}

You should get a response from the data stream that looks as follows:

{
 "EncryptionType": "KMS",
 "SequenceNumber": "49619151594519161991565402527623825830782609999622307842",
 "ShardId": "shardId-000000000000"
}

  1. Try to make a request to a different tenant by changing the path from /prod/T001 to /prod/T002.

Because the user isn’t authorized to send data to this endpoint, you get the following error message:

{
    "Message": "User: arn:aws:iam::*******4205:user/flinkapp-MultiTenantStreamTestUser-EWUSMWR0T5I5 is not authorized to perform: execute-api:Invoke on resource: arn:aws:execute-api:us-west-2:********4205:fktw02penb/prod/POST/T002"
}

  1. Browse to your destination S3 bucket.

You should be able to see a new file within your T001 tenant’s folder or partition.

  1. Download and open your file (part-*-*).

The content should look like the following data (in this scenario, we made six requests to the tenant’s API Gateway endpoint):

{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}

Clean up

After you finalize your testing, delete the CloudFormation stack and any data written into your destination S3 bucket to prevent incurring unnecessary charges.

Conclusion

Sharing resources in multi-tenant architectures allows organizations to optimize for costs while providing controls for proper tenant isolation and security. In this post, we showed you how to use API Gateway as a proxy to authorize tenants to a specific partition in your shared Kinesis data stream and prevent cross-tenant data access when performing data ingestion from semi-trusted servers. We also showed you how buffering data and sharing a single data stream with multiple tenants reduces operational overhead and optimizes for costs by taking advantage of better resource utilization. Check out the Kinesis Data Streams and Kinesis Data Analytics quick starts to evaluate them for your multi-tenant ingestion use case.


About the Authors

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. In his free time, he likes to cook and travel.

 

Pablo Redondo Sanchez is a Senior Solutions Architect at Amazon Web Services. He is a data enthusiast and works with customers to help them achieve better insights and faster outcomes from their data analytics workflows. In his spare time, Pablo enjoys woodworking and spending time outdoors with his family in Northern California.

Understanding data streaming concepts for serverless applications

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/understanding-data-streaming-concepts-for-serverless-applications/

Amazon Kinesis is a suite of managed services that can help you collect, process, and analyze streaming data in near-real time. It consists of four separate services that are designed for common tasks with streaming data. This blog post focuses on Kinesis Data Streams.

One of the main benefits of processing streaming data is that an application can react as new data is generated, instead of waiting for batches. This real-time capability enables new functionality for applications. For example, payment processors can analyze payments in real time to detect fraudulent transactions. Ecommerce websites can use streams of clickstream activity to determine site engagement metrics in near-real time.

Kinesis can be used with Amazon EC2-based and container-based workloads. However, its integration with AWS Lambda can make it a useful data source for serverless applications. Using Lambda as a stream consumer can also help minimize the amount of operational overhead for managing streaming applications.

In this post, I explain important streaming concepts and how they affect the design of serverless applications. This post references the Alleycat racing application. Alleycat is a home fitness system that allows users to compete in an intense series of 5-minute virtual bicycle races. Up to 1,000 racers at a time take the saddle and push the limits of cadence and resistance to set personal records and rank on leaderboards. The Alleycat software connects the stationary exercise bike with a backend application that processes the data from thousands of remote devices.

The Alleycat frontend allows users to configure their races and view real-time leaderboard and historical rankings. The frontend could wait until the end of each race and collect the total output from each racer. Once the batch is ready, it could rank the results and provide a leaderboard once the race is completed. However, this is not very engaging for competitors. By using streaming data instead of a batch, the application can show the racers a view of who is winning during the race. This makes the virtual environment more like a real-life cycling race.

Producers and consumers

In streaming data workloads, producers are the applications that produce data and consumers are those that process it. In a serverless streaming application, a consumer is usually a Lambda function, Amazon Kinesis Data Firehose, or Amazon Kinesis Data Analytics.

Kinesis producers and consumers

There are a number of ways to put data into a Kinesis stream in serverless applications, including direct service integrations, client libraries, and the AWS SDK.

| Producer | Kinesis Data Streams | Kinesis Data Firehose |
|---|---|---|
| Amazon CloudWatch Logs | Yes, using subscription filters | Yes, using subscription filters |
| AWS IoT Core | Yes, using IoT rule actions | Yes, using IoT rule actions |
| AWS Database Migration Service | Yes, set stream as target | Not directly |
| Amazon API Gateway | Yes, via REST API direct service integration | Yes, via REST API direct service integration |
| AWS Amplify | Yes, via JavaScript library | Not directly |
| AWS SDK | Yes | Yes |

A single stream may have tens of thousands of producers, which could be web or mobile applications or IoT devices. The Alleycat application uses the AWS IoT SDK for JavaScript to publish messages to an IoT topic. An IoT rule action then uses a direct integration with Kinesis Data Streams to push the data to the stream. This configuration is ideal at the device level, especially since the device may already use AWS IoT Core to receive messages.

The Alleycat simulator uses the AWS SDK to send a large number of messages to the stream. The SDK provides two methods: PutRecord and PutRecords. The first allows you to send a single record, while the second supports up to 500 records per request (or up to 5 MB in total). The simulator uses the putRecords JavaScript API to batch messages to the stream.
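
As a rough illustration of this kind of batching, the following Java sketch groups payloads into batches that stay within the 500-record and 5 MB request limits. It is not the simulator's code: the class and method names are hypothetical, the 5 MB limit is assumed to mean 5 × 1024 × 1024 bytes, and only payload bytes are counted. Partition keys also count toward the request size, so a real producer would need to budget for them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits payloads into PutRecords-sized batches.
final class PutRecordsBatcher {
    static final int MAX_RECORDS = 500;             // records per request
    static final int MAX_BYTES = 5 * 1024 * 1024;   // assumed byte budget per request

    static List<List<byte[]>> batch(List<byte[]> payloads) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] payload : payloads) {
            boolean full = current.size() == MAX_RECORDS
                    || currentBytes + payload.length > MAX_BYTES;
            // Start a new batch when adding this payload would break a limit.
            if (!current.isEmpty() && full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(payload);
            currentBytes += payload.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each inner list would then be sent in one PutRecords call. The sketch does not enforce the separate per-record size limit.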

A producer can put records directly on a stream, for example via the AWS SDK, or indirectly via other services such as Amazon API Gateway or AWS IoT Core. If direct, the producer must have appropriate permission to write data to the stream. If indirect, the producer must have permission to invoke the proxy service, and then the service must have permission to put data onto the stream.

While there may be many producers, there are comparatively fewer consumers. You can register up to 20 consumers per data stream, which share the outgoing throughput limit per shard. Consumers receive batches of records sequentially, which means processing latency increases as you add more consumers to a stream. For latency-sensitive applications, Kinesis offers enhanced fan-out, which gives each consumer 2 MB per second of dedicated throughput per shard and uses a push model to reduce latency.

Shards, streams, and partition keys

A shard is a sequence of data records in a stream with a fixed capacity. Part of Kinesis billing is based upon the number of shards. A single shard can ingest up to 1 MB or 1,000 records of incoming data per second. One shard can also send up to 2 MB per second of outgoing data to downstream consumers. These are hard limits on the throughput of a shard, so as your application approaches them, you must add more shards to avoid being throttled.
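
To make the ingest limits concrete, here is a minimal Java sketch that estimates how many shards a workload needs from its record rate and byte rate. The class name and the example figures are hypothetical, and 1 MB is assumed to mean 1024 × 1024 bytes. The estimate covers only incoming data, not the 2 MB per second outgoing limit.

```java
// Hypothetical helper: estimates shard count from ingest rates only.
final class ShardEstimator {
    static final long MAX_RECORDS_PER_SECOND = 1_000L;
    static final long MAX_BYTES_PER_SECOND = 1024L * 1024L; // assumed 1 MB

    static long shardsNeeded(long recordsPerSecond, long bytesPerSecond) {
        if (recordsPerSecond < 0 || bytesPerSecond < 0) {
            throw new IllegalArgumentException("rates must not be negative");
        }
        long byCount = ceilDiv(recordsPerSecond, MAX_RECORDS_PER_SECOND);
        long byBytes = ceilDiv(bytesPerSecond, MAX_BYTES_PER_SECOND);
        // Whichever limit is hit first determines the shard count.
        return Math.max(1L, Math.max(byCount, byBytes));
    }

    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }
}
```

For example, `ShardEstimator.shardsNeeded(1000, 200_000)` fits in a single shard with no headroom on the record count, while one more record per second pushes the estimate to two shards.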

A stream is a collection of these shards and is often a grouping at the workload or project level. Adding a second shard to a single-shard stream effectively doubles the throughput, though it also doubles the shard cost. When there is only one shard in a stream, all records sent to that stream are routed to the same shard. With multiple shards, the routing of incoming messages to shards is determined by a partition key.

The data producer adds the partition key before sending the record to Kinesis. The service calculates an MD5 hash of the key, which maps to one of the shards in the stream. Each shard is assigned a range of non-overlapping hash values, so each partition key maps to only one shard.

MD5 hash function
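
The mapping from partition key to shard can be sketched in Java as follows. This is an illustration under stated assumptions, not the service's implementation: it treats the MD5 digest as an unsigned 128-bit integer and assumes the stream's shards split the hash space into equal, contiguous ranges. That may not hold after resharding. The class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: maps a partition key to a shard index.
final class ShardRouter {

    // MD5 digest of the key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Assumes shardCount equal-sized hash ranges: index = floor(hash * n / 2^128).
    static int shardIndex(String partitionKey, int shardCount) {
        if (shardCount < 1) {
            throw new IllegalArgumentException("shardCount must be at least 1");
        }
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .shiftRight(128)
                .intValueExact();
    }
}
```

Because the hash depends only on the key, the same partition key always lands on the same shard for a given shard layout.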

The partition key exists as an alternative to specifying a shard ID directly, since it’s common in production applications to add and remove shards depending upon traffic. How you use the partition key determines the shard-mapping behavior. For example:

  • Same value: If you specify the same string as the partition key, every message is routed to a single shard, regardless of the number of shards in the stream. This is called overheating a shard.
  • Random value: Using a pseudo-random value, such as a UUID, evenly distributes messages between all the shards available.
  • Time-based: Using a timestamp as a partition key may result in a preference for a single shard if multiple messages arrive at the same time.
  • Application-specific: The Alleycat application uses the raceId as a partition key to ensure that all messages from a single race are processed by the same shard consumer.

A Lambda function is a consumer application for a data stream and processes one batch of records for each shard. Since Alleycat uses a tumbling window to calculate aggregates between batches, this use of the partition key ensures that all messages for each raceId are processed by the same function. The downside to this architecture is that it is limited to 1,000 incoming messages per second with the same raceId since it is bound to a single shard.

Deciding on a partition key strategy depends upon the specific needs of your workload. In most cases, a random value partition key is the best approach.

Streaming payloads in serverless applications

When using the SDK to put messages to a stream, the Data attribute can be a buffer, typed array, blob, or string. Combined with the partition key value, the maximum record size is 1 MB. The Data value is base64 encoded when serialized by Kinesis and delivered as an encoded value to downstream consumers. When using a Lambda function consumer, Kinesis delivers batches in a Records array. Each record contains the encoded data attribute, partition key, and additional metadata in a JSON envelope:

JSON transformation from producer to consumer
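
A consumer has to reverse this encoding before it can read the payload. The following Java sketch shows the decoding step using only the standard library. It assumes the producer sent UTF-8 text (such as JSON), and the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: converts between payload text and the base64 form.
final class RecordDataDecoder {

    // Decodes the base64 data attribute of a record back into text.
    static String decode(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Encodes text the same way, which is useful for building test events.
    static String encode(String payload) {
        return Base64.getEncoder()
                .encodeToString(payload.getBytes(StandardCharsets.UTF_8));
    }
}
```

If the payload is JSON, the decoded string can then be passed to whichever JSON parser the function already uses.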

Ordering and idempotency

Records in a Kinesis stream are delivered to consuming applications in the same order that they arrive at the Kinesis service. The service assigns a sequence number to the record when it is received and this is delivered as part of the payload to a Kinesis consumer:

Sequence number in payload

When using Lambda as a consuming application for Kinesis, by default each shard has a single instance of the function processing records. In this case, ordering is guaranteed as Kinesis invokes the function serially, one batch of records at a time.

Parallelization factor of 1

You can increase the number of concurrent function invocations by setting the ParallelizationFactor on the event source mapping. This allows you to set a concurrency of between 1 and 10, which provides a way to increase Lambda throughput if the IteratorAge metric is increasing. However, one side effect is that ordering per shard is no longer guaranteed, since the shard’s messages are split into multiple subgroups based upon an internal hash.

Parallelization factor is 2

Kinesis guarantees that every record is delivered “at least once”, but occasionally messages are delivered more than once. This is caused by producers that retry messages, network-related timeouts, and consumer retries, which can occur when worker processes restart. These are all normal activities, and you should design your application to handle infrequent duplicate records.

To prevent duplicate messages from causing unintentional side effects, such as charging a payment twice, it’s important to design your application with idempotency in mind. By using transaction IDs appropriately, your code can determine if a given message has been processed previously, and ignore any duplicates. In the Alleycat application, the aggregation and processing of messages is idempotent. If two identical messages are received, processing completes with the same end result.
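
The transaction ID approach can be sketched in Java as follows. This is a simplified, in-memory illustration with hypothetical names, not code from the Alleycat application. A real consumer would need to record processed IDs in a durable store shared by all function instances, since memory is not shared between them.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical processor that ignores messages it has already handled.
final class IdempotentProcessor {
    private final Set<String> processedIds = new HashSet<>();
    private long total;

    // Returns true if the message was applied, false if it was a duplicate.
    boolean process(String transactionId, long amount) {
        // Set.add returns false when the ID is already present.
        if (!processedIds.add(transactionId)) {
            return false;
        }
        total += amount;
        return true;
    }

    long total() {
        return total;
    }
}
```

Delivering the same transaction ID twice leaves the total unchanged after the first delivery, which is the property that makes retries safe.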

To learn more about implementing idempotency in serverless applications, read the “Serverless Application Lens: AWS Well-Architected Framework”.

Conclusion

In this post, I introduce some of the core streaming concepts for serverless applications. I explain some of the benefits of streaming architectures and how Kinesis works with producers and consumers. I compare different ways to ingest data, how streams are composed of shards, and how partition keys determine which shard is used. Finally, I explain the payload formats at the different stages of a streaming workload, how message ordering works with shards, and why idempotency is important to handle.

To learn more about building serverless web applications, visit Serverless Land.

Monitoring and troubleshooting serverless data analytics applications

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/monitoring-and-troubleshooting-serverless-data-analytics-applications/

This series is about building serverless solutions in streaming data workloads. The application example used in this series is Alleycat, which allows bike racers to compete with each other virtually on home exercise bikes.

The first four posts have explored the architecture behind the application, which is enabled by Amazon Kinesis, Amazon DynamoDB, and AWS Lambda. This post explains how to monitor and troubleshoot issues that are common in streaming applications.

To set up the example, visit the GitHub repo and follow the instructions in the README.md file. Note that this walkthrough uses services that are not covered by the AWS Free Tier and incur cost.

Monitoring the Alleycat application

The business requirements for Alleycat state that it must handle up to 1,000 simultaneous racers. With each racer emitting a message every second, each 5-minute race results in 300,000 messages.

Reference architecture

While the architecture can support this throughput, the settings for each service determine how the workload scales up. The deployment templates in the GitHub repo do not use sufficiently high settings to handle this amount of data. In this section, I show how this results in errors and what steps you can take to resolve the issues. To start, I run the simulator for several races with the maximum racers configuration set to 1,000.

Monitoring the Kinesis stream

The monitoring tab of the Kinesis stream provides visualizations of stream metrics. This immediately shows that there is a problem in the application when running at full capacity:

Monitoring the Kinesis stream

  1. The iterator age is growing, indicating that the data consumers are falling behind the data producers. The Get records graph also shows the number of records in the stream growing.
  2. The Incoming data (count) metric shows the number of separate records ingested by the stream. The red line indicates the maximum capacity of this single-shard stream. With 1,000 active racers, this is almost at full capacity.
  3. However, the Incoming data – sum (bytes) graph shows that the total amount of data ingested by the stream is currently well under the maximum level shown by the red line.

There are two solutions for improving the capacity on the stream. First, the data producer application (the Alleycat frontend) could combine messages before sending. It’s currently reaching the total number of messages per second but the total byte capacity is significantly below the maximum. This action improves message packing but increases latency since the frontend waits to group messages.

Alternatively, you can add capacity by resharding. This enables you to increase (or decrease) the number of shards in a stream to adapt to the rate of data flowing through the application. You can do this with the UpdateShardCount API action. The existing stream goes into an Updating status and the stream scales by splitting shards. This creates two new child shards that split the partition keyspace of the parent. It also results in another, separate Lambda consumer for the new shard.
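
To illustrate what splitting the partition keyspace means, the following Java sketch divides a parent shard's hash key range into two child ranges at the midpoint. The class is hypothetical and the even split is an assumption made for illustration. How the service chooses split points during UpdateShardCount is not described here.

```java
import java.math.BigInteger;

// Hypothetical model of a shard's inclusive hash key range.
final class HashKeyRange {
    final BigInteger start;
    final BigInteger end;

    HashKeyRange(BigInteger start, BigInteger end) {
        if (start.signum() < 0 || start.compareTo(end) > 0) {
            throw new IllegalArgumentException("invalid range");
        }
        this.start = start;
        this.end = end;
    }

    // The full 128-bit hash space covered by a single-shard stream.
    static HashKeyRange full() {
        return new HashKeyRange(BigInteger.ZERO,
                BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE));
    }

    // Splits the range into two non-overlapping halves (assumed even split).
    HashKeyRange[] splitEvenly() {
        if (start.equals(end)) {
            throw new IllegalStateException("range is too small to split");
        }
        BigInteger mid = start.add(end).shiftRight(1);
        return new HashKeyRange[] {
            new HashKeyRange(start, mid),
            new HashKeyRange(mid.add(BigInteger.ONE), end)
        };
    }
}
```

Every hash value in the parent range falls into exactly one child, so each partition key still maps to a single shard after the split.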

Monitoring the Lambda function

The monitoring tab of the consuming Lambda function provides visualization of metrics that can highlight problems in the workload. At full capacity, the monitoring highlights issues to resolve:

Monitoring the Lambda function

  1. The Duration chart shows that the function is exceeding its 15-second timeout, when the function normally finishes in under a second. This typically indicates that there are too many records to process in a single batch or throttling is occurring downstream.
  2. The Error count metric is growing, which highlights either logical errors in the code or errors from API calls to downstream resources.
  3. The IteratorAge metric appears for Lambda functions that are consuming from streams. In this case, the growing metric confirms that data consumption is falling behind data production in the stream.
  4. Concurrent executions remain at 1 throughout. This is set by the parallelization factor in the event source mapping and can be increased up to 10.

Monitoring the DynamoDB table

The metric tab on the application’s table in the DynamoDB console provides visualizations for the performance of the service:

Monitoring the DynamoDB table

  1. The consumed Read usage is well within the provisioned maximum and there is no read throttling on the table.
  2. Consumed Write usage, shown in blue, is frequently bursting through the provisioned capacity.
  3. The number of Write throttled requests confirms that the DynamoDB service is throttling requests since the table is over capacity.

You can resolve this issue by increasing the provisioned throughput on the table and related global secondary indexes. Each write capacity unit (WCU) provides one write per second for an item up to 1 KB in size. You can set this value manually, use automatic scaling to match varying throughput, or enable on-demand mode. Read more about the pricing models for each to determine the best approach for your workload.
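
As a back-of-the-envelope aid for sizing provisioned capacity, this Java sketch estimates the write capacity units a workload consumes. It assumes standard (non-transactional) writes, where each write uses one WCU per 1 KB of item size, rounded up, and it takes 1 KB to mean 1,024 bytes. The class name is hypothetical.

```java
// Hypothetical helper: estimates WCUs for standard writes.
final class WriteCapacityEstimator {

    static long requiredWcus(long writesPerSecond, long itemSizeBytes) {
        if (writesPerSecond < 0 || itemSizeBytes <= 0) {
            throw new IllegalArgumentException("invalid write rate or item size");
        }
        // Item size is rounded up to the next 1 KB boundary.
        long unitsPerWrite = (itemSizeBytes + 1023) / 1024;
        return writesPerSecond * unitsPerWrite;
    }
}
```

Global secondary indexes have their own write capacity, so the same estimate would apply separately to each index that the written items touch.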

Monitoring Kinesis Data Streams

Kinesis Data Streams ingests data into shards, which are fixed capacity sequences of records, up to 1,000 records or 1 MB per second. There is no limit to the amount of data held within a stream but there is a configurable retention period. By default, Kinesis stores records for 24 hours but you can increase this up to 365 days as needed.

Kinesis is integrated with Amazon CloudWatch. Basic metrics are published every minute, and you can optionally enable enhanced metrics for an additional charge. In this section, I review the most commonly used metrics for monitoring the health of streams in your application.

Metrics for monitoring data producers

When data producers are throttled, they cannot put new records onto a Kinesis stream. Use the WriteProvisionedThroughputExceeded metric to detect if producers are throttled. If this is more than zero, producers are being throttled and some records cannot be put to the stream. Monitoring the Average statistic for this metric can help you determine if your producers are healthy.

When producers succeed in sending data to a stream, the PutRecord.Success and PutRecords.Success metrics are incremented. Monitoring for spikes or drops in these metrics can help you monitor the health of producers and catch problems early. There is a separate metric for each of the two API calls, so watch the Average statistic for whichever call your application uses.

Metrics for monitoring data consumers

When data consumers are throttled or start to generate errors, Kinesis continues to accept new records from producers. However, there is growing latency between when records are written and when they are consumed for processing.

Using the GetRecords.IteratorAgeMilliseconds metric, you can measure the difference between the age of the last record consumed and the latest record put to the stream. It is important to monitor the iterator age. If the age is high in relation to the stream’s retention period, you can lose data as records expire from the stream. This value should generally not exceed 50% of the stream’s retention period – when the value reaches 100% of the stream retention period, data is lost.
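
The 50% and 100% thresholds can be expressed as a simple check. The following Java sketch classifies an iterator age against a stream's retention period. The class, enum, and status names are hypothetical, and in practice this logic would more likely live in an alarm on the metric than in application code.

```java
// Hypothetical helper: compares iterator age with the retention period.
final class IteratorAgeCheck {

    enum Status { OK, WARNING, DATA_LOSS }

    static Status evaluate(long iteratorAgeMillis, long retentionHours) {
        if (iteratorAgeMillis < 0 || retentionHours <= 0) {
            throw new IllegalArgumentException("invalid age or retention");
        }
        long retentionMillis = retentionHours * 60L * 60L * 1000L;
        if (iteratorAgeMillis >= retentionMillis) {
            return Status.DATA_LOSS;   // records expire before they are read
        }
        if (iteratorAgeMillis * 2 > retentionMillis) {
            return Status.WARNING;     // age exceeds 50% of retention
        }
        return Status.OK;
    }
}
```

With the default 24-hour retention, an iterator age of more than 12 hours returns WARNING and an age of 24 hours or more returns DATA_LOSS.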

If the iterator age is growing, one temporary solution is to increase the retention time of the stream. This gives you more time to resolve the issue before losing data. A more permanent solution is to add more consumers to keep up with data production, or resolve any errors that are slowing consumers.

When consumers exceed the provisioned read throughput of a shard, they are throttled and cannot read from the stream, which the ReadProvisionedThroughputExceeded metric records. This results in a growth of records in the stream waiting for processing. Monitor the Average statistic for this metric and aim for values as close to 0 as possible.

The GetRecords.Success metric is the consumer-side equivalent of PutRecords.Success. Monitor this value for spikes or drops to ensure that your consumers are healthy. The Average is usually the most useful statistic for this purpose.

Increasing data processing throughput for Kinesis Data Streams

Adjusting the parallelization factor

Kinesis invokes Lambda consumers every second with a configurable batch size of messages. It’s important that the processing in the function keeps pace with the rate of traffic to avoid a growing iterator age. For compute intensive functions, you can increase the memory allocated in the function, which also increases the amount of virtual CPU available. This can help reduce the duration of a processing function.

If this is not possible or the function is falling behind data production in the stream, consider increasing the parallelization factor. By default, this is set to 1, meaning that each shard has a single instance of a Lambda function it invokes. You can increase this up to 10, which results in multiple instances of the consumer function processing additional batches of messages.

Adjusting the parallelization factor

Using enhanced fan-out to reduce iterator age

Standard consumers use a pull model over HTTP to fetch batches of records. Each consumer operates in serial. A stream with five consumers averages 200 ms of latency each, meaning it takes up to 1 second for all five to receive batches of records.

You can improve the overall latency by removing any unnecessary data consumers. If you use Kinesis Data Firehose and Kinesis Data Analytics on a stream, these count as consumers too. If you can remove subscribers, this helps with overall data consumption throughput.

If the workload needs all of the existing subscribers, use enhanced fan-out (EFO). EFO consumers use a push model over HTTP/2 and are independent of each other. With EFO, the same five consumers in the previous example would receive batches of messages in parallel, using dedicated throughput. Overall latency averages 70 ms and typically data delivery speed is improved by up to 65%. There is an additional charge for this feature.

Enhanced fan-out

To learn more about processing streaming data with Lambda, see this AWS Online Tech Talk presentation.

Conclusion

In this post, I show how the existing settings in the Alleycat application are not sufficient for handling the expected amount of traffic. I walk through the metrics visualizations for Kinesis Data Streams, Lambda, and DynamoDB to find which quotas should be increased.

I explain which CloudWatch metrics can be used with Kinesis Data Streams to ensure that data producers and data consumers are healthy. Finally, I show how you can use the parallelization factor and enhanced fan-out features to increase the throughput of data consumers.

For more serverless learning resources, visit Serverless Land.

Building leaderboard functionality with serverless data analytics

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/building-serverless-applications-with-streaming-data-part-4/

This series is about building serverless solutions in streaming data workloads. The application example used in this series is Alleycat, which allows bike racers to compete with each other virtually on home exercise bikes.

Part 1 explains the application’s functionality, how to deploy to your AWS account, and provides an architectural review. Part 2 compares different ways to ingest streaming data into Amazon Kinesis Data Streams and shows how to optimize shard capacity. Part 3 uses Amazon Kinesis Data Firehose with AWS Lambda to implement the all-time rankings functionality.

This post walks through the leaderboard functionality in the application. This solution uses Kinesis Data Streams, Lambda, and Amazon DynamoDB to provide both real-time and historical rankings.

To set up the example, visit the GitHub repo and follow the instructions in the README.md file. Note that this walkthrough uses services that are not covered by the AWS Free Tier and incur cost.

Overview of leaderboards in Alleycat

Alleycat races are 5 minutes long and run continuously throughout the day for each class. Competitors select a class type and automatically join the current race by choosing Start Race. There are up to 1,000 competitors per race. To track performance, there are two types of leaderboard in Alleycat: Race results for completed races and the Realtime rankings for active races.

Alleycat frontend

  1. The user selects a completed race from the dropdown to view a leaderboard ranking of results. This table is not real time and only reflects historical results.
  2. The user selects the Here now option to see live results for all competitors in the current virtual race for the chosen class.

Architecture overview

The backend microservice supporting these features is located in the 1-streaming-kds directory in the GitHub repo. It uses the following architecture:

Solution architecture

  1. The tumbling window function receives results every second from Kinesis Data Streams. This function aggregates metrics and saves the latest results to the application’s DynamoDB table.
  2. DynamoDB Streams invoke the publishing Lambda function every time an item is changed in the table. This reformats the message and publishes to the application’s IoT topic in AWS IoT Core to update the frontend.
  3. The final results function is an additional consumer on Kinesis Data Streams. It filters for only the last result in each competitor’s race and publishes the results to the DynamoDB table.
  4. The frontend calls an Amazon API Gateway endpoint to fetch the historical results for completed races via a Lambda function.

Configuring the tumbling window function

This Lambda function aggregates data from the Kinesis stream and stores the result in the DynamoDB table:

Tumbling window architecture

This function receives batches of 1,000 messages per invocation. The messages may contain results for multiple races and competitors. The function groups the results by race and racer ID and then flattens the data structure. It writes an item to the DynamoDB table in this format:

{
 "PK": "race-5403307",
 "SK": "results",
 "ts": 1620992324960,
 "results": "{\"0\":167.04,\"1\":136,\"2\":109.52,\"3\":167.14,\"4\":129.69,\"5\":164.97,\"6\":149.86,\"7\":123.6,\"8\":154.29,\"9\":89.1,\"10\":137.41,\"11\":124.8,\"12\":131.89,\"13\":117.18,\"14\":143.52,\"15\":95.04,\"16\":109.34,\"17\":157.38,\"18\":81.62,\"19\":165.76,\"20\":181.78,\"21\":140.65,\"22\":112.35,\"23\":112.1,\"24\":148.4,\"25\":141.75,\"26\":173.24,\"27\":131.72,\"28\":133.77,\"29\":118.44}",
 "GSI": 2
}

This transformation significantly reduces the number of writes to the DynamoDB table per function invocation. Each time a write occurs, this triggers the publishing process and notifies the frontend via the IoT topic.
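As a rough illustration of the grouping and flattening step, the following sketch collapses a batch of records into one item per race. The record shape ({ raceId, racerId, output }) and the rule that the latest value for a racer wins are assumptions for this example; the function in the repo may aggregate differently, and the GSI attribute is omitted here.

```javascript
// Hypothetical record shape: { raceId, racerId, output }.
// Later records for the same racer overwrite earlier ones,
// so each racer keeps only the latest value seen in the batch.
const groupByRace = (records) => {
  const races = {}
  for (const { raceId, racerId, output } of records) {
    races[raceId] = races[raceId] || {}
    races[raceId][racerId] = output
  }
  return races
}

// Build one DynamoDB item per race, with results flattened to a JSON string
const toItems = (races, now = Date.now()) =>
  Object.entries(races).map(([raceId, results]) => ({
    PK: `race-${raceId}`,
    SK: 'results',
    ts: now,
    results: JSON.stringify(results)
  }))

const records = [
  { raceId: 5403307, racerId: 0, output: 150.1 },
  { raceId: 5403307, racerId: 1, output: 136 },
  { raceId: 5403307, racerId: 0, output: 167.04 }
]
const items = toItems(groupByRace(records), 1620992324960)
console.log(items)
```

Three incoming records become a single item to write, which is where the reduction in writes comes from.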

DynamoDB can handle almost any number of writes to the table. You can set up to 40,000 write capacity units with default limits and can request even higher limits via an AWS Support ticket. However, you may want to limit the throughput to reduce the WCU cost.

The tumbling window feature of Lambda allows invocations from a streaming source to pass state between invocations. You can specify a window interval of up to 15 minutes. During the window, a state is passed from one invocation to the next, until a final invocation at the end of the window. Alleycat uses this feature to buffer aggregated results and only write the output at the end of the tumbling window:

Tumbling window process

For a tumbling window period of 5 seconds, this means that the Lambda function is invoked multiple times, passing an intermediate state from invocation to invocation. Once the window ends, it then writes the final aggregated result to DynamoDB. The tradeoff in this solution is that it reduces the number of real-time notifications to the frontend since these are published from the table’s stream. This increases the latency of live results in the frontend application.

Buffer by the Lambda function
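To make the state passing concrete, here is a minimal local simulation. It is a sketch only: simulateWindow is a hypothetical stand-in for the Lambda service, the handler simply counts records per race, and the writes array replaces the DynamoDB call.

```javascript
// Stand-in for the database write, so the sketch runs locally
const writes = []

// Simplified handler: count records per race in the window state
const handler = (event) => {
  const state = event.state || {}
  for (const record of event.Records) {
    state[record.raceId] = (state[record.raceId] || 0) + 1
  }
  if (event.isFinalInvokeForWindow) {
    writes.push(state) // one write per window
    return
  }
  return { state } // handed to the next invocation in the window
}

// Hypothetical driver that chains invocations within a single window
const simulateWindow = (batches) => {
  let state = {}
  batches.forEach((Records, i) => {
    const isFinalInvokeForWindow = i === batches.length - 1
    const result = handler({ Records, state, isFinalInvokeForWindow })
    if (result) state = result.state
  })
}

simulateWindow([
  [{ raceId: 'a' }, { raceId: 'b' }],
  [{ raceId: 'a' }],
  [{ raceId: 'b' }, { raceId: 'b' }]
])
console.log(writes)
```

Three invocations produce one write containing the totals for the whole window.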

Implementing tumbling windows in Lambda functions

The template.yaml file describes the tumbling Lambda function. The event definition specifies the tumbling window duration in the TumblingWindowInSeconds attribute:

  TumblingWindowFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: tumblingFunction/    
      Handler: app.handler
      Runtime: nodejs14.x
      Timeout: 15
      MemorySize: 256
      Environment:
        Variables:
          DDB_TABLE: !Ref DynamoDBtableName
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref DynamoDBtableName
      Events:
        Stream:
          Type: Kinesis
          Properties:
            Stream: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStreamName}"
            BatchSize: 1000
            StartingPosition: TRIM_HORIZON      
            TumblingWindowInSeconds: 15  

When you enable tumbling windows, the function’s event payload contains several new attributes:

{
    "Records": [
        {
 	    ...
        }
    ],
    "shardId": "shardId-000000000000",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/alleycat",
    "window": {
        "start": "2021-05-05T18:51:00Z",
        "end": "2021-05-05T18:51:15Z"
    },
    "state": {},
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}

These include:

  • Window start and end: The beginning and ending timestamps for the current tumbling window.
  • State: An object containing the state returned from the previous invocation, which is initially empty in a new window. The state object can contain up to 1 MB of data.
  • isFinalInvokeForWindow: Indicates if this is the last invocation for the current window. This only occurs once per window period.
  • isWindowTerminatedEarly: A window ends early if the state exceeds the maximum allowed size of 1 MB.

The event handler in app.js uses tumbling windows if they are defined in the AWS SAM template:

// Main Lambda handler
exports.handler = async (event) => {

	// Retrieve existing state passed during tumbling window	
	let state = event.state || {}
	
	// Process the results from event
	let jsonRecords = getRecordsFromPayload(event)
	jsonRecords.map((record) => raceMap[record.raceId] = record.classId)
	state = getResultsByRaceId(state, jsonRecords)

	// If tumbling window is not configured, save and exit
	if (event.window === undefined) {
		return await saveCurrentRaces(state) 
	}

	// If tumbling window is configured, save to DynamoDB on the 
	// final invocation in the window
	if (event.isFinalInvokeForWindow) {
		await saveCurrentRaces(state) 
	} else {
		return { state }
	}
}

The final results function and API

The final results function filters for the last event in each competitor’s race, which contains the final score. The function writes each score to the DynamoDB table. Since there may be many write events per invocation, this function uses the Promise.all construct in Node.js to complete the database operations in parallel:

const saveFinalScores = async (raceResults) => {
	// Build one put request per race result
	const paramsArr = raceResults.map((result) => ({
		TableName: process.env.DDB_TABLE,
		Item: {
			PK: `race-${result.raceId}`,
			SK: `racer-${result.racerId}`,
			GSI: result.output,
			ts: Date.now()
		}
	}))

	// Save to DDB in parallel
	await Promise.all(paramsArr.map((params) => documentClient.put(params).promise()))
}

Using this approach, each call to documentClient.put is made without waiting for a response from the SDK. The call returns a Promise object, which is in a pending state until the database operation returns with a status. Promise.all waits for all promises to resolve, or for the first one to reject, before code execution continues. Compared with issuing the writes serially, this reduces the overall time for multiple database writes. The tradeoff is that the writes reach DynamoDB in a burst, which increases the peak number of WCUs consumed per second.

Serial vs concurrent writes

For a large number of put operations to DynamoDB, you can also use the DocumentClient’s batchWrite operation. This delegates to the underlying DynamoDB BatchWriteItem operation in the AWS SDK and can accept up to 25 separate put requests in a single call. To handle more than 25, you can make multiple batchWrite requests and still use the parallelized invocation method shown above.
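A minimal sketch of the chunking step follows. The helper names and the table name are hypothetical; the RequestItems and PutRequest structure is the one batchWrite expects. Note that batchWrite can return UnprocessedItems, which a production function would need to retry.

```javascript
// Split an array into chunks of at most `size` elements
const chunk = (arr, size) => {
  const chunks = []
  for (let i = 0; i < arr.length; i += size) {
    chunks.push(arr.slice(i, i + size))
  }
  return chunks
}

// Build one batchWrite params object per chunk of up to 25 items
const buildBatchWriteParams = (tableName, items) =>
  chunk(items, 25).map((batch) => ({
    RequestItems: {
      [tableName]: batch.map((Item) => ({ PutRequest: { Item } }))
    }
  }))

// 60 hypothetical items produce batches of 25, 25, and 10
const items = Array.from({ length: 60 }, (_, i) => ({
  PK: 'race-1',
  SK: `racer-${i}`,
  GSI: i
}))
const paramsArr = buildBatchWriteParams('example-table', items)
const batchSizes = paramsArr.map((p) => p.RequestItems['example-table'].length)
console.log(batchSizes)

// With a DocumentClient instance, the batches could then run in parallel:
// await Promise.all(paramsArr.map((p) => documentClient.batchWrite(p).promise()))
```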

The DynamoDB table maintains a list of race results with one item per racer per race:

DynamoDB table items

The frontend calls an API Gateway endpoint that invokes the getLeaderboard function. This function uses the DocumentClient’s query API to return results for a selected race, sorted by a global secondary index containing the final score:

const AWS = require('aws-sdk')
AWS.config.region = process.env.AWS_REGION 
const documentClient = new AWS.DynamoDB.DocumentClient()

// Main Lambda handler
exports.handler = async (event) => {

    const classId = parseInt(event.queryStringParameters.classId)

    const params = {
        TableName: process.env.DDB_TABLE,
        IndexName: 'GSI_PK_Index',
        KeyConditionExpression: 'PK = :ID',
        ExpressionAttributeValues: {
          ':ID': `class-${classId}`
        },
        ScanIndexForward: false,
        Limit: 1000
    }   

    const result = await documentClient.query(params).promise()   
    return result.Items
}

By default, this returns the top 1,000 places by using the Limit parameter. You can customize this or use pagination to implement fetching large result sets more efficiently.
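One way to paginate is to follow the LastEvaluatedKey value that a query returns when more results are available. The sketch below assumes a hypothetical helper, queryAllPages, that takes the query function as an argument so that it can run locally against a fake two-page result set.

```javascript
// Collect every page of a query by following LastEvaluatedKey.
// `queryFn` is injected so the sketch runs without AWS; in a Lambda
// function it could be (p) => documentClient.query(p).promise()
const queryAllPages = async (queryFn, params) => {
  const items = []
  let ExclusiveStartKey
  do {
    const page = await queryFn(
      ExclusiveStartKey ? { ...params, ExclusiveStartKey } : params
    )
    items.push(...page.Items)
    ExclusiveStartKey = page.LastEvaluatedKey
  } while (ExclusiveStartKey)
  return items
}

// Fake two-page result set standing in for DynamoDB
const pages = {
  start: {
    Items: [{ SK: 'racer-1' }, { SK: 'racer-2' }],
    LastEvaluatedKey: { SK: 'racer-2' }
  },
  'racer-2': { Items: [{ SK: 'racer-3' }] }
}
const fakeQuery = async (params) =>
  pages[params.ExclusiveStartKey ? params.ExclusiveStartKey.SK : 'start']

queryAllPages(fakeQuery, { TableName: 'example-table' })
  .then((items) => console.log(items.length))
```

For a leaderboard, it may be preferable to return one page at a time to the frontend along with the key for the next page, rather than collecting all pages in the function.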

Conclusion

In this post, I explain the leaderboard logic in the Alleycat application. The tumbling window function aggregates results from Kinesis Data Streams and writes them to DynamoDB at the end of each window, which reduces the number of writes at the cost of higher latency for live results.

This post shows the architecture in Alleycat and how the tumbling window is defined in AWS SAM. Finally, I walk through how the final results function writes scores to DynamoDB in parallel and how the frontend fetches historical results through API Gateway.

Part 5 discusses how to troubleshoot issues in Alleycat and how to monitor streaming applications generally.

For more serverless learning resources, visit Serverless Land.

Streaming Amazon DynamoDB data into a centralized data lake

Post Syndicated from Praveen Krishnamoorthy Ravikumar original https://aws.amazon.com/blogs/big-data/streaming-amazon-dynamodb-data-into-a-centralized-data-lake/

For organizations moving towards a serverless microservice approach, Amazon DynamoDB has become a preferred backend database due to its fully managed, multi-Region, multi-active durability with built-in security controls, backup and restore, and in-memory caching for internet-scale applications. You can stream DynamoDB data into a centralized data lake, which you can then use to derive near-real-time business insights. The data lake provides capabilities to business teams to plug in BI tools for analysis, and to data science teams to train models.

This post demonstrates two common use cases of streaming a DynamoDB table into an Amazon Simple Storage Service (Amazon S3) bucket using Amazon Kinesis Data Streams, AWS Lambda, and Amazon Kinesis Data Firehose via Amazon Virtual Private Cloud (Amazon VPC) endpoints in the same AWS Region. We explore two use cases based on account configurations:

  • DynamoDB and Amazon S3 in same AWS account
  • DynamoDB and Amazon S3 in different AWS accounts

We use the following AWS services:

  • Kinesis Data Streams for DynamoDB – Kinesis Data Streams for DynamoDB captures item-level modifications in any DynamoDB table and replicates them to a Kinesis data stream of your choice. Your applications can access the data stream and view the item-level changes in near-real time. Streaming your DynamoDB data to a data stream enables you to continuously capture and store terabytes of data per hour. Kinesis Data Streams enables you to take advantage of longer data retention time, enhanced fan-out capability to more than two simultaneous consumer applications, and additional audit and security transparency. Kinesis Data Streams also gives you access to other Kinesis services such as Kinesis Data Firehose and Amazon Kinesis Data Analytics. This enables you to build applications to power real-time dashboards, generate alerts, implement dynamic pricing and advertising, and perform sophisticated data analytics, such as applying machine learning (ML) algorithms.
  • Lambda – Lambda lets you run code without provisioning or managing servers. It provides the capability to run code for virtually any type of application or backend service without managing servers or infrastructure. You can set up your code to automatically trigger from other AWS services or call it directly from any web or mobile app.
  • Kinesis Data Firehose – Kinesis Data Firehose helps to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon S3 and other destinations. It’s a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt your data streams before loading, which minimizes the amount of storage used and increases security.

Security is the primary focus of our use cases, so the services used in both use server-side encryption at rest and VPC endpoints for securing the data in transit.

Use case 1: DynamoDB and Amazon S3 in same AWS account

In our first use case, our DynamoDB table and S3 bucket are in the same account. We have the following resources:

  • A Kinesis data stream is configured to use 10 shards, but you can change this as needed.
  • A DynamoDB table with Kinesis streaming enabled is a source to the Kinesis data stream, which is configured as a source to a Firehose delivery stream.
  • The Firehose delivery stream is configured to use a Lambda function for record transformation along with data delivery into an S3 bucket. The Firehose delivery stream is configured to batch records for 2 minutes or 1 MiB, whichever occurs first, before delivering the data to Amazon S3. The batch window is configurable for your use case. For more information, see Configure settings.
  • The Lambda function used for this solution transforms the DynamoDB item’s multi-level JSON structure to a single-level JSON structure. It’s configured to run in a private subnet of an Amazon VPC, with no internet access. You can extend the function to support more complex business transformations.
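To illustrate the kind of transformation described in the last item, the following sketch removes the type-descriptor level from DynamoDB JSON. It is not the function deployed by the stack: the helper names are hypothetical and only a subset of attribute types is handled. In the AWS SDK for JavaScript v2, AWS.DynamoDB.Converter.unmarshall performs a similar conversion.

```javascript
// Convert one DynamoDB-typed attribute value into a plain JavaScript value
const unwrap = (attr) => {
  if ('S' in attr) return attr.S
  if ('N' in attr) return Number(attr.N)
  if ('BOOL' in attr) return attr.BOOL
  if ('NULL' in attr) return null
  if ('L' in attr) return attr.L.map(unwrap)
  if ('M' in attr) return flattenItem(attr.M)
  throw new Error(`Unsupported attribute type: ${Object.keys(attr)}`)
}

// Convert a whole item from DynamoDB JSON to plain JSON
const flattenItem = (item) =>
  Object.fromEntries(
    Object.entries(item).map(([key, attr]) => [key, unwrap(attr)])
  )

const item = {
  id: { S: '864732' },
  name: { S: 'Adam' },
  Designation: { S: 'Architect' }
}
const flat = flattenItem(item)
console.log(flat)
```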

The following diagram illustrates the architecture of the solution.

The architecture uses the DynamoDB feature to capture item-level changes in DynamoDB tables using Kinesis Data Streams. This feature provides capabilities to securely stream incremental updates without any custom code or components.

Prerequisites

To implement this architecture, you need the following:

  • An AWS account
  • Admin access to deploy the needed resources

Deploy the solution

In this step, we create a new Amazon VPC along with the rest of the components.

We also create an S3 bucket with the following features:

You can extend the template to enable additional S3 bucket features as per your requirements.

For this post, we use an AWS CloudFormation template to deploy the resources. As part of best practices, consider organizing resources by lifecycle and ownership as needed.

We use an AWS Key Management Service (AWS KMS) key for server-side encryption to encrypt the data in Kinesis Data Streams, Kinesis Data Firehose, Amazon S3, and DynamoDB.

The Amazon CloudWatch log group data is always encrypted in CloudWatch Logs. If required, you can extend this stack to encrypt log groups using KMS CMKs.

  1. Choose Launch Stack to create a CloudFormation stack:
  2. On the CloudFormation console, accept default values for the parameters.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After stack creation is complete, note the value of the BucketName output variable from the stack’s Outputs tab. This is the S3 bucket name that is created as part of the stack. We use this value later to test the solution.

Test the solution

To test the solution, we insert a new item and then update the item in the DynamoDB table using AWS CloudShell and the AWS Command Line Interface (AWS CLI). We will also use the AWS Management Console to monitor and verify the solution.

  1. On the CloudShell console, verify that you’re in the same Region as the DynamoDB table (the default is us-east-1).
  2. Enter the following AWS CLI command to insert an item:
    aws dynamodb put-item \
    --table-name blog-srsa-ddb-table \
    --item '{ "id": {"S": "864732"}, "name": {"S": "Adam"} , "Designation": {"S": "Architect"} }' \
    --return-consumed-capacity TOTAL

  3. Enter the following command to update the item, changing the Designation from “Architect” to “Senior Architect”:
    aws dynamodb put-item \
    --table-name blog-srsa-ddb-table \
    --item '{ "id": {"S": "864732"}, "name": {"S": "Adam"} , "Designation": {"S": "Senior Architect"} }' \
    --return-consumed-capacity TOTAL

All item-level modifications from the DynamoDB table are sent to a Kinesis data stream (blog-srsa-ddb-table-data-stream), which delivers the data to a Firehose delivery stream (blog-srsa-ddb-table-delivery-stream).

You can monitor the processing of updated records in the Firehose delivery stream on the Monitoring tab of the delivery stream.

You can verify the delivery of the updates to the data lake by checking the objects in the S3 bucket (BucketName value from the stack Outputs tab).

The Firehose delivery stream is configured to write records to Amazon S3 using a custom prefix which is based on the date the records are delivered to the delivery stream. This partitions the delivered records by date which helps improve query performance by limiting the amount of data that query engines need to scan in order to return the results for a specific query. For more information, see Custom Prefixes for Amazon S3 Objects.

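The file is in JSON format. You can verify the data in the following ways:

  • Download the files
  • Run an AWS Glue crawler to create a table to query in Athena
  • Query the data using Amazon S3 Select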

Use case 2: DynamoDB and Amazon S3 in different AWS accounts

The solution for this use case uses two CloudFormation stacks: the producer stack (deployed in Account A) and the consumer stack (deployed in Account B).

The producer stack (Account A) deploys the following:

  • A Kinesis data stream is configured to use 10 shards, but you can change this as needed.
  • A DynamoDB table with Kinesis streaming is enabled as a source to the Kinesis data stream, and the data stream is configured as a source to a Firehose delivery stream.
  • The Firehose delivery stream is configured to use a Lambda function for record transformation along with data delivery into an S3 bucket in Account B. The delivery stream is configured to batch records for 2 minutes or 1 MiB, whichever occurs first, before delivering the data to Amazon S3. The batch window is configurable for your use case.
  • The Lambda function is configured to run in a private subnet of an Amazon VPC, with no internet access. For this solution, the function transforms the multi-level JSON structure to a single-level JSON structure. You can extend the function to support more complex business transformations.

The consumer stack (Account B) deploys an S3 bucket configured to receive the data from the Firehose delivery stream in Account A.

The following diagram illustrates the architecture of the solution.

The architecture uses the DynamoDB feature to capture item-level changes in DynamoDB tables using Kinesis Data Streams. This feature provides capabilities to securely stream incremental updates without any custom code or components. 

Prerequisites

For this use case, you need the following:

  • Two AWS accounts (for the producer and consumer)
    • If you already deployed the architecture for the first use case and want to use the same account, delete the stack from the previous use case before proceeding with this section
  • Admin access to deploy needed resources

Deploy the components in Account B (consumer)

This step creates an S3 bucket with the following features:

  • Encryption at rest using CMKs
  • Block Public Access
  • Bucket versioning

You can extend the template to enable additional S3 bucket features as needed.

We deploy the resources with a CloudFormation template. As part of best practices, consider organizing resources by lifecycle and ownership as needed.

We use the KMS key for server-side encryption to encrypt the data in Amazon S3.

The CloudWatch log group data is always encrypted in CloudWatch Logs. If required, you can extend the stack to encrypt log group data using KMS CMKs.

  1. Choose Launch Stack to create a CloudFormation stack in your account:
  2. For DDBProducerAccountID, enter Account A’s account ID.
  3. For KMSKeyAlias, the KMS key used for server-side encryption to encrypt the data in Amazon S3 is populated by default.
  4. Choose Create stack.

After stack creation is complete, note the value of the BucketName output variable. We use this value later to test the solution.

Deploy the components in Account A (producer)

In this step, we sign in to the AWS Management Console with Account A to deploy the producer stack. We use the KMS key for server-side encryption to encrypt the data in Kinesis Data Streams, Kinesis Data Firehose, Amazon S3, and DynamoDB. As with other stacks, the CloudWatch log group data is always encrypted in CloudWatch Logs, but you can extend the stack to encrypt log group data using KMS CMKs.

  1. Choose Launch Stack to create a CloudFormation stack in your account:
  2. For ConsumerAccountID, enter the ID of Account B.
  3. For CrossAccountDatalakeBucket, enter the bucket name for Account B, which you created in the previous step.
  4. For ArtifactBucket, the S3 bucket containing the artifacts required for deployment is populated by default.
  5. For KMSKeyAlias, the KMS key used for server-side encryption to encrypt the data in Amazon S3 is populated by default.
  6. For BlogTransformationLambdaFile, the Amazon S3 key for the Lambda function code that performs the Kinesis Data Firehose data transformation is populated by default.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

Test the solution

To test the solution, we sign in as Account A, insert a new item in the DynamoDB table, and then update that item. Make sure you’re in the same Region as your table.

  1. On the CloudShell console, enter the following AWS CLI command to insert an item:
    aws dynamodb put-item \
    --table-name blog-srca-ddb-table \
    --item '{ "id": {"S": "864732"}, "name": {"S": "Chris"} , "Designation": {"S": "Senior Consultant"} }' \
    --return-consumed-capacity TOTAL

  2. Update the existing item with the following code:
    aws dynamodb put-item \
    --table-name blog-srca-ddb-table \
    --item '{ "id": {"S": "864732"}, "name": {"S": "Chris"} , "Designation": {"S": "Principal Consultant"} }' \
    --return-consumed-capacity TOTAL

  3. Sign out of Account A and sign in as Account B to verify the delivery of records into the data lake.

All item-level modifications from the DynamoDB table are sent to a Kinesis data stream (blog-srca-ddb-table-data-stream), which delivers the data to a Firehose delivery stream (blog-srca-ddb-table-delivery-stream) in Account A.

You can monitor the processing of the updated records on the Monitoring tab of the Firehose delivery stream.

You can verify the delivery of updates to the data lake by checking the objects in the S3 bucket that you created in Account B.

The Firehose delivery stream is configured similarly to the previous use case.

You can verify the data (in JSON format) in the same ways:

  • Download the files
  • Run an AWS Glue crawler to create a table to query in Athena
  • Query the data using Amazon S3 Select

Clean up

To avoid incurring future charges, clean up all the AWS resources that you created using AWS CloudFormation. You can delete these resources on the console or via the AWS CLI. For this post, we walk through the steps using the console.

Clean up resources from use case 1

To clean up the DynamoDB and Amazon S3 resources in the same account, complete the following steps:

  1. On the Amazon S3 console, empty the S3 bucket and remove any previous versions of S3 objects.
  2. On the AWS CloudFormation console, delete the stack bdb1040-ddb-lake-single-account-stack.

You must delete the Amazon S3 resources before deleting the stack, or the deletion fails.

Clean up resources from use case 2

To clean up the DynamoDB and Amazon S3 resources in different accounts, complete the following steps:

  1. Sign in to Account A.
  2. On the AWS CloudFormation console, delete the stack bdb1040-ddb-lake-multi-account-stack.
  3. Sign in to Account B.
  4. On the Amazon S3 console, empty the S3 bucket and remove any previous versions of S3 objects.
  5. On the AWS CloudFormation console, delete the stack bdb1040-ddb-lake-multi-account-stack.

Extend the solution

You can extend this solution to stream DynamoDB table data into cross-Region S3 buckets by setting up cross-Region replication (using the Amazon secured private channel) on the bucket where Kinesis Data Firehose delivers the data.

You can also perform a point-in-time initial load of the DynamoDB table into the data lake before setting up Kinesis Data Streams for DynamoDB. DynamoDB provides a feature to achieve this that requires no code. For more information, see Export Amazon DynamoDB Table Data to Your Data Lake in Amazon S3, No Code Writing Required.

To extend the usability scope of DynamoDB data in S3 buckets, you can crawl the location to create AWS Glue Data Catalog database tables. Registering the locations with AWS Lake Formation helps simplify permission management and allows you to implement fine-grained access control. You can also use Athena, Amazon Redshift, Amazon SageMaker, and Amazon QuickSight for data analysis, ML, and reporting services.

Conclusion

In this post, we demonstrated two solutions for streaming DynamoDB table data into Amazon S3 to build a data lake using a secured Amazon private channel.

The CloudFormation template gives you an easy way to set up the process, which you can further modify to meet your specific use case needs.

Please let us know if you have comments about this post!


About the Authors

Praveen Krishnamoorthy Ravikumar is a Data Architect with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics platforms and solutions.

Abhishek Gupta is a Data and ML Engineer with AWS Professional Services Practice.

Ashok Yoganand Sridharan is a Big Data Consultant with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics platforms and solutions.

Building serverless applications with streaming data: Part 3

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/building-serverless-applications-with-streaming-data-part-3/

This series is about building serverless solutions in streaming data workloads. These are traditionally challenging to build, since data can be streamed from thousands or even millions of devices continuously. The application example used in this series is Alleycat, which allows bike racers to compete with each other virtually on home exercise bikes.

Part 1 explains the application’s functionality, how to deploy to your AWS account, and provides an architectural review. Part 2 compares different ways to ingest streaming data into Amazon Kinesis Data Streams and shows how to optimize shard capacity.

In this post, I explain how the all-time rankings functionality is implemented. This uses Amazon Kinesis Data Firehose to deliver hundreds of thousands of data points for processing by AWS Lambda functions.

To set up the example, visit the GitHub repo and follow the instructions in the README.md file. Note that this walkthrough uses services that are not covered by the AWS Free Tier and incur cost.

Overview of real time rankings in Alleycat

In the example scenario, there are 40,000 users and up to 1,000 competitors may race at any given time. While a competitor is racing, there is a real time rankings display that shows their performance in the selected class:

Alleycat front end

The racer can select Here now to compare against racers in the current virtual race. Alternatively, selecting All time compares their performance against the best performance of all racers who have ever competed in the race. The rankings board is dynamic and shows the rankings for the current second on the display for the local racer:

Leaderboard changes over time

In Alleycat, races occur every five minutes continuously, and the all-time data is gathered from races that are completed. For this to work, the application must do the following:

  • Continuously aggregate race data from each of the six exercise classes and deliver to an Amazon S3 bucket.
  • Compare the incoming race data with the personal best records for every competitor in the same class.
  • If the current performance is a record, update the all-time best records dataset.
  • Compress the dataset, since there are thousands of racers, each with 5 minutes of personal racing history.

The resulting dataset contains second-by-second personal records for every racer in a selected race. This is saved in the application’s history bucket in S3 and loaded by the Alleycat front end before the beginning of each race:

        // From Home.vue's loadRealtimeHistory method

        // Load gz from S3 history bucket, unzip and parse to JSON
        const URL = `https://${this.$appConfig.historyBucket}.s3.${this.$appConfig.region}.amazonaws.com/class-${this.selectedClassId}.gz`
        const response = await axios.get(URL, { responseType: 'arraybuffer' })
        const buffer = Buffer.from(response.data, 'base64')
        const dezipped = await gunzip(buffer)
        const history = JSON.parse(dezipped.toString())

Architecture overview

The backend microservice handling this feature is located in the 2-streaming-kdf directory in the GitHub repo. It uses the following architecture:

Solution architecture

  1. Amazon Kinesis Data Firehose is a consumer of Kinesis Data Streams. Records are delivered from the application’s main stream as they arrive.
  2. Kinesis Data Firehose invokes a data transformation Lambda function. This calculates the output for each racer’s data points and returns the modified records to Kinesis.
  3. Kinesis Data Firehose delivers batches of records to an S3 bucket.
  4. When objects are written to the S3 bucket, this event triggers the S3 processor Lambda function. This compares incoming racer performance with historical records.
  5. The Lambda function saves the new all-time records in a compressed format to the application’s history bucket in S3.

The process runs continuously, provided that new records are delivered to the application’s main Kinesis stream.
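As a small illustration of the compressed format used in the final step, the following sketch compresses a JSON dataset with the Node.js zlib module and restores it again. The shape of the history object is an assumption for this example and may not match the dataset in the repo.

```javascript
const zlib = require('zlib')

// Hypothetical history shape: racer ID -> output for each second of the race
const history = { 0: [120.5, 131.2, 140.0], 1: [98.4, 110.9, 125.3] }

// Compress before saving (for example, as the body of an S3 object)
const compressed = zlib.gzipSync(JSON.stringify(history))

// Decompress and parse, as a consumer would after downloading the object
const restored = JSON.parse(zlib.gunzipSync(compressed).toString())

console.log(compressed.length, restored)
```

For a dataset this small, gzip overhead can outweigh the savings. The benefit appears with thousands of racers, where the repeated JSON structure compresses well.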

Using Kinesis Data Firehose

Kinesis Data Firehose can consume records from Kinesis Data Streams, or directly from the AWS SDK, AWS IoT Core, and other data producers. In Alleycat, there are multiple consumers that process incoming data, so Kinesis Data Firehose is configured as a consumer on the main stream.

The process of updating historical records is not a real-time process in Alleycat. Processing individual racer messages and comparing against all-time records is possible but would be computationally more complex. In practice, only a few incoming data points are all-time records. As a result, Alleycat asynchronously processes batches of records to find and update all-time records. The process is eventually consistent and the tradeoff is latency. There may be up to 1 minute between recording a personal record and updating the historical dataset in S3.
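The comparison of a batch against the all-time records can be sketched as a merge that keeps the higher value for each second. This is an illustration under assumptions: the data shapes and the mergePersonalBests helper are hypothetical, and the repo may define a personal record differently.

```javascript
// Hypothetical shapes: both maps are racer ID -> array of outputs by second.
// Returns a new history plus a flag, so the caller only saves when needed.
const mergePersonalBests = (history, incoming) => {
  const merged = { ...history }
  let updated = false
  for (const [racerId, outputs] of Object.entries(incoming)) {
    // Copy the existing array so the original history is not mutated
    const best = merged[racerId] ? [...merged[racerId]] : []
    outputs.forEach((output, second) => {
      if (best[second] === undefined || output > best[second]) {
        best[second] = output
        updated = true
      }
    })
    merged[racerId] = best
  }
  return { merged, updated }
}

const history = { 7: [100, 120, 130] }
const incoming = { 7: [90, 125, 130], 8: [80] }
const { merged, updated } = mergePersonalBests(history, incoming)
console.log(merged, updated)
```

Because most incoming data points are not records, the updated flag lets the function skip the write to S3 for batches that change nothing.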

Kinesis Data Firehose provides several key functions for this process. First, it batches groups of messages based upon the batching hints provided in the AWS Serverless Application Model (AWS SAM) template. This application buffers records for up to 60 seconds or until 1 MB of records are available, whichever is reached first. You can adjust these settings and batch for up to 900 seconds or 128 MB of data. Note that these settings are hints and not absolute – the service can dynamically adjust these if data delivery falls behind data writing in the stream. As a result, hints should be treated as guidance and the actual settings may change.

Kinesis Data Firehose also enables data compression before delivery in various common formats. Since the S3 delivery bucket is an intermediary storage location in this application, using data compression reduces the storage cost. Kinesis Data Firehose can also encrypt data before delivery but this feature is not used in Alleycat. Finally, Kinesis Data Firehose can transform the incoming records by invoking a Lambda function. This enables the application to calculate the racer’s output before delivering the records.

The configuration for Kinesis Data Firehose, the S3 buckets, and the Lambda function is located in the template.yaml file in the GitHub repo. This AWS SAM template shows how to define the complete integration:

  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn:
      - DeliveryStreamPolicy    
    Properties:
      DeliveryStreamName: "alleycat-data-firehose"
      DeliveryStreamType: "KinesisStreamAsSource"
      KinesisStreamSourceConfiguration: 
        KinesisStreamARN: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStreamName}"
        RoleARN: !GetAtt DeliveryStreamRole.Arn
      ExtendedS3DestinationConfiguration: 
        BucketARN: !GetAtt DeliveryBucket.Arn
        BufferingHints: 
          SizeInMBs: 1
          IntervalInSeconds: 60
        CloudWatchLoggingOptions: 
          Enabled: true
          LogGroupName: "/aws/kinesisfirehose/alleycat-firehose"
          LogStreamName: "S3Delivery"
        CompressionFormat: "GZIP"
        EncryptionConfiguration: 
          NoEncryptionConfig: "NoEncryption"
        Prefix: ""
        RoleARN: !GetAtt DeliveryStreamRole.Arn
        ProcessingConfiguration: 
          Enabled: true
          Processors: 
            - Type: "Lambda"
              Parameters: 
                - ParameterName: "LambdaArn"
                  ParameterValue: !GetAtt FirehoseProcessFunction.Arn

Once Kinesis Data Firehose is set up, there is no ongoing administration. The service scales automatically to adjust to the amount of data available in the application’s Kinesis data stream.

How the Lambda data transformer works

Kinesis Data Firehose buffers up to 3 MB before invoking the data transformation function (you can configure this setting with the ProcessingConfiguration API). The service delivers the records to the function in a JSON payload that contains a records array:

{
    "invocationId": "d8ce3cc6-abcd-407a-abcd-d9bc2ce58e72",
    "sourceKinesisStreamArn": "arn:aws:kinesis:us-east-2:012345678912:stream/alleycat",
    "deliveryStreamArn": "arn:aws:firehose:us-east-2:012345678912:deliverystream/alleycat-firehose",
    "region": "us-east-2",
    "records": [
        {
            "recordId": "49617596128959546022271021234567...",
            "approximateArrivalTimestamp": 1619185789847,
            "data": "eyJ1dWlkIjoiYjM0MGQzMGYtjI1Yi00YWM4LThjY2QtM2ZhNThiMWZmZjNlIiwiZXZlbnQiOiJ1cGRhdGUiLCJkZXZpY2VUaW1lc3RhbXiOjE2MTkxODU3ODk3NUsInNlY29uZCI6MwicmFjZUlkIjoxNjE5MTg1NjIwMj1LCJuYW1lIjoiSHViZXJ0IiwicmFjZXJJZCI6MSwiY2xhc3NJZCI6MSwiY2FkZW5jZSI6ODAsInJlc2lzdGFuY2UiOjc4fQ==",
            "kinesisRecordMetadata": {
                "sequenceNumber": "49617596128959546022271020452",
                "subsequenceNumber": 0,
                "partitionKey": "1619185789798",
                "shardId": "shardId-000000000000",
                "approximateArrivalTimestamp": 1619185789847
            }
        }, 
   ...

The payload contains metadata about the invocation and data source, and each record contains a recordId and a base64-encoded data attribute. The Lambda function can then decode and modify the data attribute for each record as needed. Finally, the Lambda function must return an object with a records array of the same length as the incoming event.records array, where each item has the following attributes:

  • recordId: this must match the incoming recordId, so Kinesis Data Firehose can map the modified data payload back to the original record.
  • result: this must be “Ok”, “Dropped”, or “ProcessingFailed”. “Dropped” means that the function has intentionally removed a payload from processing. Any records with “ProcessingFailed” are delivered to the S3 bucket in a folder called processing-failed. This includes metadata indicating the number of delivery attempts, the timestamp of the last attempt, and the Lambda function’s ARN.
  • data: the returned data payload must be base64 encoded and the modified record must be within the 1 MB limit per record.

The transformer function in Alleycat shows how to implement this process in Node.js:

exports.handler = async (event) => {
  const output = event.records.map((record) => {
    // Extract JSON record from base64 data
    const buffer = Buffer.from(record.data, "base64").toString()
    const jsonRecord = JSON.parse(buffer)

    // Add calculated field
    jsonRecord.output = ((jsonRecord.cadence + 35) * (jsonRecord.resistance + 65)) / 100

    // Convert back to base64 + add a newline
    const dataBuffer = Buffer.from(JSON.stringify(jsonRecord) + "\n", "utf8").toString("base64")

    return {
      recordId: record.recordId,
      result: "Ok",
      data: dataBuffer,
    }
  })

  console.log(`{ recordsTotal: ${output.length} }`)
  return { records: output }
}

If you are processing the data in downstream services in JSON format, adding a newline at the end of the data buffer for each record can simplify the import process.
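The handler above marks every record as “Ok”. The following sketch shows one way the other two result values could be used. It is an illustration under stated assumptions rather than the Alleycat implementation: transformRecord is a hypothetical helper, the rule that drops records whose event is not "update" is invented for the example, and the record IDs in the sample event are made up.

```javascript
// Sketch of a transformer that reports per-record outcomes.
// "transformRecord" is a hypothetical helper, not part of the Alleycat code.
function transformRecord(record) {
  try {
    const json = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'))

    // Intentionally skip records that are not racer updates (assumed rule)
    if (json.event !== 'update') {
      return { recordId: record.recordId, result: 'Dropped', data: record.data }
    }

    json.output = ((json.cadence + 35) * (json.resistance + 65)) / 100
    const data = Buffer.from(JSON.stringify(json) + '\n', 'utf8').toString('base64')
    return { recordId: record.recordId, result: 'Ok', data }
  } catch (err) {
    // Malformed payload: return the original data and flag the failure
    return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data }
  }
}

const handler = async (event) => ({ records: event.records.map(transformRecord) })

// Local check with a hand-built event (record IDs are made up)
const encode = (value) => Buffer.from(value, 'utf8').toString('base64')
const sampleEvent = {
  records: [
    { recordId: '1', data: encode(JSON.stringify({ event: 'update', cadence: 80, resistance: 78 })) },
    { recordId: '2', data: encode(JSON.stringify({ event: 'heartbeat' })) },
    { recordId: '3', data: encode('not json') }
  ]
}
handler(sampleEvent).then((response) => console.log(response.records.map((r) => r.result)))
```

Catching the error per record keeps one malformed payload from failing the whole batch, and the response array stays the same length as the input.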

The data transformation Lambda function can run for up to 5 minutes and can use other AWS services for data processing. The Kinesis Data Firehose service scales up the Lambda function automatically if traffic increases, up to 5 outstanding invocations per shard (or 10 if the destination is Splunk).

Processing the Kinesis Data Firehose output

Kinesis Data Firehose puts a series of objects in the intermediary S3 bucket. Each put event invokes the S3 processor Lambda function. This function compares each data point to historical best performance, per racer ID, per class ID, at the same second of the race.

Data points that do not beat existing records are discarded. Otherwise, the function merges new historical records into the dataset and saves the result into the final S3 history bucket. This object is compressed in gzip format and contains the history for a single class ID.
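The compare-and-merge step can be sketched as follows. This is a simplified illustration, not the function from the repo: the history shape (racer ID, then second, then best output), the helper names, and the sample values are all assumptions, and reading from and writing to S3 is left out.

```javascript
const zlib = require('zlib')

// Hypothetical history shape: { [racerId]: { [second]: bestOutput } }
function mergeRecords(history, records) {
  let updated = 0
  for (const { racerId, second, output } of records) {
    const racerHistory = history[racerId] || (history[racerId] = {})
    const best = racerHistory[second]
    // Keep the data point only if it beats the stored best for that second
    if (best === undefined || output > best) {
      racerHistory[second] = output
      updated++
    }
  }
  return updated
}

// Round-trip helpers for the gzip-compressed JSON object
const compress = (history) => zlib.gzipSync(JSON.stringify(history))
const decompress = (buffer) => JSON.parse(zlib.gunzipSync(buffer).toString('utf8'))

// Example with made-up values
const history = { 1: { 3: 150 } }
const changes = mergeRecords(history, [
  { racerId: 1, second: 3, output: 164.45 }, // beats 150, stored
  { racerId: 1, second: 4, output: 120 },    // first value for second 4, stored
  { racerId: 1, second: 3, output: 140 }     // below current best, discarded
])
console.log(changes, decompress(compress(history)))
```

Returning the number of updates lets a caller skip the write to the history bucket when a batch contains no new records.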

When the frontend downloads this dataset from the S3 bucket, it decompresses the object. The resulting JSON structure shows personal output records per racer ID for each second of the race. The frontend uses this data to update the leaderboard on the local device during each second of the active race.

JSON output

Conclusion

In this post, I explain the all-time leaderboard logic in the Alleycat application. This is an asynchronous, eventually consistent process that checks batches of incoming records for new personal records. This uses Kinesis Data Firehose to provide a zero-administration way to deliver and process large batches of records continuously.

This post shows the architecture in Alleycat and how this is defined in AWS SAM. Finally, I walk through how to build a data transformation Lambda function that correctly decodes a payload and returns records back to Kinesis Data Firehose.

Part 4 shows how to combine Kinesis with Amazon DynamoDB to support queries for streaming data. Alleycat uses this architecture to provide real-time rankings for competitors in the same virtual race.

For more serverless learning resources, visit Serverless Land.

Building serverless applications with streaming data: Part 2

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/building-serverless-applications-with-streaming-data-part-2/

Part 1 introduces the Alleycat application that allows bike racers to compete with each other virtually on home exercise bikes. I explain the application’s functionality, how to deploy to your AWS account, and provide an architectural review.

This series is about building serverless solutions in streaming data workloads. These are traditionally challenging to build, since data can be streamed from thousands or even millions of devices continuously.

In the example scenario, there are 40,000 users and up to 1,000 competitors may race at any given time. The workload must continuously ingest and buffer this data, then process and analyze the information to provide analytics and leaderboard content for the frontend application.

In this post, I focus on data ingestion. I compare the two different methods used in Alleycat, and discuss other approaches available. This post refers to Amazon Kinesis Data Streams, the AWS SDK, and AWS IoT Core in the solutions.

To set up the example, visit the GitHub repo and follow the instructions in the README.md file. Note that this walkthrough uses services that are not covered by the AWS Free Tier and incur cost.

Using AWS IoT Core to ingest streaming data

AWS IoT Core enables publish-subscribe capabilities for large numbers of client applications. Clients can send data to the backend using the AWS IoT Device SDK, which uses the MQTT standard for IoT messaging. After processing, the backend can publish aggregation and status messages back to the frontend via AWS IoT Core. This service fans out the messages to clients using topics.

When using this approach, note the Quality of Service (QoS) options available. By default, the SDK uses QoS level 0, which means the device does not confirm the message is received. This is intended for workloads that can lose messages occasionally without impacting performance. In Alleycat, if performance metrics are sometimes lost, this is unlikely to affect the overall end user experience.

For workloads requiring higher reliability, use QoS level 1, which causes the SDK to resend the message until an acknowledgement is received. While there is no additional charge for using QoS level 1, it generally increases the number of messages, which increases the overall cost. You are not charged for the PUBACK acknowledgement message – for more details, read more about AWS IoT Core pricing.
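As a rough sketch of what opting into QoS level 1 could look like, the helper below wraps a publish call in a promise that resolves once the callback fires. It assumes the client exposes an MQTT.js-style publish(topic, message, options, callback) method; check this against the SDK version you use. The helper name publishWithAck and the stand-in client are hypothetical and exist only so the example runs locally.

```javascript
// Sketch: publish with QoS 1 and wait for the acknowledgement callback.
// Assumes "mqttClient" follows the MQTT.js-style
// publish(topic, message, options, callback) signature.
function publishWithAck(mqttClient, topic, data) {
  return new Promise((resolve, reject) => {
    mqttClient.publish(topic, JSON.stringify(data), { qos: 1 }, (err) => {
      if (err) reject(err)
      else resolve()
    })
  })
}

// Stand-in client for a local dry run (not the real SDK)
const calls = []
const fakeClient = {
  publish(topic, message, options, callback) {
    calls.push({ topic, message, options })
    callback(null)
  }
}

publishWithAck(fakeClient, 'alleycat-publish', { racerId: 1, cadence: 80 })
  .then(() => console.log('acknowledged', calls[0].options))
```

Passing the client in as an argument keeps the helper independent of how the connection is created.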

Frontend

In this scenario, the Alleycat frontend application is running on a physical exercise bike. The user selects a racer ID and exercise class and chooses Start Race to join the current virtual race for that class.

Start race UI

Every second, the frontend sends a message containing the cadence and resistance metrics and the current second in the race for the local racer. This message is created as a JSON object in the Home.vue component and sent to the ‘alleycat-publish’ topic:

      const message = {
        uuid: uuidv4(),
        event: this.event,
        deviceTimestamp: Date.now(),
        second: this.currentSecond,
        raceId: RACE_ID,
        name: this.racer.name,
        racerId: this.racer.id,
        classId: this.selectedClassId,
        cadence: this.racer.getCurrentCadence(),
        resistance: this.racer.getCurrentResistance()
      }

The IoT.vue component contains the logic for this integration and uses the AWS IoT Device SDK to send and receive messages. On startup, the frontend connects to AWS IoT Core and publishes the messages using an MQTT client:

    bus.$on('publish', (data) => {
      console.log('Publish: ', data)
      mqttClient.publish(topics.publish, JSON.stringify(data))
    })

The SDK automatically attempts to retry in the event of a network disconnection and exposes an error handler to allow custom logic if other errors occur.

Backend

The resources used in the backend are defined using the AWS Serverless Application Model (AWS SAM) and configured in the core setup templates:

Reference architecture

Messages are published to topics in AWS IoT Core, which act as channels of interest. The message broker uses topic names and topic filters to route messages between publishers and subscribers. Incoming messages are routed using rules. Alleycat’s IoT rule routes all incoming messages to a Kinesis stream:

  IotTopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: 'alleycatIngest'
      TopicRulePayload:
        RuleDisabled: 'false'
        Sql: "SELECT * FROM 'alleycat-publish'"
        Actions:
        - Kinesis:
            StreamName: 'alleycat'
            PartitionKey: "${timestamp()}"
            RoleArn: !GetAtt IoTKinesisRole.Arn

  IoTKinesisRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - iot.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: IoTKinesisPutPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: 'kinesis:PutRecord'
                Resource: !GetAtt KinesisStream.Arn

Using the AWS::IoT::TopicRule resource, you can optionally define an error action. This allows you to store messages in a durable location, such as an Amazon S3 bucket, if an error occurs. Errors can occur if a rule does not have permission to access a destination or throttling occurs in a target.

Rules can route matching messages to up to 10 targets. For debugging purposes, you can also enable Amazon CloudWatch Logs, which can help troubleshoot failed message deliveries. The AWS IoT Core Message Broker allows up to 20,000 publish requests per second – if you need a higher limit for your workload, submit a request to AWS Support.

Using the AWS SDK to ingest streaming data

The Alleycat frontend creates traffic for a single user but there is also a simulator application that can generate messages for up to 1,000 riders. Instead of routing messages using an MQTT client, the simulator uses the AWS SDK to put messages directly into the Kinesis data stream.

The SDK provides a service interface object for Kinesis and two API methods for putting messages into streams: putRecord and putRecords. The first option accepts only a single message but the second enables batching of up to 500 messages per request. This is the preferred option for adding multiple messages, compared with calling putRecord multiple times.

The putRecords API takes a parameters object containing the stream name and a Records array of messages:

const params = {
   StreamName: 'alleycat',
   Records: [{
      "Data":"{\"event\":\"update\",\"deviceTimestamp\":1620824038331,\"second\":3,\"raceId\":5402746,\"name\":\"Hayden\",\"racerId\":0,\"classId\":1,\"cadence\":79.8,\"resistance\":79}",
      "PartitionKey":"1620824038331"
   },
   {
      "Data":"{\"event\":\"update\",\"deviceTimestamp\":1620824038331,\"second\":3,\"raceId\":5402746,\"name\":\"Hubert\",\"racerId\":1,\"classId\":1,\"cadence\":60.4,\"resistance\":60.6}",
      "PartitionKey":"1620824038331"
   }
]}

The SDK automatically base64 encodes the Data attribute, which in this case is the JSON string output from JSON.stringify. In the JavaScript SDK, the putRecords API can return a promise, allowing the code to await the operation:

const result = await kinesis.putRecords(params).promise()
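Because a single putRecords request accepts at most 500 records, a producer that generates more than that needs to split its messages into batches. The following is a minimal sketch of that step; the helper name buildPutRecordsBatches and the sample messages are hypothetical, and the timestamp-based partition key simply mirrors the example in this section.

```javascript
// Sketch: split messages into putRecords-sized batches (max 500 records per request).
const MAX_RECORDS_PER_REQUEST = 500

function buildPutRecordsBatches(streamName, messages) {
  const batches = []
  for (let i = 0; i < messages.length; i += MAX_RECORDS_PER_REQUEST) {
    const Records = messages.slice(i, i + MAX_RECORDS_PER_REQUEST).map((message) => ({
      Data: JSON.stringify(message),
      // Timestamp-based key, mirroring the example in this section
      PartitionKey: String(message.deviceTimestamp)
    }))
    batches.push({ StreamName: streamName, Records })
  }
  return batches
}

// Example: 1,200 simulated messages produce batches of 500, 500, and 200
const messages = Array.from({ length: 1200 }, (_, racerId) => ({
  event: 'update', deviceTimestamp: 1620824038331, racerId
}))
const batches = buildPutRecordsBatches('alleycat', messages)
console.log(batches.map((batch) => batch.Records.length))
// Each batch could then be sent with: await kinesis.putRecords(batch).promise()
```

A putRecords call can succeed for some records and fail for others, so production code should also inspect the FailedRecordCount field of each response and retry the records that were not accepted.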

Shards and partition keys

Kinesis data streams consist of one or more shards, which are sequences of data records with a fixed capacity. Each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second. The total capacity of a stream is the sum of the capacities of its shards.

When you send messages to a stream, the partitionKey attribute determines which shard it is routed to. The example application configures a Kinesis data stream with a single shard so the partitionKey attribute has no effect – all messages are routed to the same shard. However, many production applications have more than one shard and use the partitionKey to assign messages to shards.

The partitionKey is hashed by the Kinesis service to route to a shard. This diagram shows how partitionKey values from data producers are hashed by an MD5 function and mapped to individual shards:

MD5 hash process

While you cannot designate a specific shard ID in a message, you can influence the assignment depending on your choice of partitionKey:

  • Random: Using a randomized value results in random hash so messages are randomly sent to different shards. This effectively load balances messages across all available shards.
  • Time-based: A timestamp value may cause groups of messages to be sent to a single shard, if the messages arrive at the same time. The identical timestamp results in an identical hash.
  • Application-specific: If Alleycat used the classId as a partitionKey, racers in each class would always be routed to the same shard. This could be useful for downstream aggregation logic but would limit the capacity of messages per classId.
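The effect of these choices can be explored locally with a small sketch of the hashing step. It assumes the stream's 128-bit hash key space is divided evenly between shards, which is not guaranteed for every stream (shards can be split or merged into uneven ranges); the function name shardIndexFor is hypothetical.

```javascript
const crypto = require('crypto')

// Sketch: map a partition key to a shard index, assuming the stream's
// 128-bit hash key space is divided evenly between "shardCount" shards.
function shardIndexFor(partitionKey, shardCount) {
  const hex = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex')
  const hashKey = BigInt('0x' + hex) // 128-bit integer
  const rangeSize = (1n << 128n) / BigInt(shardCount)
  const index = Number(hashKey / rangeSize)
  // The last shard absorbs any remainder of the key space
  return Math.min(index, shardCount - 1)
}

// Identical timestamps always land on the same shard
console.log(shardIndexFor('1620824038331', 4), shardIndexFor('1620824038331', 4))

// Random keys spread across the shards
const counts = [0, 0, 0, 0]
for (let i = 0; i < 1000; i++) {
  counts[shardIndexFor(crypto.randomBytes(16).toString('hex'), 4)]++
}
console.log(counts)
```

With four shards, the random keys should be distributed roughly evenly, while every message that shares a timestamp key is counted against a single shard's capacity.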

Optimizing capacity in a shard

Each shard can ingest data at a rate of 1 MB per second or 1,000 records per second, whichever limit is reached first. Since the payload maximum is 1 MB, this could equate to one 1 MB message per second. If the payload is larger, you must divide it into smaller pieces to avoid an error. To send 1,000 messages per second, each payload must be under 1 KB on average to fit within the allowed capacity.

The combination of the two payload limits can result in different capacity profiles for a shard:

Capacity profiles in a shard

  1. The data payloads are evenly sized and use the 1 MB per second capacity.
  2. Data payload sizes vary, so the number of messages that can be packed into 1 MB varies per second.
  3. There are a large number of very small messages, consuming all 1,000 messages per second. However, the total data capacity used is significantly less than 1 MB.
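These profiles can be approximated with a short calculation that shows which of the two limits applies for a given average payload size. This is an illustrative sketch only: the function name shardThroughput is hypothetical and it treats 1 MB as 1024 * 1024 bytes.

```javascript
// Sketch: maximum sustainable records per second for one shard, given an
// average payload size. Treats 1 MB as 1024 * 1024 bytes (an assumption).
const SHARD_BYTES_PER_SECOND = 1024 * 1024
const SHARD_RECORDS_PER_SECOND = 1000

function shardThroughput(avgPayloadBytes) {
  const byBytes = Math.floor(SHARD_BYTES_PER_SECOND / avgPayloadBytes)
  const recordsPerSecond = Math.min(SHARD_RECORDS_PER_SECOND, byBytes)
  return {
    recordsPerSecond,
    limitedBy: byBytes < SHARD_RECORDS_PER_SECOND ? 'bytes' : 'records',
    bytesUsedFraction: (recordsPerSecond * avgPayloadBytes) / SHARD_BYTES_PER_SECOND
  }
}

console.log(shardThroughput(SHARD_BYTES_PER_SECOND)) // one maximum-size record per second
console.log(shardThroughput(50 * 1024))              // mid-size payloads, byte limit applies
console.log(shardThroughput(170))                    // small payloads, record limit applies
```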

In the Alleycat application, the average payload size is around 170 bytes. When producing 1,000 messages a second, the workload uses only about 170 KB per second, or roughly 17% of the 1 MB per second limit. Since PUT payload size is a factor in Kinesis pricing, messages that are much smaller than 25 KB are less cost-efficient. Compare these two messaging patterns for the Alleycat application:

Producer message patterns

  1. In this default mode, a smaller message is published once per second. This reduces overall latency but results in higher overall messaging cost.
  2. The client application batches outgoing messages and sends to Kinesis every 5 seconds. This results in lower cost and better packing of messages, but introduces additional latency.
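The cost difference between the two patterns can be estimated by counting PUT payload units, which are billed in 25 KB chunks rounded up per record. The sketch below assumes that the batched pattern combines five messages into a single Kinesis record; the function names and payload sizes are illustrative rather than taken from the Alleycat code.

```javascript
// Sketch: count PUT payload units (25 KB chunks, rounded up per record)
// for the two producer patterns. Payload sizes are illustrative.
const PUT_PAYLOAD_UNIT_BYTES = 25 * 1024

const payloadUnits = (recordBytes) => Math.max(1, Math.ceil(recordBytes / PUT_PAYLOAD_UNIT_BYTES))

function unitsPerMinute({ messageBytes, messagesPerRecord, secondsBetweenSends }) {
  const recordsPerMinute = 60 / secondsBetweenSends
  return recordsPerMinute * payloadUnits(messageBytes * messagesPerRecord)
}

// Pattern 1: one ~170-byte message per record, sent every second
const perSecond = unitsPerMinute({ messageBytes: 170, messagesPerRecord: 1, secondsBetweenSends: 1 })
// Pattern 2: five messages combined into one record, sent every 5 seconds
const batched = unitsPerMinute({ messageBytes: 170, messagesPerRecord: 5, secondsBetweenSends: 5 })
console.log({ perSecond, batched })
```

Under these assumptions, one client uses 60 payload units per minute in the first pattern and 12 in the second, since both record sizes still fit within a single 25 KB unit.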

There is a tradeoff between cost and latency when optimizing a shard’s capacity and the decision depends upon the needs of your workload. If the client buffers messages, this adds latency on the client side. This is acceptable in many workloads that collect metrics for archival or asynchronous reporting purposes. However, for low-latency applications like Alleycat, it provides a better experience for the application user to send messages as soon as they are available.

Conclusion

This post focuses on ingesting data into Kinesis Data Streams. I explain the two approaches used by the Alleycat frontend and the simulator application and highlight other approaches that you can use. I show how messages are routed to shards using partition keys. Finally, I explore additional factors to consider when ingesting data, to improve efficiency and reduce cost.

Part 3 covers using Amazon Kinesis Data Firehose for transforming, aggregating, and loading streaming data into data stores. This is used to provide the historical, second-by-second leaderboard for the frontend application.

For more serverless learning resources, visit Serverless Land.