Tag Archives: Amazon Kinesis

Introduction to Messaging for Modern Cloud Architecture

Post Syndicated from Sam Dengler original https://aws.amazon.com/blogs/architecture/introduction-to-messaging-for-modern-cloud-architecture/

We hope you’ve enjoyed reading our posts on best practices for your serverless applications. The posts in the following series will focus on best practices when introducing messaging patterns into your applications. Let’s review some core messaging concepts and see how they can be used to address challenges when designing modern cloud architectures.

Introduction

Applications can communicate information with each other using messages, a mechanism for packaging a data payload and associated metadata. The application that sends a message is called the producer and the application that receives the message is called the consumer. Producers and consumers exchange messages using a variety of transportation channels, for example point-to-point requests, message queues, subscription topics, or event buses. These transportation channels have differently characteristics that make them useful when implementing message communication patterns. Dependencies emerge when producers and consumers exchange messages, which is called coupling.

Synchronous Communication

synchronous communication

Message communication is called synchronous when the producer sends a message to the consumer and waits for a response before the producer continues its processing logic. An example of synchronous communication over a point-to-point channel is when a HTTP client makes a request to a HTTP service, waits for the service to process the request, and then applies logic to the HTTP response to determine how to proceed.

Synchronous communication patterns are more straightforward to implement, however they create tight coupling between producers and consumers. Tight coupling can cause problems due to traffic spikes and failures propagating directly throughout the application. For example, in a three-tier architecture, when the application experiences a spike in client traffic, the web tier directly translates the traffic spike as pressure on downstream resources (the logic and data tiers), which may not scale to meet the sudden demand. Likewise, downstream resource failure in the logic or data tier directly impacts the web tier from responding to client requests. Applications can mimic a synchronous experience, for example a status spinner, using asynchronous communication with a polling or push notification strategy.

Asynchronous Communication

Asynchronous communication

Message communication is called asynchronous when the producer sends a message to the consumer and proceeds without waiting for the response. An example of asynchronous communication over a message queue channel is when a client publishes a message to a queue, and after the queue acknowledges receipt of the message, the publisher proceeds without waiting for the consumer to process the message.

Asynchronous communication patterns are implemented using transportation channels such as queues, topics, and event buses to create loose coupling between producers and consumers. Loose coupling increases an architecture’s resiliency to failure and ability to handle traffic spikes because it creates an indirection between producer and consumer communication, enabling them to operate independently of each other. Using the three-tier architecture example, a message queue can be introduced between the web, logic, and data tiers to enable each to scale independently of each other. When the application experiences a spike in client traffic, the web tier translates the traffic spike as more messages to the queue for processing, however the logic tier may continue to process messages off the queue without being directly impacted.

Considerations and Next Steps

Although asynchronous communication patterns can benefit modern cloud architectures, there are tradeoffs to consider. Asynchronous messaging adds latency to end-to-end processing time due to the addition of middleware. Producers and consumers take a dependency on the middleware stack, which must also scale to meet demand and be resilient to failure. Care must be taken to appropriately configure producers, consumers, and middleware to handle errors so that messages are not lost, more monitoring is required to ensure proper operations, and multiple logs must be correlated to troubleshoot and diagnose problems.

Amazon MQ, Amazon KinesisAmazon Simple Queue Service (SQS), Amazon Simple Notification Service (SNS), and Amazon EventBridge are highly available, large scale, failure resistant managed services that can be used to implement asynchronous messaging patterns. You can explore these services at the AWS Messaging page and their integration into Serverless Architectures in the free new digital course, Architecting Serverless Solutions. You can also visit the AWS Event-Driven Architecture page to learn how to apply messaging patterns to build event-driven solutions. The upcoming posts in this series will explore these AWS services to help ensure message patterns are implemented using best practices when applied to modern cloud architecture.

Streaming ETL with Apache Flink and Amazon Kinesis Data Analytics

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/streaming-etl-with-apache-flink-and-amazon-kinesis-data-analytics/

Most businesses generate data continuously in real time and at ever-increasing volumes. Data is generated as users play mobile games, load balancers log requests, customers shop on your website, and temperature changes on IoT sensors. You can capitalize on time-sensitive events, improve customer experiences, increase efficiency, and drive innovation by analyzing this data quickly. The speed at which you get these insights often depends on how quickly you can load data into data lakes, data stores, and other analytics tools. As data volume and velocity increases, it becomes more important to not only load the incoming data, but also to transform and analyze it in near-real time.

This post looks at how to use Apache Flink as a basis for sophisticated streaming extract-transform-load (ETL) pipelines. Apache Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Kinesis Data Analytics, which enables you to build and run sophisticated streaming applications quickly, easily, and with low operational overhead.

This post discusses the concepts that are required to implement powerful and flexible streaming ETL pipelines with Apache Flink and Kinesis Data Analytics. It also looks at code examples for different sources and sinks. For more information, see the GitHub repo. The repo also contains an AWS CloudFormation template so you can get started in minutes and explore the example streaming ETL pipeline.

Architecture for streaming ETL with Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It supports a wide range of highly customizable connectors, including connectors for Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, and Amazon Simple Storage Service (Amazon S3). Moreover, Apache Flink provides a powerful API to transform, aggregate, and enrich events, and supports exactly-once semantics. Apache Flink is therefore a good foundation for the core of your streaming architecture.

To deploy and run the streaming ETL pipeline, the architecture relies on Kinesis Data Analytics. Kinesis Data Analytics enables you to run Flink applications in a fully managed environment. The service provisions and manages the required infrastructure, scales the Flink application in response to changing traffic patterns, and automatically recovers from infrastructure and application failures. You can combine the expressive Flink API for processing streaming data with the advantages of a managed service by using Kinesis Data Analytics to deploy and run Flink applications. It allows you to build robust streaming ETL pipelines and reduces the operational overhead of provisioning and operating infrastructure.

The architecture in this post takes advantage of several capabilities that you can achieve when you run Apache Flink with Kinesis Data Analytics. Specifically, the architecture supports the following:

  • Private network connectivity – Connect to resources in your Amazon Virtual Private Cloud (Amazon VPC), in your data center with a VPN connection, or in a remote region with a VPC peering connection
  • Multiple sources and sinks – Read and write data from Kinesis data streams, Apache Kafka clusters, and Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters
  • Data partitioning – Determine the partitioning of data that is ingested into Amazon S3 based on information extracted from the event payload
  • Multiple Elasticsearch indexes and custom document IDs – Fan out from a single input stream to different Elasticsearch indexes and explicitly control the document ID
  • Exactly-once semantics – Avoid duplicates when ingesting and delivering data between Apache Kafka, Amazon S3, and Amazon Elasticsearch Service (Amazon ES)

The following diagram illustrates this architecture.

The remainder of this post discusses how to implement streaming ETL architectures with Apache Flink and Kinesis Data Analytics. The architecture persists streaming data from one or multiple sources to different destinations and is extensible to your needs. This post does not cover additional filtering, enrichment, and aggregation transformations, although that is a natural extension for practical applications.

This post shows how to build, deploy, and operate the Flink application with Kinesis Data Analytics, without further focusing on these operational aspects. It is only relevant to know that you can create a Kinesis Data Analytics application by uploading the compiled Flink application jar file to Amazon S3 and specifying some additional configuration options with the service. You can then execute the Kinesis Data Analytics application in a fully managed environment. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics developer guide.

Exploring a streaming ETL pipeline in your AWS account

Before you consider the implementation details and operational aspects, you should get a first impression of the streaming ETL pipeline in action. To create the required resources, deploy the following AWS CloudFormation template:

The template creates a Kinesis data stream and an Amazon Elastic Compute Cloud (Amazon EC2) instance to replay a historic data set into the data stream. This post uses data based on the public dataset obtained from the New York City Taxi and Limousine Commission. Each event describes a taxi trip made in New York City and includes timestamps for the start and end of a trip, information on the boroughs the trip started and ended in, and various details on the fare of the trip. A Kinesis Data Analytics application then reads the events and persists them to Amazon S3 in Parquet format and partitioned by event time.

Connect to the instance by following the link next to ConnectToInstance in the output section of the CloudFromation template that you executed previously. You can then start replaying a set of taxi trips into the data stream with the following code:

$ java -jar /tmp/amazon-kinesis-replay-*.jar -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600 -streamName <Kinesis stream name>

You can obtain this command with the correct parameters from the output section of the AWS CloudFormation template. The output section also points you to the S3 bucket to which events are persisted and an Amazon CloudWatch dashboard that lets you monitor the pipeline.

For more information about enabling the remaining combinations of sources and sinks, for example, Apache Kafka and Elasticsearch, see the GitHub repo.

Building a streaming ETL pipeline with Apache Flink

Now that you have seen the pipeline in action, you can dive into the technical aspects of how to implement the functionality with Apache Flink and Kinesis Data Analytics.

Reading and writing to private resources

Kinesis Data Analytics applications can access resources on the public internet and resources in a private subnet that is part of your VPC. By default, a Kinesis Data Analytics application only enables access to resources that you can reach over the public internet. This works well for resources that provide a public endpoint, for example, Kinesis data streams or Amazon Elasticsearch Service.

If your resources are private to your VPC, either for technical or security-related reasons, you can configure VPC connectivity for your Kinesis Data Analytics application. For example, MSK clusters are private; you cannot access them from the public internet. You may run your own Apache Kafka cluster on premises that is not exposed to the public internet and is only accessible from your VPC through a VPN connection. The same is true for other resources that are private to your VPC, such as relational databases or AWS PrivateLink-powered endpoints.

To enable VPC connectivity, configure the Kinesis Data Analytics application to connect to private subnets in your VPC. Kinesis Data Analytics creates elastic network interfaces in one or more of the subnets provided in your VPC configuration for the application, depending on the parallelism of the application. For more information, see Configuring Kinesis Data Analytics for Java Applications to access Resources in an Amazon VPC.

The following screenshot shows an example configuration of a Kinesis Data Analytics application with VPC connectivity:

The application can then access resources that have network connectivity from the configured subnets. This includes resources that are not directly contained in these subnets, which you can reach over a VPN connection or through VPC peering. This configuration also supports endpoints that are available over the public internet if you have a NAT gateway configured for the respective subnets. For more information, see Internet and Service Access for a VPC-Connected Kinesis Data Analytics for Java application.

Configuring Kinesis and Kafka sources

Apache Flink supports various data sources, including Kinesis Data Streams and Apache Kafka. For more information, see Streaming Connectors on the Apache Flink website.

To connect to a Kinesis data stream, first configure the Region and a credentials provider. As a general best practice, choose AUTO as the credentials provider. The application will then use temporary credentials from the role of the respective Kinesis Data Analytics application to read events from the specified data stream. This avoids baking static credentials into the application. In this context, it is also reasonable to increase the time between two read operations from the data stream. When you increase the default of 200 milliseconds to 1 second, the latency increases slightly, but it facilitates multiple consumers reading from the same data stream. See the following code:

Properties properties = new Properties();
properties.setProperty(AWSConfigConstants.AWS_REGION, "<Region name>");
properties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
properties.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

This config is passed to the FlinkKinesisConsumer with the stream name and a DeserializationSchema. This post uses the TripEventSchema for deserialization, which specifies how to deserialize a byte array that represents a Kinesis record into a TripEvent object. See the following code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

For more information, see TripEventSchema.java and TripEvent.java on GitHub. Apache Flink provides other more generic serializers that can deserialize data into strings or JSON objects.

Apache Flink is not limited to reading from Kinesis data streams. If you configure the Kinesis Data Analytics application’s VPC settings correctly, Apache Flink can also read events from Apache Kafka and MSK clusters. Specify a comma-separated list of broker and port pairs to use for the initial connection to your cluster. This config is passed to the FlinkKafkaConsumer with the topic name and a DeserializationSchema to create a source that reads from the respective topic of the Apache Kafka cluster. See the following code:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<comma separated list of broker and port pairs>");

DataStream<TripEvent> events = env.addSource(
  new FlinkKafkaConsumer<>("<topic name>", new TripEventSchema(), properties)
);

The resulting DataStream contains TripEvent objects that have been deserialized from the data ingested into the data stream and Kafka topic, respectively. You can then use the data streams in combination with a sink to persist the events into their respective destination.

Persisting data in Amazon S3 with data partitioning

When you persist streaming data to Amazon S3, you may want to partition the data. You can substantially improve query performance of analytic tools by partitioning data because partitions that cannot contribute to a query can be pruned and therefore do not need to be read. For example, the right partitioning strategy can improve Amazon Athena query performance and cost by reducing the amount of data read for a query. You should choose to partition your data by the same attributes used in your application logic and query patterns. Furthermore, it is common when processing streaming data to include the time of an event, or event time, in your partitioning strategy. This contrasts with using the ingestion time or some other service-side timestamp that does not reflect the time an event occurred as accurately as event time.

For more information about taking data partitioned by ingestion time and repartitioning it by event time with Athena, see Analyze your Amazon CloudFront access logs at scale. However, you can directly partition the incoming data based on event time with Apache Flink by using the payload of events to determine the partitioning, which avoids an additional post-processing step. This capability is called data partitioning and is not limited to partition by time.

You can realize data partitioning with Apache Flink’s StreamingFileSink and BucketAssigner. For more information, see Streaming File Sink on the Apache Flink website.

When given a specific event, the BucketAssigner determines the corresponding partition prefix in the form of a string. See the following code:

public class TripEventBucketAssigner implements BucketAssigner<TripEvent, String> {
  public String getBucketId(TripEvent event, Context context) {
    return String.format("pickup_location=%03d/year=%04d/month=%02d",
        event.getPickupLocationId(),
        event.getPickupDatetime().getYear(),
        event.getPickupDatetime().getMonthOfYear()
    );
  }

  ...
}

The sink takes an argument for the S3 bucket as a destination path and a function that converts the TripEvent Java objects into a string. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

You can further customize the size of the objects you write to Amazon S3 and the frequency of the object creation with a rolling policy. You can configure your policy to have more events aggregated into fewer objects at the cost of increased latency, or vice versa. This can help avoid many small objects on Amazon S3, which can be a desirable trade-off for increased latency. A high number of objects can negatively impact the query performance of consumers reading the data from Amazon S3. For more information, see DefaultRollingPolicy on the Apache Flink website.

The number of output files that arrive in your S3 bucket per your rolling policy also depends on the parallelism of the StreamingFileSink and how you distribute events between Flink application operators. In the previous example, the Flink internal DataStream is partitioned by pickup location ID with the keyBy operator. The location ID is also used in the BucketAssigner as part of the prefix for objects that are written to Amazon S3. Therefore, the same node aggregates and persists all events with the same prefix, which results in particularly large objects on Amazon S3.

Apache Flink uses multipart uploads under the hood when writing to Amazon S3 with the StreamingFileSink. In case of failures, Apache Flink may not be able to clean up incomplete multipart uploads. To avoid unnecessary storage fees, set up the automatic cleanup of incomplete multipart uploads by configuring appropriate lifecycle rules on the S3 bucket. For more information, see Important Considerations for S3 on the Apache Flink website and Example 8: Lifecycle Configuration to Abort Multipart Uploads.

Converting output to Apache Parquet

In addition to partitioning data before delivery to Amazon S3, you may want to compress it with a columnar storage format. Apache Parquet is a popular columnar format, which is well supported in the AWS ecosystem. It reduces the storage footprint and can substantially increase query performance and reduce cost.

The StreamingFileSink supports Apache Parquet and other bulk-encoded formats through a built-in BulkWriter factory. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forBulkFormat(
    new Path("s3://<bucket name>"),
    ParquetAvroWriters.forSpecificRecord(TripEvent.class)
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

For more information, see Bulk-encoded Formats on the Apache Flink website.

Persisting events works a bit differently when you use the Parquet conversion. When you enable Parquet conversion, you can only configure the StreamingFileSink with the OnCheckpointRollingPolicy, which commits completed part files to Amazon S3 only when a checkpoint is triggered. You need to enable Apache Flink checkpoints in your Kinesis Data Analytics application to persist data to Amazon S3. It only becomes visible for consumers when a checkpoint is triggered, so your delivery latency depends on how often your application is checkpointing.

Moreover, you previously just needed to generate a string representation of the data to write to Amazon S3. In contrast, the ParquetAvroWriters expects an Apache Avro schema for events. For more information, see the GitHub repo. You can use and extend the schema on the repo if you are want an example.

In general, it is highly desirable to convert data into Parquet if you want to work with and query the persisted data effectively. Although it requires some additional effort, the benefits of the conversion outweigh these additional complexities compared to storing raw data.

Fanning out to multiple Elasticsearch indexes and custom document IDs

Amazon ES is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch clusters. A popular use case is to stream application and network log data into Amazon S3. These logs are documents in Elasticsearch parlance, and you can create one for every event and store it in an Elasticsearch index.

The Elasticsearch sink that Apache Flink provides is flexible and extensible. You can specify an index based on the payload of each event. This is useful when the stream contains different event types and you want to store the respective documents in different Elasticsearch indexes. With this capability, you can use a single sink and application, respectively, to write into multiple indexes. With newer Elasticsearch versions, a single index cannot contain multiple types. See the following code:

SinkFunction<TripEvent> sink = AmazonElasticsearchSink.buildElasticsearchSink(
  "<Elasticsearch endpoint>",
  "<AWS region>",
  new ElasticsearchSinkFunction<TripEvent>() {
   public IndexRequest createIndexRequest(TripEvent element) {
    String type = element.getType().toString();
    String tripId = Long.toString(element.getTripId());

    return Requests.indexRequest()
      .index(type)
      .type(type)
      .id(tripId)
      .source(TripEventSchema.toJson(element), XContentType.JSON);
   }
);

events.addSink(sink);

You can also explicitly set the document ID when you send documents to Elasticsearch. If an event with the same ID is ingested into Elasticsearch multiple times, it is overwritten rather than creating duplicates. This enables your writes to Elasticsearch to be idempotent. In this way, you can obtain exactly-once semantics of the entire architecture, even if your data sources only provide at-least-once semantics.

The AmazonElasticsearchSink used above is an extension of the Elasticsearch sink that is comes with Apache Flink. The sink adds support to sign requests with IAM credentials so you can use the strong IAM-based authentication and authorization that is available from the service. To this end, the sink picks up temporary credentials from the Kinesis Data Analytics environment in which the application is running. It uses the Signature Version 4 method to add authentication information to the request that is sent to the Elasticsearch endpoint.

Leveraging exactly-once semantics

You can obtain exactly-once semantics by combining an idempotent sink with at-least-once semantics, but that is not always feasible. For instance, if you want to replicate data from one Apache Kafka cluster to another or persist transactional CDC data from Apache Kafka to Amazon S3, you may not be able to tolerate duplicates in the destination, but both of these sinks are not idempotent.

Apache Flink natively supports exactly-once semantics. Kinesis Data Analytics implicitly enables exactly-once mode for checkpoints. To obtain end-to-end exactly-once semantics, you need to enable checkpoints for the Kinesis Data Analytics application and choose a connector that supports exactly-once semantics, such as the StreamingFileSink. For more information, see Fault Tolerance Guarantees of Data Sources and Sinks on the Apache Flink website.

There are some side effects to using exactly-once semantics. For example, end-to-end latency increases for several reasons. First, you can only commit the output when a checkpoint is triggered. This is the same as the latency increases that occurred when you turned on Parquet conversion. The default checkpoint interval is 1 minute, which you can decrease. However, obtaining sub-second delivery latencies are difficult with this approach.

Also, the details of end-to-end exactly-once semantics are subtle. Although the Flink application may read in an exactly-once fashion from a data stream, duplicates may already be part of the stream, so you can only obtain at-least-once semantics of the entire application. For Apache Kafka as a source and sink, different caveats apply. For more information, see Caveats on the Apache Flink website.

Be sure that you understand all the details of the entire application stack before you take a hard dependency on exactly-once semantics. In general, if your application can tolerate at-least-once semantics, it’s a good idea to use that semantic instead of relying on stronger semantics that you don’t need.

Using multiple sources and sinks

One Flink application can read data from multiple sources and persist data to multiple destinations. This is interesting for several reasons. First, you can persist the data or different subsets of the data to different destinations. For example, you can use the same application to replicate all events from your on-premises Apache Kafka cluster to an MSK cluster. At the same time, you can deliver specific, valuable events to an Elasticsearch cluster.

Second, you can use multiple sinks to increase the robustness of your application. For example, your application that applies filters and enriches streaming data can also archive the raw data stream. If something goes wrong with your more complex application logics, Amazon S3 still has the raw data, which you can use to backfill the sink.

However, there are some trade-offs. When you bundle many functionalities in a single application, you increase the blast radius of failures. If a single component of the application fails, the entire application fails and you need to recover it from the last checkpoint. This causes some downtime and increased delivery latency to all delivery destinations in the application. Also, a single large application is often harder to maintain and to change. You should strike a balance between adding functionality or creating additional Kinesis Data Analytics applications.

Operational aspects

When you run the architecture in production, you set out to execute a single Flink application continuously and indefinitely. It is crucial to implement monitoring and proper alarming to make sure that the pipeline is working as expected and the processing can keep up with the incoming data. Ideally, the pipeline should adapt to changing throughput conditions and cause notifications if it fails to deliver data from the sources to the destinations.

Some aspects require specific attention from an operational perspective. The following section provides some ideas and further references on how you can increase the robustness of your streaming ETL pipelines.

Monitoring and scaling sources

The data stream and the MSK cluster, respectively, are the entry point to the entire architecture. They decouple the data producers from the rest of the architecture. To avoid any impact to data producers, which you often cannot control directly, you need to scale the input stream of the architecture appropriately and make sure that it can ingest messages at any time.

Kinesis Data Streams uses a throughput provisioning model based on shards. Each shard provides a certain read and write capacity. From the number of provisioned shards, you can derive the maximum throughput of the stream in terms of ingested and emitted events and data volume per second. For more information, see Kinesis Data Streams Quotas.

Kinesis Data Streams exposes metrics through CloudWatch that report on these characteristics and indicate whether the stream is over- or under-provisioned. You can use the IncomingBytes and IncomingRecords metrics to scale the stream proactively, or you can use the WriteProvisionedThroughputExceeded metrics to scale the stream reactively. Similar metrics exist for data egress, which you should also monitor. For more information, see Monitoring the Amazon Kinesis Data Streams with Amazon CloudWatch.

The following graph shows some of these metrics for the data stream of the example architecture. On average the Kinesis data stream receives 2.8 million events and 1.1 GB of data every minute.

You can use the IncomingBytes and IncomingRecords metrics to scale the stream proactively whereas you can use the WriteProvisionedThroughputExceeded metrics to scale the stream reactively. You can even automate the scaling of your Kinesis Data Streams. For more information, see Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount.

Apache Kafka and Amazon MSK use a node-based provisioning model. Amazon MSK also exposes metrics through CloudWatch, including metrics that indicate how much data and how many events are ingested into the cluster. For more information, see Amazon MSK Metrics for Monitoring with CloudWatch.

In addition, you can also enable open monitoring with Prometheus for MSK clusters. It is a bit harder to know the total capacity of the cluster, and you often need benchmarking to know when you should scale. For more information about important metrics to monitor, see Monitoring Kafka on the Confluent website.

Monitoring and scaling the Kinesis Data Analytics application

The Flink application is the central core of the architecture. Kinesis Data Analytics executes it in a managed environment, and you want to make sure that it continuously reads data from the sources and persists data in the data sinks without falling behind or getting stuck.

When the application falls behind, it often is an indicator that it is not scaled appropriately. Two important metrics to track the progress of the application are millisBehindLastest (when the application is reading from a Kinesis data stream) and records-lag-max (when it is reading from Apache Kafka and Amazon MSK). These metrics not only indicate that data is read from the sources, but they also tell if data is read fast enough. If the values of these metrics are continuously growing, the application is continuously falling behind, which may indicate that you need to scale up the Kinesis Data Analytics application. For more information, see Kinesis Data Streams Connector Metrics and Application Metrics.

The following graph shows the metrics for the example application in this post. During checkpointing, the maximum millisBehindLatest metric occasionally spikes up to 7 seconds. However, because the reported average of the metric is less than 1 second and the application immediately catches up to the tip of the stream again, it is not a concern for this architecture.

Although the lag of the application is one of the most important metrics to monitor, there are other relevant metrics that Apache Flink and Kinesis Data Analytics expose. For more information, see Monitoring Apache Flink Applications 101 on the Apache Flink website.

Monitoring sinks

To verify that sinks are receiving data and, depending on the sink type, do not run out of storage, you need to monitor sinks closely.

You can enable detailed metrics for your S3 buckets that track the number of requests and data uploaded into the bucket with 1-minute granularity. For more information, see Monitoring Metrics with Amazon CloudWatch. The following graph shows these metrics for the S3 bucket of the example architecture:

When the architecture persists data into a Kinesis data stream or a Kafka topic, it acts as a producer, so the same recommendations as for monitoring and scaling sources apply. For more information about operating and monitoring the service in production environments, see Amazon Elasticsearch Service Best Practices.

Handling errors

“Failures are a given and everything eventually fails over time”, so you should expect the application to fail at some point. For example, an underlying node of the infrastructure that Kinesis Data Analytics manages might fail, or intermittent timeouts on the network can prevent the application from reading from sources or writing to sinks. When this happens, Kinesis Data Analytics restarts the application and resumes processing by recovering from the latest checkpoint. Because the raw events have been persisted in a data stream or Kafka topic, the application can reread the events that have been persisted in the stream between the last checkpoint and when it recovered and continue standard processing.

These kinds of failures are rare and the application can gracefully recover without sacrificing processing semantics, including exactly-once semantics. However, other failure modes need additional attention and mitigation.

When an exception is thrown anywhere in the application code, for example, in the component that contains the logic for parsing events, the entire application crashes. As before, the application eventually recovers, but if the exception is from a bug in your code that a specific event always hits, it results in an infinite loop. After recovering from the failure, the application rereads the event, because it was not processed successfully before, and crashes again. The process starts again and repeats indefinitely, which effectively blocks the application from making any progress.

Therefore, you want to catch and handle exceptions in the application code to avoid crashing the application. If there is a persistent problem that you cannot resolve programmatically, you can use side outputs to redirect the problematic raw events to a secondary data stream, which you can persist to a dead letter queue or an S3 bucket for later inspection. For more information, see Side Outputs on the Apache Flink website.

When the application is stuck and cannot make any progress, it is at least visible in the metrics for application lag. If your streaming ETL pipeline filters or enriches events, failures may be much more subtle, and you may only notice them long after they have been ingested. For instance, due to a bug in the application, you may accidentally drop important events or corrupt their payload in unintended ways. Kinesis data streams stores events for up to 7 days and, though technically possible, Apache Kafka is often not configured to store events indefinitely either. If you don’t identify the corruption quickly enough, you risk losing information when the retention of the raw events expires.

To protect against this scenario, you can persist the raw events to Amazon S3 before you apply any additional transformations or processing to them. You can keep the raw events and reprocess or replay them into the stream if you need to. To integrate the functionality into the application, add a second sink that just writes to Amazon S3. Alternatively, use a separate application that only reads and persists the raw events from the stream, at the cost of running and paying for an additional application.

When to choose what

AWS provides many services that work with streaming data and can perform streaming ETL. Amazon Kinesis Data Firehose can ingest, process, and persist streaming data into a range of supported destinations. There is a significant overlap of the functionality between Kinesis Data Firehose and the solution in this post, but there are different reasons to use one or the other.

As a rule of thumb, use Kinesis Data Firehose whenever it fits your requirements. The service is built with simplicity and ease of use in mind. To use Kinesis Data Firehose, you just need to configure the service. You can use Kinesis Data Firehose for streaming ETL use cases with no code, no servers, and no ongoing administration. Moreover, Kinesis Data Firehose comes with many built-in capabilities, and its pricing model allows you to only pay for the data processed and delivered. If you don’t ingest data into Kinesis Data Firehose, you pay nothing for the service.

In contrast, the solution in this post requires you to create, build, and deploy a Flink application. Moreover, you need to think about monitoring and how to obtain a robust architecture that is not only tolerant against infrastructure failures but also resilient against application failures and bugs. However, this added complexity unlocks many advanced capabilities, which your use case may require. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics Developer Guide.

What’s next?

This post discussed how to build a streaming ETL pipeline with Apache Flink and Kinesis Data Analytics. It focused on how to build an extensible solution that addresses some advanced use cases for streaming ingest while maintaining low operational overhead. The solution allows you to quickly enrich, transform, and load your streaming data into your data lake, data store, or another analytical tool without the need for an additional ETL step. The post also explored ways to extend the application with monitoring and error handling.

You should now have a good understanding of how to build streaming ETL pipelines on AWS. You can start capitalizing on your time-sensitive events by using a streaming ETL pipeline that makes valuable information quickly accessible to consumers. You can tailor the format and shape of this information to your use case without adding the substantial latency of traditional batch-based ETL processes.

 


About the Author

Steffen Hausmann is a Specialist Solutions Architect for Analytics at AWS. He works with customers around the globe to design and build streaming architectures so that they can get value from analyzing their streaming data. He holds a doctorate degree in computer science from the University of Munich and in his free time, he tries to lure his daughters into tech with cute stickers he collects at conferences. You can follow his ruthless attempts on Twitter (@sthmmm).

 

 

Application analytics pipeline with Amazon EventBridge

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/application-analytics-pipeline-with-amazon-eventbridge/

This post is courtesy of Rajdeep Tarat, Solutions Architect and Venugopal Pai, Solutions Architect

Customers across industry verticals collect, analyze, and derive insights from end-user application analytics using solutions such as Google Analytics and MixPanel. While these solutions provide built-in dashboards for marketing analytics, it can be difficult to reuse the raw event data.

Setting up a pipeline to move the raw event data into AWS opens up possibilities for various rule-based, statistical, and machine learning algorithms to derive deep insights about end-user behavior. Additionally, the raw event data can be enriched with other transactional data points available within the customer’s AWS environment.

This post uses the Segment Partner integration in Amazon EventBridge to pipe the data into your AWS environment. Segment allows you to collect, unify, and connect end-user application analytics into AWS using Amazon EventBridge as a destination.

Segment already supports direct, optimized connections to many AWS services such as Amazon Redshift, Amazon Personalize, Amazon Kinesis, Amazon Kinesis Data Firehose, AWS Lambda, and Amazon S3. The EventBridge destination is a good choice for customers who want the flexibility and centralization that EventBridge offers.

EventBridge makes it easy to build scalable event-driven application pipelines by handling event ingestion, delivery, security, authorization, and error handling for you. The architecture of this pipeline is shown below:Segment architecture

In the diagram, end-user applications send the data into Segment, which is routed to each of the configured destinations (for example, EventBridge). Once the data reaches EventBridge, it is again routed to multiple targets. With this approach you can continue using existing solutions supported by Segment in parallel to the Amazon EventBridge destination.

This architecture makes the pipeline highly extensible and modular. Firstly, you can configure multiple Segment destinations to fan out the event data into other existing solutions in parallel to EventBridge. Marketing teams can continue to use their existing tools without any disruptions while the data is seamlessly pumped into AWS. Within the AWS Cloud, EventBridge can again route the event data to up to five targets per rule.

The following section provides a walkthrough of setting up the Segment integration with EventBridge, and configuring two targets within the AWS Cloud.

  • The first target uses an Amazon Kinesis Data Firehose to deliver the data to an S3 bucket. From the S3 bucket, multiple AWS services can use the data (learn more about using S3 as a data lake).
  • The second target posts the event data to an SNS topic. From here, the data can be consumed by subscribers for the topic.

Walkthrough

To set up the pipeline, you must configure the Segment partner integration in EventBridge, and then set up the targets where analytics data is sent.

Amazon EventBridge – Segment partner integration:

  1. From the Amazon EventBridge console, navigate to the Partner Event Sources > Segment setup page. Copy your AWS Account ID from here.
    Segment setup
  2. On the Segment destination setup page, use the Amazon EventBridge integration. Enter the AWS Account ID and select a Region (learn more about setting up a Segment destination).
    EventBridge settings

Create the event bus:

  1. After linking the Segment Destination with the AWS Account ID, send a test event from Segment to create a Partner Event Source. This also creates an Event bus with the same name. This is done by firing a test event from the Event Tester in the Segment Dashboard.
    Event Tester
  2. After the first test event is fired, the Partner Event Source and the corresponding event bus is created in the EventBridge console.
    Partner event sources

Create rules:

  1. A rule watches for incoming events and routes them to specific targets that are configured. Start by creating a new rule and entering a name.
    Creating rules
  2. For Event Pattern, select the Predefined pattern by Service, and select Service Partners > Segment.
    Define event pattern
  3. Under Select event bus, select the Custom or partner event bus, and the name of the event bus created.
    Select event bus

Configuring multiple targets for the event bus:

  1. For Kinesis streams, select Kinesis stream from the Target dropdown, and the name of the stream. For more details on creating a Kinesis data stream, read this documentation.
    Select targets
  2. For SNS topic, choose Add Target and repeat the same steps to add an SNS topic instead. For more details on creating an SNS topic, read this documentation.
    SNS as target
  3. You can optionally tag the resource, then choose Create.

The pipeline is ready to send data to the targets configured via the event bus. You can now send test events from the Segment dashboard and monitor Kinesis Data Firehose or by setting up subscribers for the SNS topic.

Conclusion

This post shows how customers can capture end-user application analytics using the partner solution Segment in real time, and ingest data into Amazon EventBridge. The data routing is made extensible using multiple Segment destinations (for third-party solutions), and using multiple rules in EventBridge (for multiple destinations within the AWS Cloud).

To learn more about Amazon EventBridge integrations, read the EventBridge documentation.

How FactSet automated exporting data from Amazon DynamoDB to Amazon S3 Parquet to build a data analytics platform

Post Syndicated from Arvind Godbole original https://aws.amazon.com/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

This is a guest post by Arvind Godbole, Lead Software Engineer with FactSet and Tarik Makota, AWS Principal Solutions Architect. In their own words “FactSet creates flexible, open data and software solutions for tens of thousands of investment professionals around the world, which provides instant access to financial data and analytics that investors use to make crucial decisions. At FactSet, we are always working to improve the value that our products provide.”

One area that we’ve been looking into is the relevancy of search results for our clients. Given the wide variety of client use cases and the large number of searches per day, we needed a platform to store anonymized usage data and allow us to analyze that data to boost results using our custom scoring algorithm. Amazon EMR was the obvious choice to host the calculations, but the question arose on how to get our anonymized data into a form that Amazon EMR could use. We worked with AWS and chose to use Amazon DynamoDB to prepare the data for usage in Amazon EMR.

This post walks you through how FactSet takes data from a DynamoDB table and converts that data into Apache Parquet. We store the Parquet files in Amazon S3 to enable near real-time analysis with Amazon EMR. Along the way, we encountered challenges related to data type conversion, which we will explain and show how we were able to overcome these.

Workflow overview

Our workflow contained the following steps:

  1. Anonymized log data is stored into DynamoDB tables. These entries have different fields, depending on how the logs were generated. Whenever we create items in the tables, we use DynamoDB Streams to write out a record. The stream records contain information from a single item in a DynamoDB table.
  2. An AWS Lambda function is hooked into the DynamoDB stream to capture the new items stored in a DynamoDB table. We built our Lambda function off of the lambda-streams-to-firehose project on GitHub to convert the DynamoDB stream image to JSON, which we stringify and push to Amazon Kinesis Data Firehose.
  3. Kinesis Data Firehose transforms the JSON data into Parquet using data contained within an AWS Glue Data Catalog table.
  4. Kinesis Data Firehose stores the Parquet files in S3.
  5. An AWS Glue crawler discovers the schema of DynamoDB items and stores the associated metadata into the Data Catalog.

The following diagram illustrates this workflow.

AWS Glue provides tools to help with data preparation and analysis. A crawler can run on a DynamoDB table to take inventory of the table data and store that information in a Data Catalog. Other services can use the Data Catalog as an index to the location, schema, and types of the table data. There are other ways to add metadata into a Data Catalog, but the key idea is that you can update and modify the metadata easily. For more information, see Populating the AWS Glue Data Catalog.

Problem: Data type disparities

Using a variety of technologies to build a solution often requires mapping and converting data types between these technologies. The cloud is no exception. In our case, log items stored in DynamoDB contained attributes of type String Set. String Set values caused data conversion exceptions when Kinesis tried to transform the data to Parquet. After investigating the problem, we found the following:

  • As the crawler indexes the DynamoDB table, Set data types (StringSet, NumberSet) are stored in the Glue metadata catalog as set<string> and set<bigint>.
  • Kinesis Data Firehose uses that same catalog when it performs the conversion to Apache Parquet. The conversion requires valid Hive data types.
  • set<string> and set<bigint> are not valid Hive data types, so the conversion fails, and an exception is generated. The exception looks similar to the following code:
    [{
       "lastErrorCode": "DataFormatConversion.InvalidSchema",
       "lastErrorMessage": "The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of 'array,used:bigint>>' but 'set' is found."
    }]

Solution: Construct data mapping

While working with the AWS team, we confirmed that the Kinesis Data Firehose converter needs valid Hive data types in the Data Catalog to succeed. When it comes to complex data types, Hive doesn’t support set<data_type>, but it does support the following:

  • ARRAY<data_type>
  • MAP<primitive_type, data_type
  • STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • UNIONTYPE<data_type, data_type, ...>

In our case, this meant that we must convert set<string> and set<bigint> into array<string> and array<bigint>. Our first step was to manually change the types directly in the Data Catalog. After we updated the Data Catalog to change all occurrences of set<data_type> to array<data_type>, the Kinesis transformation to Parquet completed successfully.

Our business case calls for a data store that can store items with different attributes in the same table and the addition of new attributes on-the-fly. We took advantage of DynamoDB’s schema-less nature and ability to scale up and down on-demand so we could focus on our functionality and not the management of the underlying infrastructure. For more information, see Should Your DynamoDB Table Be Normalized or Denormalized?

If our data had a static schema, a manual change would be good enough. Given our business case, a manual solution wasn’t going to scale. Every time we introduced new attributes to the DynamoDB table, we needed to run the crawler, which re-created the metadata and overwrote the change.

Serverless event architecture

To automate the data type updates to the Data Catalog, we used Amazon EventBridge and Lambda to implement the modifications to the data type mapping. EventBridge is a serverless event bus that connects applications using events. An event is a signal that a system’s state has changed, such as the status of a Data Catalog table.

The following diagram shows the previous workflow with the new architecture.

  1. The crawler stays as-is and crawls the DynamoDB table to obtain the metadata.
  2. The metadata obtained by the crawler is stored in the Data Catalog. Previous metadata is updated or removed, and changes (manual or automated) are overwritten.
  3. The event GlueTableChanged in EventBridge listens to any changes to the Data Catalog tables. After we receive the event that there was a change to the table, we trigger the Lambda function.
  4. The Lambda function uses AWS SDK to update the Glue Catalog table using the glue.update_table() API to replace occurrences of set<data_type> with array<data_type>.

To set up EventBridge, we set Event pattern to be “Pre-defined pattern by service”. For service provider, we selected AWS and Glue as service. Event Type we selected “Glue Data Catalog Table State Change”. The following screenshot shows the EventBridge configuration that sends events to the Lambda function that updates the Data Catalog.

The following is the baseline Lambda code:

# This is NOT production worthy code please modify and implement error handling routines as appropriate
import json
import logging
import boto3

glue = boto3.client('glue')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Define subsegments manually
def table_contains_set(databaseName, tableName):
    
    # returns Glue Catalog description for Table structure
    response = glue.get_table( DatabaseName=databaseName,Name=tableName)
    logger.info(response)  
    
    # loop thru all the Columns of the table 
    isModified = False
    for i in response['Table']['StorageDescriptor']['Columns']: 
        logger.info("## Column: " + str(i['Name']))
        # if Column datatype starts with set< then change it to array<
        if i['Type'].find("set<") != -1:
            i['Type'] = i['Type'].replace("set<", "array<")
            isModified = True
            logger.info(i['Type'])
    
    if isModified:
        # following 3 statements simply clean up the response JSON so that update_table API call works
        del response['Table']['DatabaseName']
        del response['Table']['CreateTime']
        del response['Table']['UpdateTime']
        glue.update_table(DatabaseName=databaseName,TableInput=response['Table'],SkipArchive=True)
        
    logger.info("============ ### =============") 
    logger.info(response)
    
    return True
    
def lambda_handler(event, context):
    logger.info('## EVENT')
    # logger.info(event)
    # This is Sample of the event payload that would be received
    # { 'version': '0', 
    #   'id': '2b402842-21f5-1d76-1a9a-c90076d1d7da', 
    #   'detail-type': 'Glue Data Catalog Table State Change', 
    #   'source': 'aws.glue', 
    #   'account': '1111111111', 
    #   'time': '2019-08-18T02:53:41Z', 
    #   'region': 'us-east-1', 
    #   'resources': ['arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample'], 
    #   'detail': {
    #           'databaseName': 'ddb-glue-fh', 
    #           'changedPartitions': [], 
    #           'typeOfChange': 'UpdateTable', 
    #           'tableName': 'ddb_glu_fh_sample'
    #    }
    # }
    
    # get the database and table name of the Glue table triggered the event
    databaseName = event['detail']['databaseName']
    tableName = event['detail']['tableName']
    logger.info("DB: " + databaseName + " | Table: " + tableName)
    
    table_contains_set(databaseName, tableName)
   
    # TODO implement and modify
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

The Lambda function is straightforward; this post provides a basic skeleton. You can use this as a template to implement your own functionality for your specific data.

Conclusion

Simple things such as data type conversion and mapping can create unexpected outcomes and challenges when data crosses service boundaries. One of the advantages of AWS is the wide variety of tools with which you can create robust and scalable solutions tailored to your needs. Using event-driven architecture, we solved our data type conversion errors and automated the process to eliminate the issue as we move forward.

 


About the Authors

Arvind Godbole is a Lead Software Engineer at FactSet Research Systems. He has experience in building high-performance, high-availability client facing products and services, ranging from real-time financial applications to search infrastructure. He is currently building an analytics platform to gain insights into client workflows. He holds a B.S. in Computer Engineering from the University of California, San Diego

 

 

 

Tarik Makota is a Principal Solutions Architect with the Amazon Web Services. He provides technical guidance, design advice and thought leadership to AWS’ customers across US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

 

ICYMI: Serverless Q4 2019

Post Syndicated from Rob Sutter original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2019/

Welcome to the eighth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, checkout what happened last quarter here.

The three months comprising the fourth quarter of 2019

AWS re:Invent

AWS re:Invent 2019

re:Invent 2019 dominated the fourth quarter at AWS. The serverless team presented a number of talks, workshops, and builder sessions to help customers increase their skills and deliver value more rapidly to their own customers.

Serverless talks from re:Invent 2019

Chris Munns presenting 'Building microservices with AWS Lambda' at re:Invent 2019

We presented dozens of sessions showing how customers can improve their architecture and agility with serverless. Here are some of the most popular.

Videos

Decks

You can also find decks for many of the serverless presentations and other re:Invent presentations on our AWS Events Content.

AWS Lambda

For developers needing greater control over performance of their serverless applications at any scale, AWS Lambda announced Provisioned Concurrency at re:Invent. This feature enables Lambda functions to execute with consistent start-up latency making them ideal for building latency sensitive applications.

As shown in the below graph, provisioned concurrency reduces tail latency, directly impacting response times and providing a more responsive end user experience.

Graph showing performance enhancements with AWS Lambda Provisioned Concurrency

Lambda rolled out enhanced VPC networking to 14 additional Regions around the world. This change brings dramatic improvements to startup performance for Lambda functions running in VPCs due to more efficient usage of elastic network interfaces.

Illustration of AWS Lambda VPC to VPC NAT

New VPC to VPC NAT for Lambda functions

Lambda now supports three additional runtimes: Node.js 12, Java 11, and Python 3.8. Each of these new runtimes has new version-specific features and benefits, which are covered in the linked release posts. Like the Node.js 10 runtime, these new runtimes are all based on an Amazon Linux 2 execution environment.

Lambda released a number of controls for both stream and async-based invocations:

  • You can now configure error handling for Lambda functions consuming events from Amazon Kinesis Data Streams or Amazon DynamoDB Streams. It’s now possible to limit the retry count, limit the age of records being retried, configure a failure destination, or split a batch to isolate a problem record. These capabilities help you deal with potential “poison pill” records that would previously cause streams to pause in processing.
  • For asynchronous Lambda invocations, you can now set the maximum event age and retry attempts on the event. If either configured condition is met, the event can be routed to a dead letter queue (DLQ), Lambda destination, or it can be discarded.

AWS Lambda Destinations is a new feature that allows developers to designate an asynchronous target for Lambda function invocation results. You can set separate destinations for success and failure. This unlocks new patterns for distributed event-based applications and can replace custom code previously used to manage routing results.

Illustration depicting AWS Lambda Destinations with success and failure configurations

Lambda Destinations

Lambda also now supports setting a Parallelization Factor, which allows you to set multiple Lambda invocations per shard for Kinesis Data Streams and DynamoDB Streams. This enables faster processing without the need to increase your shard count, while still guaranteeing the order of records processed.

Illustration of multiple AWS Lambda invocations per Kinesis Data Streams shard

Lambda Parallelization Factor diagram

Lambda introduced Amazon SQS FIFO queues as an event source. “First in, first out” (FIFO) queues guarantee the order of record processing, unlike standard queues. FIFO queues support messaging batching via a MessageGroupID attribute that supports parallel Lambda consumers of a single FIFO queue, enabling high throughput of record processing by Lambda.

Lambda now supports Environment Variables in the AWS China (Beijing) Region and the AWS China (Ningxia) Region.

You can now view percentile statistics for the duration metric of your Lambda functions. Percentile statistics show the relative standing of a value in a dataset, and are useful when applied to metrics that exhibit large variances. They can help you understand the distribution of a metric, discover outliers, and find hard-to-spot situations that affect customer experience for a subset of your users.

Amazon API Gateway

Screen capture of creating an Amazon API Gateway HTTP API in the AWS Management Console

Amazon API Gateway announced the preview of HTTP APIs. In addition to significant performance improvements, most customers see an average cost savings of 70% when compared with API Gateway REST APIs. With HTTP APIs, you can create an API in four simple steps. Once the API is created, additional configuration for CORS and JWT authorizers can be added.

AWS SAM CLI

Screen capture of the new 'sam deploy' process in a terminal window

The AWS SAM CLI team simplified the bucket management and deployment process in the SAM CLI. You no longer need to manage a bucket for deployment artifacts – SAM CLI handles this for you. The deployment process has also been streamlined from multiple flagged commands to a single command, sam deploy.

AWS Step Functions

One powerful feature of AWS Step Functions is its ability to integrate directly with AWS services without you needing to write complicated application code. In Q4, Step Functions expanded its integration with Amazon SageMaker to simplify machine learning workflows. Step Functions also added a new integration with Amazon EMR, making EMR big data processing workflows faster to build and easier to monitor.

Screen capture of an AWS Step Functions step with Amazon EMR

Step Functions step with EMR

Step Functions now provides the ability to track state transition usage by integrating with AWS Budgets, allowing you to monitor trends and react to usage on your AWS account.

You can now view CloudWatch Metrics for Step Functions at a one-minute frequency. This makes it easier to set up detailed monitoring for your workflows. You can use one-minute metrics to set up CloudWatch Alarms based on your Step Functions API usage, Lambda functions, service integrations, and execution details.

Step Functions now supports higher throughput workflows, making it easier to coordinate applications with high event rates. This increases the limits to 1,500 state transitions per second and a default start rate of 300 state machine executions per second in US East (N. Virginia), US West (Oregon), and Europe (Ireland). Click the above link to learn more about the limit increases in other Regions.

Screen capture of choosing Express Workflows in the AWS Management Console

Step Functions released AWS Step Functions Express Workflows. With the ability to support event rates greater than 100,000 per second, this feature is designed for high-performance workloads at a reduced cost.

Amazon EventBridge

Illustration of the Amazon EventBridge schema registry and discovery service

Amazon EventBridge announced the preview of the Amazon EventBridge schema registry and discovery service. This service allows developers to automate discovery and cataloging event schemas for use in their applications. Additionally, once a schema is stored in the registry, you can generate and download a code binding that represents the schema as an object in your code.

Amazon SNS

Amazon SNS now supports the use of dead letter queues (DLQ) to help capture unhandled events. By enabling a DLQ, you can catch events that are not processed and re-submit them or analyze to locate processing issues.

Amazon CloudWatch

Amazon CloudWatch announced Amazon CloudWatch ServiceLens to provide a “single pane of glass” to observe health, performance, and availability of your application.

Screenshot of Amazon CloudWatch ServiceLens in the AWS Management Console

CloudWatch ServiceLens

CloudWatch also announced a preview of a capability called Synthetics. CloudWatch Synthetics allows you to test your application endpoints and URLs using configurable scripts that mimic what a real customer would do. This enables the outside-in view of your customers’ experiences, and your service’s availability from their point of view.

CloudWatch introduced Embedded Metric Format, which helps you ingest complex high-cardinality application data as logs and easily generate actionable metrics. You can publish these metrics from your Lambda function by using the PutLogEvents API or using an open source library for Node.js or Python applications.

Finally, CloudWatch announced a preview of Contributor Insights, a capability to identify who or what is impacting your system or application performance by identifying outliers or patterns in log data.

AWS X-Ray

AWS X-Ray announced trace maps, which enable you to map the end-to-end path of a single request. Identifiers show issues and how they affect other services in the request’s path. These can help you to identify and isolate service points that are causing degradation or failures.

X-Ray also announced support for Amazon CloudWatch Synthetics, currently in preview. CloudWatch Synthetics on X-Ray support tracing canary scripts throughout the application, providing metrics on performance or application issues.

Screen capture of AWS X-Ray Service map in the AWS Management Console

X-Ray Service map with CloudWatch Synthetics

Amazon DynamoDB

Amazon DynamoDB announced support for customer-managed customer master keys (CMKs) to encrypt data in DynamoDB. This allows customers to bring your own key (BYOK) giving you full control over how you encrypt and manage the security of your DynamoDB data.

It is now possible to add global replicas to existing DynamoDB tables to provide enhanced availability across the globe.

Another new DynamoDB capability to identify frequently accessed keys and database traffic trends is currently in preview. With this, you can now more easily identify “hot keys” and understand usage of your DynamoDB tables.

Screen capture of Amazon CloudWatch Contributor Insights for DynamoDB in the AWS Management Console

CloudWatch Contributor Insights for DynamoDB

DynamoDB also released adaptive capacity. Adaptive capacity helps you handle imbalanced workloads by automatically isolating frequently accessed items and shifting data across partitions to rebalance them. This helps reduce cost by enabling you to provision throughput for a more balanced workload instead of over provisioning for uneven data access patterns.

Amazon RDS

Amazon Relational Database Services (RDS) announced a preview of Amazon RDS Proxy to help developers manage RDS connection strings for serverless applications.

Illustration of Amazon RDS Proxy

The RDS Proxy maintains a pool of established connections to your RDS database instances. This pool enables you to support a large number of application connections so your application can scale without compromising performance. It also increases security by enabling IAM authentication for database access and enabling you to centrally manage database credentials using AWS Secrets Manager.

AWS Serverless Application Repository

The AWS Serverless Application Repository (SAR) now offers Verified Author badges. These badges enable consumers to quickly and reliably know who you are. The badge appears next to your name in the SAR and links to your GitHub profile.

Screen capture of SAR Verifiedl developer badge in the AWS Management Console

SAR Verified developer badges

AWS Developer Tools

AWS CodeCommit launched the ability for you to enforce rule workflows for pull requests, making it easier to ensure that code has pass through specific rule requirements. You can now create an approval rule specifically for a pull request, or create approval rule templates to be applied to all future pull requests in a repository.

AWS CodeBuild added beta support for test reporting. With test reporting, you can now view the detailed results, trends, and history for tests executed on CodeBuild for any framework that supports the JUnit XML or Cucumber JSON test format.

Screen capture of AWS CodeBuild

CodeBuild test trends in the AWS Management Console

Amazon CodeGuru

AWS announced a preview of Amazon CodeGuru at re:Invent 2019. CodeGuru is a machine learning based service that makes code reviews more effective and aids developers in writing code that is more secure, performant, and consistent.

AWS Amplify and AWS AppSync

AWS Amplify added iOS and Android as supported platforms. Now developers can build iOS and Android applications using the Amplify Framework with the same category-based programming model that they use for JavaScript apps.

Screen capture of 'amplify init' for an iOS application in a terminal window

The Amplify team has also improved offline data access and synchronization by announcing Amplify DataStore. Developers can now create applications that allow users to continue to access and modify data, without an internet connection. Upon connection, the data synchronizes transparently with the cloud.

For a summary of Amplify and AppSync announcements before re:Invent, read: “A round up of the recent pre-re:Invent 2019 AWS Amplify Launches”.

Illustration of AWS AppSync integrations with other AWS services

Q4 serverless content

Blog posts

October

November

December

Tech talks

We hold several AWS Online Tech Talks covering serverless tech talks throughout the year. These are listed in the Serverless section of the AWS Online Tech Talks page.

Here are the ones from Q4:

Twitch

October

There are also a number of other helpful video series covering Serverless available on the AWS Twitch Channel.

AWS Serverless Heroes

We are excited to welcome some new AWS Serverless Heroes to help grow the serverless community. We look forward to some amazing content to help you with your serverless journey.

AWS Serverless Application Repository (SAR) Apps

In this edition of ICYMI, we are introducing a section devoted to SAR apps written by the AWS Serverless Developer Advocacy team. You can run these applications and review their source code to learn more about serverless and to see examples of suggested practices.

Still looking for more?

The Serverless landing page has much more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials. We’re also kicking off a fresh series of Tech Talks in 2020 with new content providing greater detail on everything new coming out of AWS for serverless application developers.

Throughout 2020, the AWS Serverless Developer Advocates are crossing the globe to tell you more about serverless, and to hear more about what you need. Follow this blog to keep up on new launches and announcements, best practices, and examples of serverless applications in action.

You can also follow all of us on Twitter to see latest news, follow conversations, and interact with the team.

Chris Munns: @chrismunns
Eric Johnson: @edjgeek
James Beswick: @jbesw
Moheeb Zara: @virgilvox
Ben Smith: @benjamin_l_s
Rob Sutter: @rts_rob
Julian Wood: @julian_wood

Happy coding!

Under the hood: Scaling your Kinesis data streams

Post Syndicated from Ahmed Gaafar original https://aws.amazon.com/blogs/big-data/under-the-hood-scaling-your-kinesis-data-streams/

Real-time delivery of data and insights enables businesses to pivot quickly in response to changes in demand, user engagement, and infrastructure events, among many others. Amazon Kinesis offers a managed service that lets you focus on building your applications, rather than managing infrastructure. Scalability is provided out-of-the-box, allowing you to ingest and process gigabytes of streaming data per second. Data replication to three Availability Zones offers high availability and durability. Pricing is based on usage and requires no upfront costs, making Kinesis a cost-effective solution.

Amazon Kinesis Data Streams use a provisioned capacity model. Each data stream is composed of one or more shards that act as units of capacity. Shards make it easy for you to design and scale a streaming pipeline by providing a predefined write and read capacity. As workloads grow, an application may read or write to a shard at a rate that exceeds its capacity, creating a hot shard and requiring you to add capacity quickly. Shards also enable you to parallelize the processing of large datasets and compute results quickly.

This post discusses how to scale your data streams and avoid hot shards. The post first shows you how to estimate the number of shards you need in your data stream as you design your streaming pipeline. Then it looks at reasons that lead to hot shards and how to avoid those using Kinesis Data Streams scaling mechanisms and reviews important metrics for monitoring.

Estimating your stream capacity

The following diagram shows a streaming data pipeline connected to a multiplayer video game. Kinesis Data Streams ingest player scores and other stats. You can filter and enrich the data, and write it to DynamoDB tables that populate the game’s various leaderboards.

As you embark on designing your streaming pipeline, it’s important to set up the data stream with enough capacity to handle producers ingesting the data records producers create, and handle users consuming the same records. You can ingest up to 1 MB per second per shard or 1,000 data records per second per shard for writes. Read capacity is up to 2 MB per second per shard or five read transactions per second. All applications reading from the stream share the read capacity. You can use the enhanced fan-out feature to scale the number of consuming applications and make sure that each has a dedicated 2 MB per second connection.

This post uses the preceding application as an example. It’s estimated that producers create data records at a rate of 20,000 KB per second, and your consumer nodes need to process this same amount of data at the other end of the stream. In addition to handling these rates, it is a good idea to add extra capacity to give the stream headroom for growth.

This headroom also helps your application recover faster in scenarios that could cause a delay or a pause in ingesting or processing data. These scenarios may include:

  • Deploying a new version of a consumer application
  • Transient network issues

As these nodes work to catch up after recovery, they produce or consume records at a higher than standard rate, requiring higher capacity. For this example, you can add 25% percent, or five shards, for headroom. Shards are cost-efficient, but it is up to you how many you want to add.

Scaling scenario

At the time of the game’s release, this capacity is deemed sufficient for the application. Ingestion and processing of the data are both running smoothly, and the game’s leaderboards are populated with current data. It’s now a few weeks after release; the game is steadily gaining popularity and concurrent player numbers are increasing. In a scenario such as this, it’s important to have sufficient monitoring to detect the increased load so you can increase throughput by scaling the stream.

The following diagram provides a simplified view of using CloudWatch metrics to monitor your data streams and triggering scaling operations

In this example, these scaling issues could manifest in delayed leaderboard update reports. Because shards are the capacity units in a data stream, each shard’s capacity is independent of other shards. If the producers write to a single shard at a rate higher than 1 MB per second or 1,000 records per second, that shard becomes a hot shard and requests exceeding that capacity get throttled, leading to a delay in leaderboard updates. This condition can happen while other shards in the stream are underutilized, so if you are monitoring metrics at the stream level, you may not see any cause for concern, because the stream overall is ingesting data at a rate below its total capacity of 25 MB per second. Amazon Kinesis enables you to seamlessly scale your stream without interrupting your streaming pipeline.

Core concepts that enable scaling

You can write records to a data stream using Put APIs. To write a single record, use PutRecord; to write multiple records, use PutRecords. When executing either one, the request to the Kinesis API has to include the following three components:

  • The stream name.
  • The data record to write to the stream. For this post, this is the scoring result of a particular round in the game.
  • A partition key (for example, the fame session).

The following diagram shows multiple producers writing to a Kinesis data stream. The partition key value is used in combination with a hash function to determine the shard a given record will be written to.

The partition key determines to which shard the record is written. The partition key is a Unicode string with a maximum length of 256 bytes. Kinesis runs the partition key value that you provide in the request through an MD5 hash function. The resulting value maps your record to a specific shard within the stream, and Kinesis writes the record to that shard. Partition keys dictate how to distribute data across the stream and use shards.

Certain use cases require you to partition data based on specific criteria for efficient processing by the consuming applications. As an example, if you use player ID pk1234 as the hash key, all scores related to that player route to shard1. The consuming application can use the fact that data stored in shard1 has an affinity with the player ID and can efficiently calculate the leaderboard. An increase in traffic related to players mapped to shard1 can lead to a hot shard. Kinesis Data Streams allows you to handle such scenarios by splitting or merging shards without disrupting your streaming pipeline.

If your use cases do not require data stored in a shard to have high affinity, you can achieve high overall throughput by using a random partition key to distribute data. Random partition keys help distribute the incoming data records evenly across all the shards in the stream and reduce the likelihood of one or more shards getting hit with a disproportionate number of records. You can use a universally unique identifier (UUID) as a partition key to achieve this uniform distribution of records across shards. This strategy can increase the latency of record processing if the consumer application has to aggregate data from multiple shards.

Kinesis Data Streams scaling mechanisms

The stream remains fully functional during these actions. Producers and consumers can continue to read and write to the stream during the scaling process.

Upon receiving the scaling request, Kinesis sets the stream status to Updating. You can use the DescribeStreams API to check the stream status. When the operation is complete, the stream status shows as Active.

The SplitShard  action splits one active shard into two shards, increasing the read and write capacity of the stream. This can be helpful if there is an expected increase in the number of records ingested into the stream or so more Kinesis Data Streams applications can simultaneously read data from the stream for real-time processing.

SplitShard facilitates that process. The hash key space of the parent shard also splits. The two new shards accept new data records and the parent shard stops accepting new records. Existing records in the parent shard are retained for the duration of the stream retention period (the default is 24 hours, configurable up to 7 days). You must specify the new-starting-hash-key value when issuing this command. This value determines the point of the split within the parent shard’s hash key space. In most cases, you want to do an even split. However, you might need to do an uneven split if you have unbalanced shards that you want to rebalance, for example. The following diagram shows the SplitShard process in action.

Many streaming workloads have variable data flow rates that fluctuate over time, sometimes following daily, weekly, or seasonal patterns. As you monitor your data flow rates, you may see underutilized shards that, if merged, still have a data flow rate below the shard limits, yet reduce the cost of the stream.

The MergeShards action merges two adjacent shards, producing one shard. Two shards are considered adjacent if the hash key spaces for the two form a contiguous set with no gaps. When the two shards merge, their hash key spaces also merge. The new shard starts accepting new data records. The two parent shards stop accepting new records and retain existing records up to the stream’s configured retention period. The following diagram shows the MergeShards process in action.

The UpdateShardCount action is useful when you need to scale your stream, up or down, to a specific number of shards, and you provide that number as a parameter in the API call. Scaling in increments of 25% of the current capacity (25%, 50%, 75%, 100%) helps the operation complete faster, but is not required. The command executes a series of SplitShard and MergeShards actions as needed to reach the explicit number of shards you specified. This command splits shard hash key space evenly, creating shards of equal size. There is no option to choose a different value.

Additionally, the Kinesis Scaling Utility available on GitHub provides autoscaling for Kinesis Data Streams by monitoring a stream’s Amazon CloudWatch metrics and scaling it up or down accordingly. It can scale a stream by an explicit shard count or as a percentage of the total fleet. There is no requirement for you to manage the allocation of the key space to shards when using this API; it happens automatically.

Balancing shards

After completing a scaling operation, check the distribution of the hash key space in your stream. In most use cases, the hash key space should be evenly distributed across the shards in the stream. Errors in calculating or inputting a shard’s hash key space starting value can lead to creating new shards with an unusually large or small hash key space. Unusually large shards receive a high number of read and write requests, leading to throttling and underutilizing the unusually small shards.

The output of the ListShards API lists the starting and ending hash key value for each shard in the stream. You can use these values to identify unbalanced shards, and perform the necessary splits or merges to balance them. The Kinesis Scaling Utility can also generate a report of shard key space sizes that can help you achieve the same result. See the following code:

{
    "Shards": [
        {
            "ShardId": "shardId-000000000000", 
            "HashKeyRange": {
                "EndingHashKey": "170141183460469231731687303715884105727", 
                "StartingHashKey": "0"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817078608863948980125442188478720910276534730754"
            }
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "HashKeyRange": {
                "EndingHashKey": "340282366920938463463374607431768211455", 
                "StartingHashKey": "170141183460469231731687303715884105728"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817100909609147510748583724196993558638040711186"
            }
        }
    ]
}

Monitoring your streams to preempt hot shards

As the hot shard scenario has demonstrated, monitoring your data streams at only the stream level does not prepare you for issues at the shard level. Kinesis offers a multitude of stream level and shard level metrics. At the shard level, IncomingBytes and IncomingRecords show you the ingestion rate into the shard. WriteProvisionedThroughputExceeded and ReadProvisionedThroughputExceeded indicate throttled Put and Get requests, respectively. At the stream level, keep an eye on PutRecord.Success, in which the average value reflects the percentage of PutRecord success over time. Paired with proper thresholds for alerting, they should help you take scaling actions proactively in response to flow changes in and out of your streams, and reduce the possibility of developing hot shards.

The following image shows a snapshot of a CloudWatch dashboard with several metrics of a Kinesis data stream.

Conclusion

This post discussed how to simplify the scaling and monitoring of your Kinesis Streams. It’s important to spend some time considering the expected data flow rate for your stream to find the appropriate capacity. Choosing a good partition key strategy helps you take full advantage of the capacity you provision and avoid hot shards. Monitoring your stream metrics and setting alarm thresholds helps you gain the visibility you need to make better scaling decisions.

For more information, see What Is Amazon Kinesis Data Streams? For more information about Kinesis API actions, see Actions.

 


About the Author

 Ahmed Gaafar is a senior technical account manager at AWS.

New AWS Lambda controls for stream processing and asynchronous invocations

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/new-aws-lambda-controls-for-stream-processing-and-asynchronous-invocations/

Today AWS Lambda is introducing new controls for asynchronous and stream processing invocations. These new features allow you to customize responses to Lambda function errors and build more resilient event-driven and stream-processing applications.

Stream processing function invocations

When processing data from event sources such as Amazon Kinesis Data Streams, and Amazon DynamoDB Streams, Lambda reads records in batches via shards. A shard is a uniquely identified sequence of data records. Your function is then invoked to process records from the batch “in order.” If an error is returned, Lambda retries the batch until processing succeeds or the data expires. This retry behavior is desirable in many cases, but not all:

  1. Until the issue is resolved, no data in the shard is processed. A single malformed record can prevent processing on an entire shard since “in order” guarantee ensures that failed batches are retried until the data record expires. This is often referred to as a “poison pill” event in that it prevents the rest of the system from processing data.
  2. In some cases, it may not be helpful to retry if subsequent invocations will also fail.
  3. If the function is not idempotent, retrying might produce unexpected side effects, and may result in duplicate outputs (for example, multiple database entries or business transactions).
Stream processing function invocationsFigure 1: Default stream invocation retry logs.

The new Lambda controls for stream processing invocations

With the new customizable controls, users are able to control how function errors and retries impact stream processing function invocations. A new event source-mapping configuration subresource (shown below), allows developers to clearly specify which controls will apply specifically to invocations.

DestinationConfig: {
    OnFailure: {
       Destination: SNS/SQS arn (String)
    }
}
Figure 2: Stream processing destination configuration.
{
    "MaximumRetryAttempts": integer,
    "BisectBatchOnFunctionError" : boolean,
    "MaximumRecordAgeInSeconds" : integer
}
Figure 3: Stream processing failure configuration.

MaximumRetryAttempts

Set a maximum number of retry attempts for batches before they can be skipped to unblock processing and prevent duplicate outputs.

Minimum: 0 | maximum: 10,000 | default: 10,000

MaximumRecordAgeInSeconds

Define a record maximum age in seconds, with expired records skipped to allow processing to continue. Data records that do not get successfully processed within the defined age of submission are skipped

Minimum: 60 | default/maximum: 604,800

BisectBatchOnFunctionError

This gives the option to recursively split the failed batch and retry on a smaller subset of records, eventually isolating the metadata causing the error.

Default: false

On-failure Destination

When either MaximumRetryAttempts or MaximumRecordAgeInSeconds reaches the specified value, a record will be skipped. If ‘Destination’ is set, metadata about the skipped records can be sent to a target ARN (i.e. Amazon SQS or Amazon SNS). If no target is configured, then the record will be dropped.

Getting started with error handling for stream processing invocations

Here you will see how to use the new controls to create a customized error handling configuration for stream processing invocations. You will set the maximum retry attempts to 1, the maximum record age to 60s, and send the metadata of skipped record to an Amazon Simple Queue Service (SQS) queue. First create a Kinesis Stream to put records into, and create an SQS queue to hold the metadata of retry exhausted or expired data records.

  1. Go to the Lambda console and choose Create function.
  2. Ensure that Author from scratch is selected, and enter a function a name such as “customStreamErrorExample.”
  3. Select Runtime as Node.js.12.x.
  4. Choose Create function.
    To enable Kinesis as an event trigger and to send metadata of a failed invocation to the SQS standard queue, you must give the Lambda execution role the required permissions.
  5. Scroll down to the Execution Role section and select the view link below the Existing role drop-down.
    Existing role selection
  6. Choose Add inline policy > JSON, then paste the following into the text box replacing {yourSQSarn} with the ARN of the SQS queue you created earlier and replacing {yourKinesisarn} with the ARN of the stream you created earlier. Choose Review policy.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "sqs:SendMessage",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:DescribeStream"
                ],
                "Resource": [
                    "{yourSQSarn}",
                    "{yourKinesisarn}"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": "kinesis:ListStreams",
                "Resource": "*"
            }
        ]
    }
    
  7. Give the policy a name such as CustomSQSKinesis and choose Create policy.
  8. Navigate back to your Lambda function and choose Add trigger.
  9. Select Kinesis from the drop-down. Select your stream in the Kinesis stream dropdown.
  10. To see the new control options, choose the drop-down arrow on the Additional settings section.
  11. Paste the ARN of the previously created SQS queue (refer to this link to find the ARN).Additional settings
  12. Set the Maximum retry attempts to 1. Set the Maximum record age to 60. Choose Add.
    The Kinesis trigger has now been configured and the Lambda function has the required permissions to write to SQS and CloudWatch Logs.Success messageKinesis trigger added
  13. Choose the Lambda icon in the Designer section. Scroll down to the Function code section and paste the following code:
    exports.handler = async (event) => {
        // TODO implement
        console.log(event)
        const response = {
            statusCode: 200,
            body: event,dfh
        };
        return response;
    };

    This code has a syntax error in order to force a failure response.

    Next, you will trigger the Lambda function by adding a record to the Kinesis Stream via the AWS CLI. See how to install the AWS CLI if you have not previously done so.

  14. In your preferred terminal run the following command, replacing {YourStreamName} with the name of the stream you have created.
    aws kinesis put-record --stream-name {YourStreamName} --partition-key 123 --data testdata

    You should see a response similar to this (your sequence number will be different):
    Terminal response

  15. In the AWS Lambda console, choose Monitoring > View logs in CloudWatch and select the most recent log group.CloudWatch Logs
  16. As you can see the Lambda function failed as expected, but this time with only a single retry attempt. This is because the max retry attempt value was set to 1 and the record is younger than the max record age that was set to 60.
  17. In the AWS Management Console navigate to your SQS queue, Services > SQS.
  18. Select your queue and choose Queue Actions > View/Delete Messages > Start Polling for Messages.View/delete messages

Here you will see the metadata of your failed invocation. It has successfully been sent to the SQS destination for further investigation or processing.

Asynchronous function invocations

When a function is asynchronously invoked, Lambda sends the event to a queue before it is processed by your function. Invocations that result in an exception in the function code are retried twice with a delay of one minute before the first retry and two minutes before the second. Some invocations may never run due to a throttle or service fault: these are retried with exponential backoff until the invocation is six hours old.

Default asynchronous invocation retry logs

This retry behaviour shown above, works well in situations where every invocation must run at least once. However, in situations with a large volume of errors subsequent invocations are increasingly delayed as the backlog increases, as each new error places another retry event back into the event queue. If it were possible to skip the event without retrying, it would eliminate this delay and continue processing new events.

In order to avoid this default auto-retry policy, there are several widely used approaches:

  • Function error handling with AWS Step Functions
  • Using the event handler for error routing logic.
  • Third party monitoring services for exception alerts

These workarounds require extra resources, code, or custom logic and add latency to your applications. Retries caused by system failures due to timeouts, lack of memory, early exits, etc. could still occur.

New Lambda controls for asynchronous event invocations

New asynchronous event invoke controls mean that functions can now skip the processing of certain events and discard unwanted or backlogged requests from the asynchronous event queue without the need for “workarounds.” The controls will be accessible from the console and from a new Event Invoke Config subresource, listed in detail below:

{
    "MaximumEventAgeInSeconds": integer,
    "MaximumRetryAttempts": integer
}
Figure 4: Asynchronous event invoke configuration.

MaximumRetryAttempts

Set a maximum number of retry attempts for events before they can be skipped to unblock processing and prevent duplicate outputs.

Minimum: 0 | maximum: 2 | default: 2

MaximumEventAgeInSeconds

Define an event maximum age in seconds, with expired events skipped to allow processing to continue. Events that do not get successfully processed within defined age of submission are written to the function’s configured Dead Letter Queue and/or On-failure Destinations for asynchronous invocations. If none is configured, the event is discarded.

Minimum: 60 | maximum: 21600 (6 hours)

These controls can also be configured from the AWS Lambda console, in the “Failure handling configuration” section shown below:

Edit failure handling configurations

When the same Lambda function is run with maximum retry attempts set to 0, the Amazon CloudWatch Logs show that the function is only invoked a single time, and not retried after the initial error.

CloudWatch Logs

Conclusion

The addition of these new Lambda controls allows you to handle function failures and retries appropriately for both asynchronous and stream processing. This eliminates the need for custom-built logic, alerts, and added overhead.

Developers understand the needs of their own applications. These new features allow them to decide how to react to errors on a per-function basis. Whether that means real-time stream processing, non-blocking event queues, or more retries, it depends on the application’s requirements. With the new controls in place, the power is in your hands.

You can get started with the new controls for stream processing and asynchronous invocations via the AWS Management Console, AWS CLI, AWS SAM, AWS CloudFormation, or AWS SDK for Lambda.

New AWS Lambda scaling controls for Kinesis and DynamoDB event sources

Post Syndicated from Moheeb Zara original https://aws.amazon.com/blogs/compute/new-aws-lambda-scaling-controls-for-kinesis-and-dynamodb-event-sources/

AWS Lambda is introducing a new scaling parameter for Amazon Kinesis Data Streams and Amazon DynamoDB Streams event sources. Parallelization Factor can be set to increase concurrent Lambda invocations for each shard, which by default is 1. This allows for faster stream processing without the need to over-scale the number of shards, while still guaranteeing order of records processed.

There are two common optimization scenarios: high traffic and low traffic. For example, an online business might experience seasonal spikes in traffic. The following features help ensure that your business can scale appropriately to withstand the upcoming holiday season.

Handling high traffic with Parallelization Factor

A diagram showing how Parallelization Factor maintains order.

Each shard has uniquely identified sequences of data records. Each record contains a partition key to guarantee order and are organized separately into shards based on that key. The records from each shard must be polled to guarantee that records with the same partition key are processed in order.

When there is a high volume of data traffic, you want to process records as fast as possible. Before this release, customers were solving this by updating the number of shards on a Kinesis data stream. Increasing the number of shards increases the number of functions processing data from those shards. One Lambda function invocation processes one shard at a time.

You can now use the new Parallelization Factor to specify the number of concurrent batches that Lambda polls from a single shard. This feature introduces more flexibility in scaling options for Lambda and Kinesis. The default factor of one exhibits normal behavior. A factor of two allows up to 200 concurrent invocations on 100 Kinesis data shards. The Parallelization Factor can be scaled up to 10.

Each parallelized shard contains messages with the same partition key. This means record processing order will still be maintained and each parallelized shard must complete before processing the next.

Using Parallelization Factor

Since Parallelization Factor is quickly set on an event source mapping, it can be increased or decreased on demand. Fully automated scaling of stream processing is now possible.

For example, Amazon CloudWatch can be used to monitor changes in traffic. High traffic can cause the IteratorAge metric to increase, and an alarm can be created if this occurs for some specified period of time. The alarm can trigger a Lambda function that uses the UpdateEventSourceMapping API to increase the Parallelization Factor. In the same way, an alarm can be set to reduce the factor if traffic decreases.

You can enable Parallelization Factor in the AWS Lambda console by creating or updating a Kinesis or DynamoDB event source. Choose Additional settings and set the Concurrent batches per shard to the desired factor, between 1 and 10.

Configuring the Parallelization Factor from the AWS Lambda console.

Configuring the Parallelization Factor from the AWS Lambda console.

You can also enable this feature from the AWS CLI using the –-parallelization-factor parameter when creating or updating an event source mapping.

$ aws lambda create-event-source-mapping --function-name my-function \
--parallelization-factor 2 --batch-size 100 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
{
	"UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
	“ParallelizationFactor”: 2,
	"BatchSize": 100,
	"MaximumBatchingWindowInSeconds": 0,
	"EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
	"FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function",
	"LastModified": 1541139209.351,
	"LastProcessingResult": "No records processed",
	"State": "Creating",
	"StateTransitionReason": "User action"
}

Handling low traffic with Batch Window

Previously, you could use Batch Size to handle low volumes, or handle tasks that were not time sensitive. Batch Size configures the number of records to read from a shard, up to 10,000. The payload limit of a single invocation is 6 MB.

In September, we launched Batch Window, which allows you to fine tune when Lambda invocations occur. Lambda normally reads records from a Kinesis data stream at a particular interval. This feature is ideal in situations where data is sparse and batches of data take time to build up.

Using Batch Window, you can set your function to wait up to 300 seconds for a batch to build before processing it. This means you can also set your function to process on certain conditions, such as reaching the payload size, or Batch Size reaching its maximum value. With Batch Window, you can manage the average number of records processed by the function with each invocation. This allows you to increase the efficiency of each invocation and reduce the total number.

Batch Window is set when adding a new event trigger in the AWS Lambda console.

Adding an event source trigger in the AWS Lambda console

Adding an event source trigger in the AWS Lambda console

It can also be set using AWS CLI with the --maximum-batching-window-in-seconds parameter.

$ aws lambda create-event-source-mapping --function-name my-function \
--maximum-batching-window-in-seconds 300 --batch-size 100 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
{
	"UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
	"BatchSize": 100,
	"MaximumBatchingWindowInSeconds": 300,
	"EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
	"FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function",
	"LastModified": 1541139209.351,
	"LastProcessingResult": "No records processed",
	"State": "Creating",
	"StateTransitionReason": "User action"
}

Conclusion

You now have new options for managing scale in Amazon Kinesis and Amazon DynamoDB stream processing.  The Batch Window parameter allows you to tune how long to wait before processing a batch, ideal for low traffic or tasks that aren’t time sensitive. The Parallelization Factor parameter enables faster stream processing of ordered records at high volume, using concurrent Lambda invocations per shard. Both of these features can lead to more efficient stream processing.

New – AWS IoT Greengrass Adds Container Support and Management of Data Streams at the Edge

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-aws-iot-greengrass-adds-docker-support-and-streams-management-at-the-edge/

AWS IoT Greengrass extends cloud capabilities to edge devices, so that they can respond to local events in near real-time, even with intermittent connectivity.

Today, we are adding two features that make it easier to build IoT solutions:

  • Container support to deploy applications using the Greengrass Docker application deployment connector.
  • Collect, process, and export data streams from edge devices and manage the lifecycle of that data with the Stream Manager for AWS IoT Greengrass.

Let’s see how these new features work and how to use them.

Deploying a Container-Based Application to a Greengrass Core Device
You can now run AWS Lambda functions and container-based applications in your AWS IoT Greengrass core device. In this way it is easier to migrate applications from on-premises, or build new applications that include dependencies such as libraries, other binaries, and configuration files, using container images. This provides a consistent deployment environment for your applications that enables portability across development environments and edge locations. You can easily deploy legacy and third-party applications by packaging the code or executables into the container images.

To use this feature, I describe my container-based application using a Docker Compose file. I can reference container images in public or private repositories, such as Amazon Elastic Container Registry (ECR) or Docker Hub. To start, I create a simple web app using Python and Flask that counts the number of times it is visualized.

from flask import Flask

app = Flask(__name__)

counter = 0

@app.route('/')
def hello():
    global counter
    counter += 1
    return 'Hello World! I have been seen {} times.\n'.format(counter)

My requirements.txt file contains a single dependency, flask.

I build the container image using this Dockerfile and push it to ECR.

FROM python:3.7-alpine
WORKDIR /code
ENV FLASK_APP app.py
ENV FLASK_RUN_HOST 0.0.0.0
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
CMD ["flask", "run"]

Here is the docker-compose.yml file referencing the container image in my ECR repository. Docker Compose files can describe applications using multiple containers, but for this example I am using just one.

version: '3'
services:
  web:
    image: "123412341234.dkr.ecr.us-east-1.amazonaws.com/hello-world-counter:latest"
    ports:
      - "80:5000"

I upload the docker-compose.yml file to an Amazon Simple Storage Service (S3) bucket.

Now I create an AWS IoT Greengrass group using an Amazon Elastic Compute Cloud (EC2) instance as core device. Usually your core device is outside of the AWS cloud, but using an EC2 instance can be a good way to set up and automate a dev & test environment for your deployments at the edge.

When the group is ready, I run an “empty” deployment, just to check that everything is working as expected. After a few seconds, my first deployment has completed and I start adding a connector.

In the connector section of the AWS IoT Greengrass group, I select Add a connector and search for “Docker”. I select Docker Application Deployment and hit Next.

Now I configure the parameters for the connector. I select my docker-compose.yml file on S3. The AWS Identity and Access Management (IAM) role used by the AWS IoT Greengrass group needs permissions to get the file from S3 and to get the authorization token and download the image from ECR. If you use a private repository such as Docker Hub, you can leverage the integration with the AWS Secret Manager to make it easy for your connectors and Lambda functions to use local secrets to interact with services and applications.

I deploy my changes, similarly to what I did before. This time, the new container-based application is installed and started on the AWS IoT Greengrass core device.

To test the web app that I deployed, I open access to the HTTP port on the Security Group of the EC2 instance I am using as core device. When I connect with my browser, I see the Flask app starting to count the visits. My container-based application is running on the AWS IoT Greengrass core device!

You can deploy much more complex applications than what I did in this example. Let’s see that as we go through the other feature released today.

Using the Stream Manager for AWS IoT Greengrass
For common use cases like video processing, image recognition, or high-volume data collection from sensors at the edge, you often need to build your own data stream management capabilities. The new Stream Manager simplifies this process by adding a standardized mechanism to the Greengrass Core SDK that you can use to process data streams from IoT devices, manage local data retention policies based on cache size or data age, and automatically transmit data directly into AWS cloud services such as Amazon Kinesis and AWS IoT Analytics.

The Stream Manager also handles disconnected or intermittent connectivity scenarios by adding configurable prioritization, caching policies, bandwidth utilization, and time-outs on a per-stream basis. In situations where connectivity is unpredictable or bandwidth is constrained, this new functionality enables you to define the behavior of your applications’ data management while disconnected, reconnecting, or connected, allowing you to prioritize important data’s path to the cloud and make efficient use of a connection when it is available. Using this feature, you can focus on your specific application use cases rather than building data retention and connection management functionality.

Let’s see now how the Stream Manager works with a practical use case. For example, my AWS IoT Greengrass core device is receiving lots of data from multiple devices. I want to do two things with the data I am collecting:

  • Upload all row data with low priority to AWS IoT Analytics, where I use Amazon QuickSight to visualize and understand my data.
  • Aggregate data locally based on time and location of the devices, and send the aggregated data with high priority to a Kinesis Data Stream that is processed by a business application for predictive maintenance.

Using the Stream Manager in the Greengrass Core SDK, I create two local data streams:

  • The first local data stream has a configured low-priority export to IoT Analytics and can use up to 256MB of local disk (yes, it’s a constrained device). You can use memory to store the local data stream if you prefer speed to resilience. When local space is filled up, for example because I lost connectivity to the cloud and I continue to cache locally, I can choose to either reject new data or overwrite the oldest data.
  • The second local data stream is exporting data with high priority to a Kinesis Data Stream and can use up to 128MB of local disk (it’s aggregated data, I need less space for the same amount of time).

 

Here’s how the data flows in this architecture:

  • Sensor data is collected by a Producer Lambda function that is writing to the first local data stream.
  • A second Aggregator Lambda function is reading from the first local data stream, performing the aggregation, and writing its output to the second local data stream.
  • A Reader container-based app (deployed using the Docker application deployment connector) is rendering the aggregated data in real-time for a display panel.
  • The Stream Manager takes care of the ingestion to the cloud, based on the configuration and the policies of the local data streams, so that developers can focus their efforts on the logic on the device.

The use of Lambda functions or container-based apps in the previous architecture is just an example. You can mix and match, or standardize to one or the other, depending on your development best practices.

Available Now
The Docker application deployment connector and the Stream Manager are available with Greengrass version 1.10. The Stream Manager is available in the Greengrass Core SDK for Java and Python. We are adding support for other platforms based on customer feedback.

These new features are independent from each other, but can be used together as in my example. They can simplify the way you build and deploy applications on edge devices, making it easier to process data locally and be integrated with streaming and analytics services in the backend. Let me know what you are going to use these features for!

Danilo

New Issue of Architecture Monthly: Games

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/new-issue-of-architecture-monthly-games/

Architecture Monthyl Magazine - September 2019 (Games)This month’s Architecture Monthly magazine is all about games—not Scrabble, not Uno, not Twister, and certainly not hide-and-seek.

No, we’re talking the big business of online, multiplayer games. And did you know that approximately 90% of large, public game companies are running on the AWS cloud? Yep, I’m talking Epic (ever heard of Fortnite?), Ubisoft, Nintendo, and more. I had the opportunity to sit down with a senior tech leader for AWS Games, who talked about why companies are moving to the cloud from on-premise, and it’s about a whole lot more than just games for entertainment. We got into the big-money world of competitive eSports as well as the gamification of learning processes and economics.

Consider Twitch, often defined as Amazon’s live streaming platform for gamers. But Twitch is much more than a gaming platform. For example, AWS Live Video on Twitch offers live streaming about everything from how to develop serverless apps and robots to interactive quiz shows that help you prepare for AWS Certification exams. And of course, you can also learn about the technology that powers your favorite video games.

September’s Issue

For September’s issue, we’ve assembled architectural best practices about games from all over AWS, and we’ve made sure that a broad audience can appreciate it.

How to Access the Magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you—leave us star rating and comment on the Amazon Kindle page or contact us anytime at [email protected].

Visualizing Sensor Data in Amazon QuickSight

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/visualizing-sensor-data-in-amazon-quicksight/

This post is courtesy of Moheeb Zara, Developer Advocate, AWS Serverless

The Internet of Things (IoT) is a term used wherever physical devices are networked in some meaningful connected way. Often, this takes the form of sensor data collection and analysis. As the number of devices and size of data scales, it can become costly and difficult to keep up with demand.

Using AWS Serverless Application Model (AWS SAM), you can reduce the cost and time to market of an IoT solution. This guide demonstrates how to collect and visualize data from a low-cost, Wi-Fi connected IoT device using a variety of AWS services. Much of this can be accomplished within the AWS Free Usage Tier, which is necessary for the following instructions.

Services used

The following services are used in this example:

What’s covered in this post?

This post covers:

  • Connecting an Arduino MKR 1010 Wi-Fi device to AWS IoT Core.
  • Forwarding messages from an AWS IoT Core topic stream to a Lambda function.
  • Using a Kinesis Data Firehose delivery stream to store data in S3.
  • Analyzing and visualizing data stored in S3 using Amazon QuickSight.

Connect the device to AWS IoT Core using MQTT

The Arduino MKR 1010 is a low-cost, Wi-Fi enabled, IoT device, shown in the following image.

An Arduino MKR 1010 Wi-Fi microcontroller

Its analog and digital input and output pins can be used to read sensors or to write to actuators. Arduino provides a detailed guide on how to securely connect this device to AWS IoT Core. The following steps build upon it to push arbitrary sensor data to a topic stream and ultimately visualize that data using Amazon QuickSight.

  1. Start by following this comprehensive guide to using an Arduino MKR 1010 with AWS IoT Core. Upon completion, your device is connected to AWS IoT Core using MQTT (Message Queuing Telemetry Transport), a protocol for publishing and subscribing to messages using topics.
  2. In the Arduino IDE, choose File, Sketch, Include Library, and Manage Libraries.
  3. In the window that opens, search for ArduinoJson and select the library by Benoit Blanchon. Choose install.

4. Add #include <ArduinoJson.h> to the top of your sketch from the Arduino guide.

5. Modify the publishMessage() function with this code. It publishes a JSON message with two keys: time (ms) and the current value read from the first analog pin.

void publishMessage() {  
  Serial.println("Publishing message");

  // send message, the Print interface can be used to set the message contents
  mqttClient.beginMessage("arduino/outgoing");
  
  // create json message to send
  StaticJsonDocument<200> doc;
  doc["time"] = millis();
  doc["sensor_a0"] = analogRead(0);
  serializeJson(doc, mqttClient); // print to client
  
  mqttClient.endMessage();
}

6. Save and upload the sketch to your board.

Create a Kinesis Firehose delivery stream

Amazon Kinesis Data Firehose is a service that reliably loads streaming data into data stores, data lakes, and analytics tools. Amazon QuickSight requires a data store to create visualizations of the sensor data. This simple Kinesis Data Firehose delivery stream continuously uploads data to an S3 storage bucket. The next sections cover how to add records to this stream using a Lambda function.

  1. In the Kinesis Data Firehose console, create a new delivery stream, called SensorDataStream.
  2. Leave the default source as a Direct PUT or other sources and choose Next.
  3. On the next screen, leave all the default values and choose Next.
  4. Select Amazon S3 as the destination and create a new bucket with a unique name. This is where records are continuously uploaded so that they can be used by Amazon QuickSight.
  5. On the next screen, choose Create New IAM Role, Allow. This gives the Firehose delivery stream permission to upload to S3.
  6. Review and then choose Create Delivery Stream.

It can take some time to fully create the stream. In the meantime, continue on to the next section.

Invoking Lambda using AWS IoT Core rules

Using AWS IoT Core rules, you can forward messages from devices to a Lambda function, which can perform actions such as uploading to an Amazon DynamoDB table or an S3 bucket, or running data against various Amazon Machine Learning services. In this case, the function transforms and adds a message to the Kinesis Data Firehose delivery stream, which then adds that data to S3.

AWS IoT Core rules use the MQTT topic stream to trigger interactions with other AWS services. An AWS IoT Core rule is created by using an SQL statement, a topic filter, and a rule action. The Arduino example publishes messages every five seconds on the topic arduino/outgoing. The following instructions show how to consume those messages with a Lambda function.

Create a Lambda function

Before creating an AWS IoT Core rule, you need a Lambda function to consume forwarded messages.

  1. In the AWS Lambda console, choose Create function.
  2. Name the function ArduinoConsumeMessage.
  3. For Runtime, choose Author From Scratch, Node.js10.x. For Execution role, choose Create a new role with basic Lambda permissions. Choose Create.
  4. On the Execution role card, choose View the ArduinoConsumeMessage-role-xxxx on the IAM console.
  5. Choose Attach Policies. Then, search for and select AmazonKinesisFirehoseFullAccess.
  6. Choose Attach Policy. This applies the necessary permissions to add records to the Firehose delivery stream.
  7. In the Lambda console, in the Designer card, select the function name.
  8. Paste the following in the code editor, replacing SensorDataStream with the name of your own Firehose delivery stream. Choose Save.
const AWS = require('aws-sdk')

const firehose = new AWS.Firehose()
const StreamName = "SensorDataStream"

exports.handler = async (event) => {
    
    console.log('Received IoT event:', JSON.stringify(event, null, 2))
    
    let payload = {
        time: new Date(event.time),
        sensor_value: event.sensor_a0
    }
    
    let params = {
            DeliveryStreamName: StreamName,
            Record: { 
                Data: JSON.stringify(payload)
            }
        }
        
    return await firehose.putRecord(params).promise()

}

Create an AWS IoT Core rule

To create an AWS IoT Core rule, follow these steps.

  1. In the AWS IoT console, choose Act.
  2. Choose Create.
  3. For Rule query statement, copy and paste SELECT * FROM 'arduino/outgoing’. This subscribes to the outgoing message topic used in the Arduino example.
  4. Choose Add action, Send a message to a Lambda function, Configure action.
  5. Select the function created in the last set of instructions.
  6. Choose Create rule.

At this stage, any message published to the arduino/outgoing topic forwards to the ArduinoConsumeMessage Lambda function, which transforms and puts the payload on the Kinesis Data Firehose stream and also logs the message to Amazon CloudWatch. If you’ve connected an Arduino device to AWS IoT Core, it publishes to that topic every five seconds.

The following steps show how to test functionality using the AWS IoT console.

  1. In the AWS IoT console, choose Test.
  2. For Publish, enter the topic arduino/outgoing .
  3. Enter the following test payload:
    {
      “time”: 1567023375013,  
      “sensor_a0”: 456
    }
  4. Choose Publish to topic.
  5. Navigate back to your Lambda function.
  6. Choose Monitoring, View logs in CloudWatch.
  7. Select a log item to view the message contents, as shown in the following screenshot.

Visualizing data with Amazon QuickSight

To visualize data with Amazon QuickSight, follow these steps.

  1. In the Amazon QuickSight console, sign up.
  2. Choose Manage Data, New Data Set. Select S3 as the data source.
  3. A manifest file is necessary for Amazon QuickSight to be able to fetch data from your S3 bucket. Copy the following into a file named manifest.json. Replace YOUR-BUCKET-NAME with the name of the bucket created for the Firehose delivery stream.
    {
       "fileLocations":[
          {
             "URIPrefixes":[
                "s3://YOUR-BUCKET-NAME/"
             ]
          }
       ],
       "globalUploadSettings":{
          "format":"JSON"
       }
    }
  4. Upload the manifest.json file.
  5. Choose Connect, then Visualize. You may have to give Amazon QuickSight explicit permissions to your S3 bucket.
  6. Finally, design the Amazon QuickSight visualizations in the drag and drop editor. Drag the two available fields into the center card to generate a Sum of Sensor_value by Time visual.

Conclusion

This post demonstrated visualizing data from a securely connected remote IoT device. This was achieved by connecting an Arduino to AWS IoT Core using MQTT, forwarding messages from the topic stream to Lambda using IoT Core rules, putting records on an Amazon Kinesis Data Firehose delivery stream, and using Amazon QuickSight to visualize the data stored within an S3 bucket.

With these building blocks, it is possible to implement highly scalable and customizable IoT data collection, analysis, and visualization. With the use of other AWS services, you can build a full end-to-end platform for an IoT product that can reliably handle volume. To further explore how hardware and AWS Serverless can work together, visit the Amazon Web Services page on Hackster.

One to Many: Evolving VPC Design

Post Syndicated from Androski Spicer original https://aws.amazon.com/blogs/architecture/one-to-many-evolving-vpc-design/

Since its inception, the Amazon Virtual Private Cloud (VPC) has acted as the embodiment of security and privacy for customers who are looking to run their applications in a controlled, private, secure, and isolated environment.

This logically isolated space has evolved, and in its evolution has increased the avenues that customers can take to create and manage multi-tenant environments with multiple integration points for access to resources on-premises.

This blog is a two-part series that begins with a look at the Amazon VPC as a single unit of networking in the AWS Cloud but eventually takes you to a world in which simplified architectures for establishing a global network of VPCs are possible.

From One VPC: Single Unit of Networking

To be successful with the AWS Virtual Private Cloud you first have to define success for today and what success might look like as your organization’s adoption of the AWS cloud increases and matures. In essence, your VPCs should be designed to satisfy the needs of your applications today and must be scalable to accommodate future needs.

Classless Inter-Domain Routing (CIDR) notations are used to denote the size of your VPC. AWS allows you specify a CIDR block between /16 and /28. The largest, /16, provides you with 65,536 IP addresses and the smallest possible allowed CIDR block, /28, provides you with 16 IP addresses. Note, the first four IP addresses and the last IP address in each subnet CIDR block are not available for you to use, and cannot be assigned to an instance.

AWS VPC supports both IPv4 and IPv6. It is required that you specify an IPv4 CIDR range when creating a VPC. Specifying an IPv6 range is optional.

Customers can specify ANY IPv4 address space for their VPC. This includes but is not limited to RFC 1918 addresses.

After creating your VPC, you divide it into subnets. In an AWS VPC, subnets are not isolation boundaries around your application. Rather, they are containers for routing policies.

Isolation is achieved by attaching an AWS Security Group (SG) to the EC2 instances that host your application. SGs are stateful firewalls, meaning that connections are tracked to ensure return traffic is allowed. They control inbound and outbound access to the elastic network interfaces that are attached to an EC2 instance. These should be tightly configured, only allowing access as needed.

It is our best practice that subnets should be created in categories. There two main categories; public subnets and private subnets. At minimum they should be designed as outlined in the below diagrams for IPv4 and IPv6 subnet design.

Recommended IPv4 subnet design pattern

Recommended IPv6 subnet design pattern

Subnet types are denoted by the ability and inability for applications and users on the internet to directly initiate access to infrastructure within a subnet.

Public Subnets

Public subnets are attached to a route table that has a default route to the Internet via an Internet gateway.

Resources in a public subnet can have a public IP or Elastic IP (EIP) that has a NAT to the Elastic Network Interface (ENI) of the virtual machines or containers that hosts your application(s). This is a one-to-one NAT that is performed by the Internet gateway.

Illustration of public subnet access path to the Internet through the Internet Gateway (IGW)

Private Subnets

A private subnet contains infrastructure that isn’t directly accessible from the Internet. Unlike the public subnet, this infrastructure only has private IPs.

Infrastructure in a private subnet gain access to resources or users on the Internet through a NAT infrastructure of sorts.

AWS natively provides NAT capability through the use of the NAT Gateway service. Customers can also create NAT instances that they manage or leverage third-party NAT appliances from the AWS Marketplace.

In most scenarios, it is recommended to use the AWS NAT Gateway as it is highly available (in a single Availability Zone) and is provided as a managed service by AWS. It supports 5 Gbps of bandwidth per NAT gateway and automatically scales up to 45 Gbps.

An AWS NAT gateway’s high availability is confined to a single Availability Zone. For high availability across AZs, it is recommended to have a minimum of two NAT gateways (in different AZs). This allows you to switch to an available NAT gateway in the event that one should become unavailable.

This approach allows you to zone your Internet traffic, reducing cross Availability Zone connections to the Internet. More details on NAT gateway are available here.

Illustration of an environment with a single NAT Gateway (NAT-GW)

Illustration of high availability with a multiple NAT Gateways (NAT-GW) attached to their own route table

Illustration of the failure of one NAT Gateway and the fail over to an available NAT Gateway by the manual changing of the default route next hop in private subnet A route table

AWS allocated IPv6 addresses are Global Unicast Addresses by default. That said, you can privatize these subnets by using an Egress-Only Internet Gateway (E-IGW), instead of a regular Internet gateway. E-IGWs are purposely built to prevents users and applications on the Internet from initiating access to infrastructure in your IPv6 subnet(s).

Illustration of internet access for hybrid IPv6 subnets through an Egress-Only Internet Gateway (E-IGW)

Applications hosted on instances living within a private subnet can have different access needs. Some require access to the Internet while others require access to databases, applications, and users that are on-premises. For this type of access, AWS provides two avenues: the Virtual Gateway and the Transit Gateway. The Virtual Gateway can only support a single VPC at a time, while the Transit Gateway is built to simplify the interconnectivity of tens to hundreds of VPCs and then aggregating their connectivity to resources on-premises. Given that we are looking at the VPC as a single unit of networking, all diagrams below contain illustrations of the Virtual Gateway which acts a WAN concentrator for your VPC.

Illustration of private subnets connecting to data center via a Virtual Gateway (VGW)

 

Illustration of private subnets connecting to Data Center via a VGW

 

Illustration of private subnets connecting to Data Center using AWS Direct Connect as primary and IPsec as backup

The above diagram illustrates a WAN connection between a VGW attached to a VPC and a customer’s data center.

AWS provides two options for establishing a private connectivity between your VPC and on-premises network: AWS Direct Connect and AWS Site-to-Site VPN.

AWS Site-to-Site VPN configuration leverages IPSec with each connection providing two redundant IPSec tunnels. AWS support both static routing and dynamic routing (through the use of BGP).

BGP is recommended, as it allows dynamic route advertisement, high availability through failure detection, and fail over between tunnels in addition to decreased management complexity.

VPC Endpoints: Gateway & Interface Endpoints

Applications running inside your subnet(s) may need to connect to AWS public services (like Amazon S3, Amazon Simple Notification Service (SNS), Amazon Simple Queue Service (SQS), Amazon API Gateway, etc.) or applications in another VPC that lives in another account. For example, you may have a database in another account that you would like to expose applications that lives in a completely different account and subnet.

For these scenarios you have the option to leverage an Amazon VPC Endpoint.

There are two types of VPC Endpoints: Gateway Endpoints and Interface Endpoints.

Gateway Endpoints only support Amazon S3 and Amazon DynamoDB. Upon creation, a gateway is added to your specified route table(s) and acts as the destination for all requests to the service it is created for.

Interface Endpoints differ significantly and can only be created for services that are powered by AWS PrivateLink.

Upon creation, AWS creates an interface endpoint consisting of one or more Elastic Network Interfaces (ENIs). Each AZ can support one interface endpoint ENI. This acts as a point of entry for all traffic destined to a specific PrivateLink service.

When an interface endpoint is created, associated DNS entries are created that point to the endpoint and each ENI that the endpoint contains. To access the PrivateLink service you must send your request to one of these hostnames.

As illustrated below, ensure the Private DNS feature is enabled for AWS public and Marketplace services:

Since interface endpoints leverage ENIs, customers can use cloud techniques they are already familiar with. The interface endpoint can be configured with a restrictive security group. These endpoints can also be easily accessed from both inside and outside the VPC. Access from outside a VPC can be accomplished through Direct Connect and VPN.

Illustration of a solution that leverages an interface and gateway endpoint

Customers can also create AWS Endpoint services for their applications or services running on-premises. This allows access to these services via an interface endpoint which can be extended to other VPCs (even if the VPCs themselves do not have Direct Connect configured).

VPC Sharing

At re:Invent 2018, AWS launched the feature VPC sharing, which helps customers control VPC sprawl by decoupling the boundary of an AWS account from the underlying VPC network that supports its infrastructure.

VPC sharing uses Amazon Resource Access Manager (RAM) to share subnets across accounts within the same AWS organization.

VPC sharing is defined as:

VPC sharing allows customers to centralize the management of network, its IP space and the access paths to resources external to the VPC. This method of centralization and reuse (of VPC components such as NAT Gateway and Direct Connect connections) results in a reduction of cost to manage and maintain this environment.

Great, but there are times when a customer needs to build networks with multiple VPCs in and across AWS regions. How should this be done and what are the best practices?

This will be answered in part two of this blog.

 

 

ICYMI: Serverless Q2 2019

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/icymi-serverless-q2-2019/

This post is courtesy of Moheeb Zara, Senior Developer Advocate – AWS Serverless

Welcome to the sixth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, checkout what happened last quarter here.

April - June 2019

Amazon EventBridge

Before we dive in to all that happened in Q2, we’re excited about this quarter’s launch of Amazon EventBridge, the serverless event bus that connects application data from your own apps, SaaS, and AWS-as-a-service. This allows you to create powerful event-driven serverless applications using a variety of event sources.

Our very own AWS Solutions Architect, Mike Deck, sat down with AWS Serverless Hero Jeremy Daly and recorded a podcast on Amazon EventBridge. It’s a worthy listen if you’re interested in exploring all the features offered by this launch.

Now, back to Q2, here’s what’s new.

AWS Lambda

Lambda Monitoring

Amazon CloudWatch Logs Insights now allows you to see statistics from recent invocations of your Lambda functions in the Lambda monitoring tab.

Additionally, as of June, you can monitor the [email protected] functions associated with your Amazon CloudFront distributions directly from your Amazon CloudFront console. This includes a revamped monitoring dashboard for CloudFront distributions and [email protected] functions.

AWS Step Functions

Step Functions

AWS Step Functions now supports workflow execution events, which help in the building and monitoring of even-driven serverless workflows. Automatic Execution event notifications can be delivered upon start/completion of CloudWatch Events/Amazon EventBridge. This allows services such as AWS Lambda, Amazon SNS, Amazon Kinesis, or AWS Step Functions to respond to these events.

Additionally you can use callback patterns to automate workflows for applications with human activities and custom integrations with third-party services. You create callback patterns in minutes with less code to write and maintain, run without servers and infrastructure to manage, and scale reliably.

Amazon API Gateway

API Gateway Tag Based Control

Amazon API Gateway now offers tag-based access control for WebSocket APIs using AWS Identity and Access Management (IAM) policies, allowing you to categorize API Gateway resources for WebSocket APIs by purpose, owner, or other criteria.  With the addition of tag-based access control to WebSocket resources, you can now give permissions to WebSocket resources at various levels by creating policies based on tags. For example, you can grant full access to admins to while limiting access to developers.

You can now enforce a minimum Transport Layer Security (TLS) version and cipher suites through a security policy for connecting to your Amazon API Gateway custom domain.

In addition, Amazon API Gateway now allows you to define VPC Endpoint policies, enabling you to specify which Private APIs a VPC Endpoint can connect to. This enables granular security control using VPC Endpoint policies.

AWS Amplify

Amplify CLI (part of the open source Amplify Framework) now includes support for adding and configuring AWS Lambda triggers for events when using Amazon Cognito, Amazon Simple Storage Service, and Amazon DynamoDB as event sources. This means you can setup custom authentication flows for mobile and web applications via the Amplify CLI and Amazon Cognito User Pool as an authentication provider.

Amplify Console

Amplify Console,  a Git-based workflow for continuous deployment and hosting for fullstack serverless web apps, launched several updates to the build service including SAM CLI and custom container support.

Amazon Kinesis

Amazon Kinesis Data Firehose can now utilize AWS PrivateLink to securely ingest data. AWS PrivateLink provides private connectivity between VPCs, AWS services, and on-premises applications, securely over the Amazon network. When AWS PrivateLink is used with Amazon Kinesis Data Firehose, all traffic to a Kinesis Data Firehose from a VPC flows over a private connection.

You can now assign AWS resource tags to applications in Amazon Kinesis Data Analytics. These key/value tags can be used to organize and identify resources, create cost allocation reports, and control access to resources within Amazon Kinesis Data Analytics.

Amazon Kinesis Data Firehose is now available in the AWS GovCloud (US-East), Europe (Stockholm), Asia Pacific (Seoul), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), and EU (London) regions.

For a complete list of where Amazon Kinesis Data Analytics is available, please see the AWS Region Table.

AWS Cloud9

Cloud9 Quick Starts

Amazon Web Services (AWS) Cloud9 integrated development environment (IDE) now has a Quick Start which deploys in the AWS cloud in about 30 minutes. This enables organizations to provide developers a powerful cloud-based IDE that can edit, run, and debug code in the browser and allow easy sharing and collaboration.

AWS Cloud9 is also now available in the EU (Frankfurt) and Asia Pacific (Tokyo) regions. For a current list of supported regions, see AWS Regions and Endpoints in the AWS documentation.

Amazon DynamoDB

You can now tag Amazon DynamoDB tables when you create them. Tags are labels you can attach to AWS resources to make them easier to manage, search, and filter.  Tagging support has also been extended to the AWS GovCloud (US) Regions.

DynamoDBMapper now supports Amazon DynamoDB transactional API calls. This support is included within the AWS SDK for Java. These transactional APIs provide developers atomic, consistent, isolated, and durable (ACID) operations to help ensure data correctness.

Amazon DynamoDB now applies adaptive capacity in real time in response to changing application traffic patterns, which helps you maintain uninterrupted performance indefinitely, even for imbalanced workloads.

AWS Training and Certification has launched Amazon DynamoDB: Building NoSQL Database–Driven Applications, a new self-paced, digital course available exclusively on edX.

Amazon Aurora

Amazon Aurora Serverless MySQL 5.6 can now be accessed using the built-in Data API enabling you to access Aurora Serverless with web services-based applications, including AWS LambdaAWS AppSync, and AWS Cloud9. For more check out this post.

Sharing snapshots of Aurora Serverless DB clusters with other AWS accounts or publicly is now possible. We are also giving you the ability to copy Aurora Serverless DB cluster snapshots across AWS regions.

You can now set the minimum capacity of your Aurora Serverless DB clusters to 1 Aurora Capacity Unit (ACU). With Aurora Serverless, you specify the minimum and maximum ACUs for your Aurora Serverless DB cluster instead of provisioning and managing database instances. Each ACU is a combination of processing and memory capacity. By setting the minimum capacity to 1 ACU, you can keep your Aurora Serverless DB cluster running at a lower cost.

AWS Serverless Application Repository

The AWS Serverless Application Repository is now available in 17 regions with the addition of the AWS GovCloud (US-West) region.

Region support includes Asia Pacific (Mumbai, Singapore, Sydney, Tokyo), Canada (Central), EU (Frankfurt, Ireland, London, Paris, Stockholm), South America (São Paulo), US West (N. California, Oregon), and US East (N. Virginia, Ohio).

Amazon Cognito

Amazon Cognito has launched a new API – AdminSetUserPassword – for the Cognito User Pool service that provides a way for administrators to set temporary or permanent passwords for their end users. This functionality is available for end users even when their verified phone or email are unavailable.

Serverless Posts

April

May

June

Events

Events this quarter

Senior Developer Advocates for AWS Serverless spoke at several conferences this quarter. Here are some recordings worth watching!

Tech Talks

We hold several AWS Online Tech Talks covering serverless tech talks throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. Here are the ones from Q2.

Twitch

Twitch Series

In April, we started a 13-week deep dive into building APIs on AWS as part of our Twitch Build On series. The Building Happy Little APIs series covers the common and not-so-common use cases for APIs on AWS and the features available to customers as they look to build secure, scalable, efficient, and flexible APIs.

There are also a number of other helpful video series covering Serverless available on the AWS Twitch Channel.

Build with Serverless on Twitch

Serverless expert and AWS Specialist Solutions architect, Heitor Lessa, has been hosting a weekly Twitch series since April. Join him and others as they build an end-to-end airline booking solution using serverless. The final episode airs on August 7th at Wednesday 8:00am PT.

Here’s a recap of the last quarter:

AWS re:Invent

AWS re:Invent 2019

AWS re:Invent 2019 is around the corner! From December 2 – 6 in Las Vegas, Nevada, join tens of thousands of AWS customers to learn, share ideas, and see exciting keynote announcements. Be sure to take a look at the growing catalog of serverless sessions this year.

Register for AWS re:Invent now!

What did we do at AWS re:Invent 2018? Check out our recap here: AWS re:Invent 2018 Recap at the San Francisco Loft

AWS Serverless Heroes

We urge you to explore the efforts of our AWS Serverless Heroes Community. This is a worldwide network of AWS Serverless experts with a diverse background of experience. For example, check out this post from last month where Marcia Villalba demonstrates how to set up unit tests for serverless applications.

Still looking for more?

The Serverless landing page has lots of information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

Understanding the Different Ways to Invoke Lambda Functions

Post Syndicated from George Mao original https://aws.amazon.com/blogs/architecture/understanding-the-different-ways-to-invoke-lambda-functions/

In our first post, we talked about general design patterns to enable massive scale with serverless applications. In this post, we’ll review the different ways you can invoke Lambda functions and what you should be aware of with each invocation model.

Synchronous Invokes

Synchronous invocations are the most straight forward way to invoke your Lambda functions. In this model, your functions execute immediately when you perform the Lambda Invoke API call. This can be accomplished through a variety of options, including using the CLI or any of the supported SDKs.

Here is an example of a synchronous invoke using the CLI:

aws lambda invoke —function-name MyLambdaFunction —invocation-type RequestResponse —payload  “[JSON string here]”

The Invocation-type flag specifies a value of “RequestResponse”. This instructs AWS to execute your Lambda function and wait for the function to complete. When you perform a synchronous invoke, you are responsible for checking the response and determining if there was an error and if you should retry the invoke.

Many AWS services can emit events that trigger Lambda functions. Here is a list of services that invoke Lambda functions synchronously:

Asynchronous Invokes

Here is an example of an asynchronous invoke using the CLI:

aws lambda invoke —function-name MyLambdaFunction —invocation-type Event —payload  “[JSON string here]”

Notice, the Invocation-type flag specifies “Event.” If your function returns an error, AWS will automatically retry the invoke twice, for a total of three invocations.

Here is a list of services that invoke Lambda functions asynchronously:

Asynchronous invokes place your invoke request in Lambda service queue and we process the requests as they arrive. You should use AWS X-Ray to review how long your request spent in the service queue by checking the “dwell time” segment.

Poll based Invokes

This invocation model is designed to allow you to integrate with AWS Stream and Queue based services with no code or server management. Lambda will poll the following services on your behalf, retrieve records, and invoke your functions. The following are supported services:

AWS will manage the poller on your behalf and perform Synchronous invokes of your function with this type of integration. The retry behavior for this model is based on data expiration in the data source. For example, Kinesis Data streams store records for 24 hours by default (up to 168 hours). The specific details of each integration are linked above.

Conclusion

In our next post, we’ll provide some tips and best practices for developing Lambda functions. Happy coding!

 

About the Author

George MaoGeorge Mao is a Specialist Solutions Architect at Amazon Web Services, focused on the Serverless platform. George is responsible for helping customers design and operate Serverless applications using services like Lambda, API Gateway, Cognito, and DynamoDB. He is a regular speaker at AWS Summits, re:Invent, and various tech events. George is a software engineer and enjoys contributing to open source projects, delivering technical presentations at technology events, and working with customers to design their applications in the Cloud. George holds a Bachelor of Computer Science and Masters of IT from Virginia Tech.

Increasing real-time stream processing performance with Amazon Kinesis Data Streams enhanced fan-out and AWS Lambda

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/

Live business data and real-time analytics are critical to informed decision-making and customer service. For example, streaming services like Netflix process billions of traffic flows each day to help you binge-watch your favorite shows. And consumer audio specialists like Sonos monitor a billion events per week to improve listener experiences. These data-savvy businesses collect and analyze massive amounts of real-time data every day.

Kinesis Data Streams overview

To help ingest real-time data or streaming data at large scales, AWS customers turn to Amazon Kinesis Data Streams. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources. The data collected is available in milliseconds, enabling real-time analytics.

To provide this massively scalable throughput, Kinesis Data Streams relies on shards, which are units of throughput and represent a parallelism. One shard provides an ingest throughput of 1 MB/second or 1000 records/second. A shard also has an outbound throughput of 2 MB/sec. As you ingest more data, Kinesis Data Streams can add more shards. Customers often ingest thousands of shards in a single stream.

Enhanced fan-out

One of the main advantages of stream processing is that you can attach multiple unique applications, each consuming data from the same Kinesis data stream. For example, one application can aggregate the records in the data stream, batch them, and write the batch to S3 for long-term retention. Another application can enrich the records and write them into an Amazon DynamoDB table. At the same time, a third application can filter the stream and write a subset of the data into a different Kinesis data stream.

Before the adoption of enhanced fan-out technology, users consumed data from a Kinesis data stream with multiple AWS Lambda functions sharing the same 2 MB/second outbound throughput. Due to shared bandwidth constraints, no more than two or three functions could efficiently connect to the data stream at a time, as shown in the following diagram.

Default Method

To achieve greater outbound throughput across multiple applications, you could spread data ingestion across multiple data streams. So, a developer seeking to achieve 10 GB/second of outbound throughput to support five separate applications might resort to math like the following table:

StreamShardsInputOutput
111000 records/second
or 1 MB/second
2 MB/second
22500 ea.5,000,000 records/second
or 5000 MB/second
10,000 MB/second
or 10 GB/second

Due to the practical limitation of two to three applications per stream, you must have at least two streams to support five individual applications. You could attach three applications to the first stream and two applications to the second. However, diverting data into two separate streams adds complexity.

In August of 2018, Kinesis Data Streams announced a solution: support for enhanced fan-out and HTTP/2 for faster streaming. The enhanced fan-out method is an option that you can use for consuming Kinesis data streams at a higher capacity. The enhanced capacity enables you to achieve higher outbound throughput without having to provision more streams or shards in the same stream.

When using the enhanced fan-out option, first create a Kinesis data stream consumer. A consumer is an isolated connection to the stream that provides a 2 MB/second outbound throughput. A Kinesis data stream can support up to five consumers, providing a combined outbound throughput capacity of 10MB/second/shard. As the stream scales dynamically by adding shards, so does the amount of throughput scale through the consumers.

Consider again the requirement of 10 GB of output capacity—but rerun your math using enhanced fan-out.

StreamShardsInputConsumersOutput
111000 records/second
or 1 MB/second
510 MB/second
110001,000,000 records
or 1,000 MB/second
510,000 MB/second
or 10 GB/second

Enhanced fan-out with Lambda functions

Just before re:Invent 2018, AWS Lambda announced support for enhanced fan-out and HTTP/2. Lambda functions can now be triggered using the enhanced fan-out pattern to reduce latency. These improvements increase the amount of real-time data that can be processed in serverless applications, as seen in the following diagram.

Enhanced Fan-Out Method

In addition to using the enhanced fan-out option, you can still attach Lambda functions to the stream using the GetRecords API, as before. You can attach up to five consumers with Lambda functions at 2 MB/second outbound throughput capacity and another two or three Lambda functions sharing a single 2 MB/second outbound throughput capacity. Thus, enhanced fan-out enables you to support up to eight Lambda functions, simultaneously.

HTTP/2

The streaming technology in HTTP/2 increases the output ability of Kinesis data streams. In addition, it allows data delivery from producers to consumers in 70 milliseconds or better (a 65% improvement) in typical scenarios. These new features enable you to build faster, more reactive, highly parallel, and latency-sensitive applications on top of Kinesis Data Streams.

Comparing methods

To demonstrate the advantage of Kinesis Data Streams enhanced fan-out, I built an application with a single shard stream. It has three Lambda functions connected using the standard method and three Lambda functions connected using enhanced fan-out for consumers. I created roughly 76 KB of dummy data and inserted it into the stream at 1,000 records per second. After four seconds, I stopped the process, leaving a total of 4,000 records to be processed.

As seen in the following diagram, each of the enhanced fan-out functions processed the 4000 records in under 2 seconds, averaging at 1,852 MS each. Interestingly, the standard method got a jumpstart in the first function, processing 4,000 records in 1,732 MS. However, because of the shared resources, the other two functions took longer to process the data, at just over 2.5 seconds.

Comparison of Methods

By Kinesis Data Streams standards, 4000 records is a small dataset. But when processing millions of records in real time, the latency between standard and enhanced fan-out becomes much more significant.

Cost

When using Kinesis Data Streams, a company incurs an hourly cost of $0.015 per shard and a PUT fee of $0.014 per one million units. You can purchase enhanced fan-out for a consumer-shard per hour fee of $0.015 and $0.013 per GB data retrieval fee. These fees are for the us-east-1 Region only. To see a full list of prices, see Kinesis Data Streams pricing.

Show me the code

To demonstrate the use of Kinesis Data Streams enhanced fan-out with Lambda functions, I built a simple application. It ingests simulated IoT sensor data and stores it in an Amazon DynamoDB table as well as in an Amazon S3 bucket for later use. I could have conceivably done this in a single Lambda function. However, to keep things simple, I broke it into two separate functions.

Deploying the application

I built the Kinesis-Enhanced-Fan-Out-to-DDB-S3 application and made it available through the AWS Serverless Application Repository.

Deploy the application in your AWS account. The application is only available in the us-east-1 Region.

On the deployment status page, you can monitor the resources being deployed, including policies and capabilities.

Deployment Status

After all the resources deploy, you should see a green banner.

Exploring the application

Take a moment to examine the list of deployed resources. The two Lambda functions, DDBFunction and S3Function, receive data and write to DynamoDB and S3, respectively. Additionally, two roles have been created to allow the functions access to their respective targets.

There are also two consumers, DDBConsumer and S3Consumer, which provide isolated output at 2 MB/second throughput. Each consumer is connected to the KinesisStream stream and triggers the Lambda functions when data occurs.

Also, there is a DynamoDB table called DBRecords and an S3 bucket called S3Records.

Finally, there is a stream consumption app, as shown in the following diagram.

Application Example

Testing the application

Now that you have your application installed, test it by putting data into the Kinesis data stream.

There are several ways to do this. You can build your producer using the Kinesis Producer Library (KPL), or you could create an app that uses the AWS SDK to input data. However, there is an easier way that suits your purposes for this post: the Amazon Kinesis Data Generator. The easiest way to use this tool is to use the hosted generator and follow the setup instructions.

After you have the generator configured, you should have a custom URL to generate data for your Kinesis data stream. In your configuration steps, you created a username and password. Log in to the generator using those credentials.

When you are logged in, you can generate data for your stream test.

  1. For Region, choose us-east-1.
  2. For Stream/delivery stream, select your stream. It should start with serverlessrepo.
  3. For Records per second, keep the default value of 100.
  4. On the Template 1 tab, name the template Sensor1.
  5. Use the following template:
    {
        "sensorId": {{random.number(50)}},
        "currentTemperature": {{random.number(
            {
                "min":10,
                "max":150
            }
        )}},
        "status": "{{random.arrayElement(
            ["OK","FAIL","WARN"]
        )}}"
    }
  6. Choose Send Data.
  7. After several seconds, choose Stop Sending Data.

At this point, if all went according to plan, you should see data in both your DynamoDB table and S3 bucket. Use the following steps to verify that your enhanced fan-out process worked.

  1. On the Lambda console, choose Applications.
  2. Select the application that starts with serverlessrepo-.
  3. Choose Resources, DDBFunction. This opens the DynamoDB console.
  4. Choose Items.

The following screenshot shows the first 100 items that your database absorbed from the DDBFunction attached to KinesisStream through DDBConsumer.

DynamoDB Records

Next, check your S3 bucket,

  1. On the Lambda console, choose Applications.
  2. Select the application that starts with serverlessrepo-.
  3. Choose Resources, S3Records. This opens the S3 console.

As in DynamoDB, you should now see the fake IoT sensor data stored in your S3 bucket for later use.

S3 Records

Now that the demonstration is working, I want to point out the benefits of what you have just done. By using the enhanced fan-out method, you have increased your performance in the following ways.

  1. HTTP/2 has decreased the time from data producers to consumers to <=70 MS, a 65% improvement.
  2. At the consumer level, each consumer has an isolated 2 MB/second outbound throughput speed. Because you are using two consumers, it works out to 2x the performance.

Conclusion

Using Lambda functions in concert with Kinesis Data Streams to collect and analyze massive amounts of data isn’t a new idea. However, the introduction of enhanced fan-out technology and HTTP/2 enables you to use more functions at the same time without losing throughput capacity.

If you only connect one or two Lambda functions to a data stream, then enhanced fan-out might not be a great fit. However, if you attach more than three Lambda functions to a stream for real-time manipulation and data routing, it makes sense to evaluate this option.

I hope this helps. Happy coding!

How to Design Your Serverless Apps for Massive Scale

Post Syndicated from George Mao original https://aws.amazon.com/blogs/architecture/how-to-design-your-serverless-apps-for-massive-scale/

Serverless is one of the hottest design patterns in the cloud today, allowing you to focus on building and innovating, rather than worrying about the heavy lifting of server and OS operations. In this series of posts, we’ll discuss topics that you should consider when designing your serverless architectures. First, we’ll look at architectural patterns designed to achieve massive scale with serverless.

Scaling Considerations

In general, developers in a “serverful” world need to be worried about how many total requests can be served throughout the day, week, or month, and how quickly their system can scale. As you move into the serverless world, the most important question you should understand becomes: “What is the concurrency that your system is designed to handle?”

The AWS Serverless platform allows you to scale very quickly in response to demand. Below is an example of a serverless design that is fully synchronous throughout the application. During periods of extremely high demand, Amazon API Gateway and AWS Lambda will scale in response to your incoming load. This design places extremely high load on your backend relational database because Lambda can easily scale from thousands to tens of thousands of concurrent requests. In most cases, your relational databases are not designed to accept the same number of concurrent connections.

Serverless at scale-1

This design risks bottlenecks at your relational database and may cause service outages. This design also risks data loss due to throttling or database connection exhaustion.

Cloud Native Design

Instead, you should consider decoupling your architecture and moving to an asynchronous model. In this architecture, you use an intermediary service to buffer incoming requests, such as Amazon Kinesis or Amazon Simple Queue Service (SQS). You can configure Kinesis or SQS as out of the box event sources for Lambda. In design below, AWS will automatically poll your Kinesis stream or SQS resource for new records and deliver them to your Lambda functions. You can control the batch size per delivery and further place throttles on a per Lambda function basis.

Serverless at scale - 2

This design allows you to accept extremely high volume of requests, store the requests in a durable datastore, and process them at the speed which your system can handle.

Conclusion

Serverless computing allows you to scale much quicker than with server-based applications, but that means application architects should always consider the effects of scaling to your downstream services. Always keep in mind cost, speed, and reliability when you’re building your serverless applications.

Our next post in this series will discuss the different ways to invoke your Lambda functions and how to design your applications appropriately.

About the Author

George MaoGeorge Mao is a Specialist Solutions Architect at Amazon Web Services, focused on the Serverless platform. George is responsible for helping customers design and operate Serverless applications using services like Lambda, API Gateway, Cognito, and DynamoDB. He is a regular speaker at AWS Summits, re:Invent, and various tech events. George is a software engineer and enjoys contributing to open source projects, delivering technical presentations at technology events, and working with customers to design their applications in the Cloud. George holds a Bachelor of Computer Science and Masters of IT from Virginia Tech.

Optimizing downstream data processing with Amazon Kinesis Data Firehose and Amazon EMR running Apache Spark

Post Syndicated from Srikanth Kodali original https://aws.amazon.com/blogs/big-data/optimizing-downstream-data-processing-with-amazon-kinesis-data-firehose-and-amazon-emr-running-apache-spark/

For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge.  Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis.  Amazon S3 is a natural landing spot for data of all types.  However, the way data is stored in Amazon S3 can make a significant difference in the efficiency and cost of downstream data processing.  Specifically, Apache Spark can be over-burdened with file operations if it is processing a large number of small files versus fewer larger files.  Each of these files has its own overhead of a few milliseconds for opening, reading metadata information, and closing. This overhead of file operations on these large numbers of files results in slow processing. This blog post shows how to use Amazon Kinesis Data Firehose to merge many small messages into larger messages for delivery to Amazon S3.  This results in faster processing with Amazon EMR running Spark.

Like Amazon Kinesis Data Streams, Kinesis Data Firehose accepts a maximum incoming message size of 1 MB.  If a single message is greater than 1 MB, it can be compressed before placing it on the stream.  However, at large volumes, a message or file size of 1 MB or less is usually too small.  Although there is no right answer for file size, 1 MB for many datasets would just yield too many files and file operations.

This post also shows how to read the compressed files using Apache Spark that are in Amazon S3, which does not have a proper file name extension and store back in Amazon S3 in parquet format.

Solution overview

The steps we follow in this blog post are:

  1. Create a virtual private cloud (VPC) and an Amazon S3 bucket.
  2. Provision a Kinesis data stream, and an AWS Lambda function to process the messages from the Kinesis data stream.
  3. Provision Kinesis Data Firehose to deliver messages to Amazon S3 sent from the Lambda function in step 2. This step also provisions an Amazon EMR cluster to process the data in Amazon S3.
  4. Generate test data with custom code running on an Amazon EC2
  5. Run a sample Spark program from the Amazon EMR cluster’s master instance to read the files from Amazon S3, convert them into parquet format and write back to an Amazon S3 destination.

The following diagram explains how the services work together:

The AWS Lambda function in the diagram reads the messages, append additional data to them, and compress them with gzip before sending to Amazon Kinesis Data Firehose. The reason for this is most customers need some enrichment to the data before arriving to Amazon S3.

Amazon Kinesis Data Firehose can buffer incoming messages into larger records before delivering them to your Amazon S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.

An Apache Spark job reads the messages from Amazon S3, and stores them in parquet format. With parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like Amazon Athena.

Considerations

The maximum size of a record sent to Kinesis Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Kinesis Data Firehose is the best approach. Kinesis Data Firehose  also offers compression of messages after they are written to the Kinesis Data Firehose data stream. Unfortunately, this does not overcome the message size limitation, because this compression happens after the message is written. When Kinesis Data Firehose delivers a previously compressed message to Amazon S3 it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Kinesis Data Firehose, it is delivered to Amazon S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing because a “.gz” extension is required.

We will see how to overcome this issue by reading the files using the Amazon S3 API operations later in this blog.

Prerequisites and assumptions

To follow the steps outlined in this blog post, you need the following:

  • An AWS account that provides access to AWS services.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
  • The templates and code are intended to work in the US East (N. Virginia) Region only.

Additionally, be aware of the following:

  • We configure all services in the same VPC to simplify networking considerations.
  • Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren’t intended for production use without any modifications.

Implementing the solution

You can use this downloadable template for single-click deployment. This template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region. The template is designed to work only in the US East (N. Virginia) Region. To launch directly through the console, choose the Launch Stack button.

This template takes the following parameters. Some of the parameters have default values, and you can’t edit these. These predefined names are hardcoded in the code. For some of the parameters, you must provide the values. The following table provides additional details.

For this parameterProvide this
StackNameProvide the stack name.

ClientIP

 

The IP address range of the client that is allowed to connect to the cluster using SSH.
FirehoseDeliveryStreamNameThe name of the Amazon Firehose delivery stream. Default value is set to “AWSBlogs-LambdaToFireHose”.
InstanceTypeThe EC2 instance type.
KeyNameThe name of an existing EC2 key pair to enable access to login.
KinesisStreamNameThe name of the Amazon Kinesis Stream. Default value is set to “AWS-Blog-BaseKinesisStream”
RegionAWS Region – By default it is us-east-1 — US East (N. Virginia). Do not change this as the scripts are developed to work in this Region only.

EMRClusterName

 

A name for the EMR cluster.
S3BucketNameThe name of the bucket that is created in your account. Provide some unique name to this bucket. This bucket is used for storing the messages and output from the Spark code.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names and for I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND. And then click on the Create button.

If you use this one-step solution, you can skip to Step 7: Generate test dataset and load into Kinesis Data Streams.

To create each component individually, use the following steps.

1. Use the AWS CloudFormation template to configure Amazon VPC and create an Amazon S3 bucket

In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules. The first inbound rule allows access to the TCP port 22 (SSH) from the provided client IP CIDR range and the second inbound rule allows access to any TCP port from any host with in the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we will also create a standard Amazon S3 bucket with a provided bucket name to store the incoming data and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
StackNameProvide the stack name.
S3BucketNameProvide a unique Amazon S3 bucket. This bucket is created in your account.
ClientIpProvide a CIDR IP address range that is added to inbound rule of the security group. You can get your current IP address from “checkip.amazon.com” web url.

After you specify the template details, choose Next. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
StackNameName
VPCIDVpc-xxxxxxx
SubnetIDsubnet-xxxxxxxx
SecurityGroupsg-xxxxxxxxxx
S3BucketDomain<S3_BUCKET_NAME>.s3.amazonaws.com
S3BucketARNarn:aws:s3:::<S3_BUCKET_NAME>

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

2.  Use the AWS CloudFormation template to create necessary IAM Roles

In this step, we set up two AWS IAM roles. One of the IAM roles will be used by an AWS Lambda function to allow access to Amazon S3 service, Amazon Kinesis Data Firehose, Amazon CloudWatch Logs, and Amazon EC2 instances.  The second IAM role is used by the Amazon Kinesis Data Firehose service to access Amazon S3 service. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
StackNameProvide the stack name.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
LambdaRoleArnarn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
FirehoseRoleArnarn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

When the stack launch is complete, it returns the output with information about the resources that were created. Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3. Use an AWS CloudFormation template to configure the Amazon Kinesis Data Firehose data stream

In this step, we set up Amazon Kinesis Data Firehose with Amazon S3 as destination for the incoming messages. We select the Uncompressed option for compression format, buffering options with 128 MB size and interval seconds of 300. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
StackNameProvide the stack name.
FirehoseDeliveryStreamNameProvide the name of the Amazon Kinesis Data Firehose delivery stream. The default value is set to “AWSBlogs-LambdaToFirehose”
RoleProvide the Kinesis Data Firehose IAM role ARN that was created as part of step 2.
S3BucketARNSelect the S3BucketARN. You can get this from the step 1 AWS CloudFormation output.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function

In this step, we set up a Kinesis data stream and an AWS Lambda function. We can use the AWS Lambda function to process incoming messages in a Kinesis data stream. An event source mapping is also created as part of this template. This adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mapping, see Creating an Event Source Mapping. This Kinesis data stream is created with 10 shards and the Lambda function is created with a Java 8 runtime. We allocate memory size of 1920 MB and timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides details.

For this parameterDo this
StackNameProvide the stack name.
KinesisStreamNameProvide the name of the Amazon Kinesis stream. Default value is set to ‘AWS-Blog-BaseKinesisStream’
RoleProvide the IAM Role created for Lambda function as part of the second AWS CloudFormation template. Get the value from the output of second AWS CloudFormation template.
S3BucketProvide the existing Amazon S3 bucket name that was created using first AWS CloudFormation template. Do not use the domain name. Provide the bucket name only.
RegionSelect the AWS Region. By default it is us-east-1 — US East (N. Virginia).

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

5. Use an AWS CloudFormation template to configure the Amazon EMR cluster

In this step, we set up an Amazon EMR 5.16.0 cluster with “Spark”, “Ganglia” and “Hive” applications. We create this cluster with one master and two core nodes, and use an r4.xlarge instance type. The template uses an AWS Glue metastore for the Amazon EMR hive metastore. This Amazon EMR cluster is used to process the messages in Amazon S3 bucket that are created by the Amazon Kinesis Data Firehose data stream. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
EMRClusterNameProvide the name for the EMR cluster.
ClusterSecurityGroupSelect the security group ID that was created as part of the first AWS CloudFormation template.
ClusterSubnetIDSelect the subnet ID that was created as part of the first AWS CloudFormation template.
AllowedCIDRProvide the IP address range of the client that is allowed to connect to the cluster.
KeyNameProvide the name of an existing EC2 key pair to access the Amazon EMR cluster.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
EMRClusterMasterssh [email protected] -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

6. Use an AWS CloudFormation template to create an Amazon EC2 Instance to generate test data

In this step, we set up an Amazon EC2 instance and install open-jdk version 1.8. The AWS CloudFormation script that creates this EC2 instance runs two additional steps. First, it downloads and installs open-jdk version 1.8. Second, it downloads a Java program jar file on to the EC2 instance’s ec2-user home directory. We use this Java program to generate test data messages with an approximate size of ~900 KB. We then send them to the Kinesis data stream that was created as part of the previous steps. The Java jar file name is: “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”.

You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
EC2SecurityGroupSelect the security group ID that was created from the first AWS CloudFormation template.
EC2SubnetSelect the subnet that was created from the first AWS CloudFormation template.
InstanceTypeSelect the provided instance type. By default, it selects r4.4xlarge instance.
KeyNameName of an existing EC2 key pair to enable SSH access to the EC2 instance.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page select “I acknowledge that AWS CloudFormation might create IAM resources with custom names” option and, click Create button.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
EC2Instancessh [email protected]<Public-IP> -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

7. Generate the test dataset and load into Kinesis Data Streams

After all of the previous AWS CloudFormation stacks are created successfully, log in to the EC2 instance that was created as part of the step 6. Use the “ssh” command as shown in the CloudFormation stack template output. This template copies the “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar” file, which we use to generate the test data and send to Amazon Kinesis Data Streams. You can find the code corresponding to this sample Kinesis producer in this Git repository.

Make sure your EC2 instance’s security group allows ssh port 22 (Inbound) from your IP address. If not, update your security group inbound access.

Run the following commands to generate some test data.

$ cd;

 

$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

 

$java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000

 

This java program uses PutRecords API method that allows many records to be sent with a single HTTP request. For more information on this you can check this AWS blog. Once you run the above java program, you will see the below output that shows messages are in the process of sending to Kinesis Data Stream.

“Starting producer and consumer.....
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 0
Producer Thread # 9 is going to sleep mode for 500 ms.
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 1
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 2
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 3
::
::
Record sent to Kinesis Stream. Record size is ::: 5042850 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042726 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042729 KB”

When running the sample Kinesis producer jar, notice that the number of messages is 10,000. This program generates the test data messages and is not a replacement for your load testing tool. This is created to demonstrate the use case presented in this post.

After all of the messages generated and sent to Amazon Kinesis Data Streams, program will exit gracefully.

The sample JSON input message format is shown as follows:

   "processedDate":"2018/10/30 19:05:19",
   "currentDate":"2018/10/30 19:05:07",
   "hashDeviceId":"0c2745e4-c2d6-4d43-8339-9c2401e80e92",
   "deviceId":"94581b5f-a117-484a-8e3c-4fcc2dbd53b7",
   "accelerometerSensorList":[  
      {  
         "accelerometer_Y":8,
         "gravitySensor_X":5,
         "accelerometer_X":9,
         "gravitySensor_Z":4,
         "accelerometer_Z":1,
         "gravitySensor_Y":5,
         "linearAccelerationSensor_Z":3,
         "linearAccelerationSensor_Y":9,
         "linearAccelerationSensor_X":9
      },
      {  
         "accelerometer_Y":1,
         "gravitySensor_X":3,
         "accelerometer_X":5,
         "gravitySensor_Z":5,
         "accelerometer_Z":7,
         "gravitySensor_Y":9,
         "linearAccelerationSensor_Z":6,
         "linearAccelerationSensor_Y":5,
         "linearAccelerationSensor_X":3
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "tempSensorList":[  
      {  
         "kelvin":585.4928040286752,
         "celsius":43.329574923775425,
         "fahrenheit":50.13864584530086
      },
      {  
         "kelvin":349.95625855125814,
         "celsius":95.68423052685313,
         "fahrenheit":7.854854574219985
      },
 {
   …
 },
 {
   …
 },
 :
 :
 
   ],
   "illuminancesSensorList":[  
      {  
         "illuminance":44.65135784368194
      },
      {  
         "illuminance":98.15404017082403
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "gpsSensorList":[  
      {  
         "altitude":4.38273213294682,
         "heading":7.416314616289915,
         "latitude":5.759723677991661,
         "longitude":1.4732885894731842
      },
      {  
         "altitude":9.816473807569487,
         "heading":5.118919157684835,
         "latitude":3.581361614110458,
         "longitude":1.3699272610616127
      },
 {
   …
 },
 {
   …
 },
 :
 :
   }

 

Log in to the Kinesis Data Streams console, then choose the Kinesis data stream that was created as part of the step 4.  Choose the Monitor tab to see the graphs. Run the data generation utility for at least 15 mins to generate enough data.

8. Processing Kinesis Data Streams messages using AWS Lambda

As part of the previously-described setup, we also use an AWS Lambda function (name:LambdaForProcessingKinesisRecords) to process the messages from the Kinesis data stream. This Lambda function reads each message content and appends “additional data.” This demonstrates that the incoming message from Kinesis data stream is read, and appended with some additional information to make the message size more than 1 MB.  Several customers have a use case like this to enrich the incoming messages by adding additional information. After the AWS Lambda function appends additional data to incoming messages, it sends them to Amazon Kinesis Data Firehose. Because Kinesis Data Firehose accepts only messages that are less than 1 MB, we must compress the messages before sending to it. In the Lambda function, we are compressing the message using gzip compression before sending it to Kinesis Data Firehose. In addition to compressing each message, we are also appending a new line character (“/n”) to each message after compressing it to separate the messages.

We set the buffer size to 128 MB and duration of the buffer is 900 seconds while creating the Kinesis Data Firehose. This helps merge the incoming compressed messages into larger messages and sends to the provided Amazon S3 bucket.

The AWS Lambda function appends the following content to the original message in Kinesis Data Streams after reading it.

"testAdditonalDataList": [
  {
    "dimesnion_X": 9,
    "dimesnion_Y": 2,
    "dimesnion_Z": 2
  },
  {
    "dimesnion_X": 3,
    "dimesnion_Y": 10,
    "dimesnion_Z": 5
  }
  {
    …
  },
  {
    …
  },
  :
  :
]

If we do not compress the message before sending to Kinesis Data Firehose, it throws this error message in the Amazon CloudWatch Logs.

Here is the code snippet where we are compressing the message in the AWS Lambda function. The complete code can be found in this Git repository.

private String sendToFireHose(String mergedJsonString)
{
    PutRecordResult res = null;
    try {
        //To Firehose -
        System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
        System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
        PutRecordRequest req = new PutRecordRequest()
                .withDeliveryStreamName(firehoseStreamName);

        // Without compression - Send to Firehose
        //Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + "\r\n").getBytes()));

        // With compression - send to Firehose
        Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + "\r\n").getBytes())));
        req.setRecord(record);
        res = kinesisFirehoseClient.putRecord(req);
    }
    catch (IOException ie) {
        ie.printStackTrace();
    }
    return res.getRecordId();
}

You can check the provided bucket to see if the messages are flowing into the bucket. The Amazon S3 bucket should show something similar to the following example:

You see the files generated from Kinesis Data Firehose that do not have any extension. By default, Kinesis Data Firehose does not provide any extension to the files that are generated in Amazon S3 bucket unless you select a compression option. But in our use case, since the size of the uncompressed input message is greater than 1 MB, we are compressing it before sending to Kinesis Data Firehose. As the message is already compressed, we are not selecting any compression option in Kinesis Data Firehose, as it double-compresses the message and the downstream Spark application cannot process this.

9. Reading and converting the data into parquet format using Apache Spark program with Amazon EMR

As we noted down from the previous screen shot, Kinesis Data Firehose by default does not generate any file extensions to the files that are written into Amazon S3 bucket. This creates a problem while reading the files using Apache Spark. Apache Spark, by default, checks for a valid file name extension if the file is compressed. In this case for gzip compression, it looks for <filename>.gz to successfully read it.

To overcome this issue, we can use Amazon S3 API operations, particularly AmazonS3Client class, to list all the Amazon S3 keys and use Spark’s parallelize method to read the contents of the files. After reading the file content, we can uncompress it using GZipInputStream class. You can find the code snippet below. The complete code can be found in the Git repository.

val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap
{ key => Source.fromInputStream
  (
   new GZipInputStream(s3Client.getObject(bucketName, key).getObjectContent:   InputStream)
  ).getLines 
}

var finalDF = spark.read.json(allLinesRDD).toDF()

Once the Amazon EMR cluster creation is completed successfully, login to the Amazon EMR master machine using the following command. You can get the “ssh” login command from the AWS CloudFormation stack 5 (step 5) outputs parameter “EMRClusterMaster”.

  • ssh [email protected] -i <KEYPAIR_NAME>.pem
  • Make sure the security port 22 is opened to connect to the Amazon EMR master machine.

Run the Spark program using the following Spark submit command.

spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/

Change the S3_BUCKET_NAME and YEAR values from the previous Spark command.

Argument #PropertyValue
1–classcom.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet
2–masteryarn
3–deploy-modeclient
4s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar
5S3_BUCKET_NAMEThe Amazon S3 bucket name that was created as part of the AWS CloudFormation template. The source files are created in this bucket.
6<INPUT S3 LOCATION>“fromfirehose/<YYYY>/”. The input files are created in this Amazon S3 key location under the bucket that was created. “YYYY” represents the current year. For example, “fromfirehose/2018/”
7<OUTPUT S3 LOCATION>Provide an output directory name that will be created under the above provided Amazon S3 bucket. For example: “output-emr-parquet/”

 

When the program finishes running, you can check the Amazon S3 output location to see the files that are written in parquet format.

Cleaning up after the migration

After completing and testing this solution, clean up the resources by stopping your tasks and deleting the AWS CloudFormation stacks. The stack deletion fails if you have any files in the created Amazon S3 bucket. Make sure that you cleaned up the Amazon S3 bucket that was created before deleting the AWS CloudFormation templates.

Conclusion

In this post, we described the process of avoiding small file creation in Amazon S3 by sending the incoming messages to Amazon Kinesis Data Firehose. We also went through the process of reading and storing the data in parquet format using Apache Spark with an Amazon EMR cluster.

 


About the Author

Photo of Srikanth KodaliSrikanth Kodali is a Sr. IOT Data analytics architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

 

 

Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects

Post Syndicated from Rajeev Chakrabarti original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose allowed only specifying a literal prefix. This prefix was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.

Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices.  The data then is typically stored in a data lake, so it can be processed and eventually queried.  When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder.  This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.

A common way to group data is by date.  Kinesis Data Firehose automatically groups data and stores it into the appropriate folders on Amazon S3 based on the date.  However, the naming of folders in Amazon S3 is not compatible with Apache Hive naming conventions. This makes data more difficult to catalog using AWS Glue crawlers and analyze using big data tools.

This post discusses a new capability that lets us customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work, the intended use cases, and includes step-by-step instructions to try the feature in your own account.

The need for custom prefixes for Amazon S3 objects

Previously, Kinesis Data Firehose created a static Universal Coordinated Time (UTC) based folder structure in the format YYYY/MM/DD/HH. It then appended it to the provided prefix before writing objects to Amazon S3. For example, if you provided a prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”.  However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”.  Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.

Other methods for managing partitions also become possible such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders.  This is helpful in reducing the total number of partitions that need to be maintained as the table grows over time. It also simplifies range queries. Providing the ability to specify custom prefixes obviates the need for an additional ETL step to put the data in the right folder structure improving the time to insight.

How custom prefixes for Amazon S3 objects works

This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.

Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery, AWS Lambda transformation or format conversion failure. Previously, the error output location could not be configured and was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date partitioned folders for easy reprocessing.

So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form: !{namespace:value}, where the namespace can be either firehose or timestamp. The value can be either “random-string” or “error-output-type” for the firehose namespace or a date pattern for the timestamp namespace in the Java DateTimeFormatter format. In a single expression, you can use a combination of the two namespaces although the !{firehose: error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.

Writing streaming data into Amazon S3 with Kinesis Data Firehose

This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive compatible folder structure.  It then shows how AWS Glue crawlers can infer the schema and extract the proper partition names that we designated in Kinesis Data Firehose, and catalog them in AWS Glue Data Catalog.  Finally, we run sample queries to show that partitions are indeed being recognized.

To demonstrate this, we use python code to generate sample data.  We also use a Lambda transform on Kinesis Data Firehose to forcibly create failures. This demonstrates how data can be saved to the error output location. The code that you need for this walkthrough is included here in GitHub.

For this walkthrough, this is the architecture that we are building:

Step 1: Create an Amazon S3 bucket

Create an S3 bucket to be used by Kinesis Data Firehose to deliver event records. We use the AWS Command Line Interface (AWS CLI) to create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to substitute the bucket name in the example for your own.

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

Step 2: Lambda Transform (optional)

The incoming events have an ApproximateArrivalTimestamp field in the event payload.  This is sufficient to create a proper folder structure on Amazon S3.  However, when querying the data it may be beneficial to expose this timestamp value as a top level column for easy filtering and validation.  To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top level field in the data payload. The data payload is what Kinesis Data Firehose writes as an object in Amazon S3. Additionally, the Lambda code also artificially generates some processing errors that are delivered to the “ErrorOutputPrefix” location specified for the delivery destination to illustrate the use of expressions in the “ErrorOutputPrefix.”

Create an IAM role for the Lambda transform function

First, create a role for the Lambda function called LambdaBasicRole. The TrustPolicyForLambda.json file is included in the GitHub repository.

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

After the role is created, attach the managed Lambda basic execution policy to it.

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda function

To create the Lambda function, start with the Python Kinesis Data Firehose blueprint “General Firehose Processing” and then modify it. For more information about the structure of the records and what must be returned, see Amazon Kinesis Data Firehose Data Transformation.

Zip up the Python file, and then create the Lambda function using the AWS CLI. The CreateLambdaFunctionS3CustomPrefixes.json file is included in the GitHub repository.

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

Step3. Delivery Stream

Next, create the Kinesis Data Firehose delivery stream. The createdeliverystream.json file is included in the GitHub repository.

 aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

In the previous configuration, we defined a Prefix and an ErrorOutputPrefix under the “ExtendedS3DestinationConfiguration” element. We defined the same for the “S3BackupConfiguration” element. Note that when the “ProcessingConfiguration” element is set to “Disabled”, the ErrorOutputPrefix parameter of the “ExtendedS3DestinationConfiguration” element exists only for consistency. It otherwise has no significance.

We’ve chosen a prefix that will result in a folder structure compatible with hive-style partitioning. This is the prefix we used:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose first creates a base folder called “fhbase” directly under the Amazon S3 bucket. Second, it evaluates the expressions !{timestamp:YYYY}, !{timestamp:MM}, !{timestamp:dd}, and !{timestamp:HH} to year, month, day and hour using the Java DateTimeFormatter format. For example, an ApproximateArrivalTimestamp of 1549754078390 in UNIX epoch time, which is 2019-02-09T16:13:01.000000Z in UTC would evaluate to “year=2019”, “month=02”, “day=09” and “hour=16”.  Therefore, the location in Amazon S3 where data records that are delivered evaluate to “fhbase/year=2019/month=02/day=09/hour=16/”.

Similarly, the ErrorOutputPrefix “fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/” results in a base folder called “fherroroutputbase” directly under the S3 bucket. The expression !{firehose:random-string} evaluates to an 11 character random string like “ztWxkdg3Thg”.  If you use this more than once in the same expression, every instance evaluates to a new random string. The expression !{firehose:error-output-type} evaluates to one of the following:

  1. “processing-failed” for Lambda transformation delivery failures
  2. “elasticsearch-failed” for an Amazon ES destination delivery failures
  3. “splunk-failed” for Splunk destination delivery failures
  4. “format-conversion-failed” for data format conversion failures

So, the location for an Amazon S3 object containing the delivery failed records for a Lambda transformation could evaluate to: fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/.

You can run aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample to describe the delivery stream created.

Next, enable encryption-at-rest for the delivery stream:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

Or Create the delivery stream using the AWS Console

  1. Choose the source. For this example, I use Direct PUT.
  2. Choose if you would like to transform the incoming records with a Lambda transformation. I chose Enabled, and chose the name of the Lambda function that I had created earlier.

  1. Choose the destination. I chose the Amazon S3 destination.

  1. Choose the Amazon S3 bucket. I chose the Amazon S3 bucket that I had created earlier in this exercise.

  1. Specify the Amazon S3 Prefix and the Amazon S3 error prefix. This corresponds to the “Prefix” and “ErrorOutputPrefix” explained earlier in the context of the AWS CLI input JSON.

  1. Choose whether you would like to back up the raw (before transformation) records to another Amazon S3 location. I chose Enabled and specified the same bucket (you could choose a different bucket). I also specified a different prefix from the transformed records – the base folder is different but the folder structure below that is the same. This would make it more efficient to crawl this location using an AWS Glue crawler or create external tables in Athena or Redshift Spectrum pointing to this location.

  1. Specify the buffering hints for the Amazon S3 destination. I chose 1 MB and 240 seconds.
  2. Choose the S3 Compression and encryption settings. I chose no compression for the transformed records’ location. I chose to encrypt the Amazon S3 location at rest by using the service-managed AWS KMS customer master key (CMK).
  3. Choose whether you want to enable Error Logging in Cloudwatch. I chose Enabled.
  4. Specify the IAM role that you want Kinesis Data Firehose to assume to access resources on your behalf. Choose either Create new or Choose to display a new screen. Choose Create a new IAM role, name the role, and then choose Allow.
  5. Choose Create Delivery Stream.

The delivery stream is now created and active. You can send events to it.

 Test with sample data

I used Python code to generate sample data. The structure of the generated data is as follows:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

Sample code to generate data and push it into Kinesis Data Firehose is included in the GitHub repository.

After you start sending events to the Kinesis Data Firehose delivery stream, objects should start appearing under the specified prefixes in Amazon S3.

I wanted to illustrate Lambda invoke errors and the appearance of files in the ErrorOutputPrefix location for Lambda transform errors. Therefore, I did not give permissions to the “firehose_delivery_role” to invoke my Lambda function. The following file showed up in the location specified by the ErrorOutputPrefix.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

Here is a snippet of the contents of the error file that I previously mentioned.

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

After I gave the “firehose_delivery_role” the appropriate permissions, the data objects showed up in the “Prefix” location specified for the Amazon S3 destination.

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

Also, because the Lambda code in my Lambda transform set the status failed for 10 percent of the records, those showed up in the ErrorOutputPrefix location for Lambda transform errors.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

Here is a snippet of the content of the error file:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

You’re now ready to create an AWS Glue crawler. For more information about using the AWS Glue Data Catalog, see Populating the AWS Glue Data Catalog.

  1. In the AWS Glue console, go to Crawlers, and choose Add Crawler.

  1. Add information about your crawler, then choose Next.
  2. In the Include Path, specify the Amazon S3 bucket name that you entered under the Amazon S3 destination. Also include the static prefix used when you created the Kinesis Data Firehose delivery stream. Do not include the custom prefix expression.
  3. Choose Next.

  1. Choose Next, No, Next.
  2. Specify the IAM role that AWS Glue would use. I chose to create a new IAM Role. Choose Next.
  3. Specify a schedule to run the crawler. I chose to Run it on Demand. Choose Next.
  4. Specify where the crawler adds the crawled and discovered tables. I chose the default database. Choose Next.

  1. Choose Finish.
  1. The crawler has been created and is ready to be run. Choose Run crawler.

  1. In the AWS Glue console, go to Tables. You can see that a table has been created with the name of the base folder. Choose fhbase.

The crawler has discovered and populated the table and its properties.

You can see the discovered schema. The crawler has identified and created the partitions based on the folder structure specified by the prefix expression.

Open the Amazon Athena console, and select the default database from the drop-down menu. Write the following query in the New query1 window, then choose Run query.

SELECT * FROM "default"."fhbase"

where year = '2019' and day = '12' and hour = '17'

order by approxarrtimestamputcfh desc

Notice that Amazon Athena recognizes the fhbase table as a partitioned table. The query can take advantage of the partitions in the query to filter the results.

Conclusion

As this post illustrates, Custom Prefixes for Amazon S3 objects provides much flexibility to customize the folder structure, where Kinesis Data Firehose delivers the data records and failure records in Amazon S3. Having control over the folder structure and naming in Amazon S3 simplifies data discovery, cataloging, and access. As a result, it helps get insight more expediently and helps you better manage the cost of your queries.

 


About the Author

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

 

 

 

 

Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

Stream processing facilitates the collection, processing, and analysis of real-time data and enables the continuous generation of insights and quick reactions to emerging situations. This capability is useful when the value of derived insights diminishes over time. Hence, the faster you can react to a detected situation, the more valuable the reaction is going to be. Consider, for instance, a streaming application that analyzes and blocks fraudulent credit card transactions while they occur. Compare that application to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a nice report for you to read the next morning.

It is quite common for the value of insights to diminish over time. Therefore, using stream processing can substantially improve the value of your analytics application. However, building and operating a streaming application that continuously receives and processes data is much more challenging than operating a traditional batch-oriented analytics application.

In this post, we discuss how you can use Apache Flink and Amazon Kinesis Data Analytics for Java Applications to address these challenges. We explore how to build a reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. We particularly focus on how to prepare and run Flink applications with Kinesis Data Analytics for Java Applications. To this end, we use an exemplary scenario that includes source code and AWS CloudFormation templates. You can follow along with this example using your own AWS account or adapt the code according to your specific requirements.

Challenges of running streaming applications

When you build a streaming application, the downstream systems naturally rely on a continuous and timely generation of output. Accordingly, there are much higher requirements on the availability of the streaming application. There is also much less time to address operational issues compared to a traditional batch-based approach. In a batch-processing scenario, when a job that runs once at the end of a business day fails, you can often restart the failed job and still complete the computation by the next morning, when the results are needed. In contrast, when a streaming application fails, downstream systems that consume the output might be affected within minutes, or even sooner, when the expected output no longer arrives in time.

Moreover, in case of failure, you can’t simply delete all intermediate results and restart a failed processing job, as it is commonly done in the batch-processing case. The output of a streaming job is continuously consumed by downstream systems. Output that has already been consumed cannot easily be retracted. Therefore, the entire processing pipeline is much more sensitive to duplicates that are introduced by an application that is restarted on failure. Furthermore, the computations of a streaming application often rely on some kind of internal state that can be corrupted or even lost when the application fails.

Last but not least, streaming applications often deal with a varying amount of throughput. Therefore, scaling the application according to the current load is highly desirable. When the load increases, the infrastructure that supports the streaming application must scale to keep the application from becoming overloaded, falling behind, and producing results that are no longer relevant. On the other hand, when the load decreases, the infrastructure should scale in again to remain cost effective by not provisioning more resources than are needed.

A reliable and scalable streaming architecture based on Flink and Kinesis Data Analytics for Java Applications

Apache Flink is an open-source project that is tailored to stateful computations over unbounded and bounded datasets. Flink addresses many of the challenges that are common when analyzing streaming data by supporting different APIs (including Java and SQL), rich time semantics, and state management capabilities. It can also recover from failures while maintaining exactly-once processing semantics. Therefore, Flink is well suited for analyzing streaming data with low latency.

In this post, we illustrate how to deploy, operate, and scale a Flink application with Kinesis Data Analytics for Java Applications. We use a scenario to analyze the telemetry data of a taxi fleet in New York City in near-real time to optimize the fleet operation. In this scenario, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into a Kinesis data stream as a simple JSON blob. From there, the data is processed by a Flink application, which is deployed to Kinesis Data Analytics for Java Applications. This application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

This scenario leads to the following architecture, which is separated into three stages for the ingestion, processing, and presentation of data.

Separating the different aspects of the infrastructure is a common approach in this domain and has several benefits over a more tightly coupled architecture.

First, the Kinesis data stream serves as a buffer that decouples the producers from the consumers. Taxis can persist the events that they generate into the data stream regardless of the condition of, for instance, the processing layer, which might be currently recovering from a node failure. Likewise, the derived data remains available through Kibana even if the ingestion or processing layer is currently unavailable due to some operational issues. Last but not least, all components can be scaled independently and can use infrastructure that is specifically tailored according to their individual requirements.

This architecture also allows you to experiment and adopt new technologies in the future. Multiple independent applications can concurrently consume the data stored in the Kinesis data stream. You can then test how a new version of an existing application performs with a copy of the production traffic. But you can also introduce a different tool and technology stack to analyze the data, again without affecting the existing production application. For example, it is common to persist the raw event data to Amazon S3 by adding a Kinesis Data Firehose delivery stream as a second consumer to the Kinesis data stream. This facilitates long-term archiving of the data, which you can then use to evaluate ad hoc queries or analyze historic trends.

All in all, separating the different aspects of the architecture into ingestion, processing, and presentation nicely decouples different components, making the architecture more robust. It furthermore allows you to choose different tools for different purposes and gives you a lot of flexibility to change or evolve the architecture over time.

For the rest of this post, we focus on using Apache Flink and Kinesis Data Analytics for Java Applications to identify areas that currently request a high number of taxi rides. We also derive the average trip duration to the New York City airports. But with this architecture, you also have the option to consume the incoming events using other tools, such as Apache Spark Structured Streaming and Kinesis Data Firehose, instead of, or in addition to, what is described here.

Let’s kick the tires!

To see the described architecture in action, execute the following AWS CloudFormation template in your own AWS account. The template first builds the Flink application that analyzes the incoming taxi trips, including the Flink Kinesis Connector that is required to read data from a Kinesis data stream. It then creates the infrastructure and submits the Flink application to Kinesis Data Analytics for Java Applications.

The entire process of building the application and creating the infrastructure takes about 20 minutes. After the AWS CloudFormation stack is created, the Flink application has been deployed as a Kinesis Data Analytics for Java application. It then waits for events in the data stream to arrive. Checkpointing is enabled so that the application can seamlessly recover from failures of the underlying infrastructure while Kinesis Data Analytics for Java Applications manages the checkpoints on your behalf. In addition, automatic scaling is configured so that Kinesis Data Analytics for Java Applications automatically allocates or removes resources and scales the application (that is, it adapts its parallelism) in response to changes in the incoming traffic.

To populate the Kinesis data stream, we use a Java application that replays a public dataset of historic taxi trips made in New York City into the data stream. The Java application has already been downloaded to an Amazon EC2 instance that was provisioned by AWS CloudFormation. You just need to connect to the instance and execute the JAR file to start ingesting events into the stream.

You can obtain all of the following commands, including their correct parameters, from the output section of the AWS CloudFormation template that you executed previously.

$ ssh [email protected]«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-1.0.jar -stream «Kinesis data stream name» -region «AWS region» -speedup 3600

The speedup parameter determines how much faster the data is ingested into the Kinesis data stream relative to the actual occurrence of the historic events. With the given parameters, the Java application ingests an hour of historic data within one second. This results in a throughput of roughly 13k events and 6 MB of data per second, which completely saturates the Kinesis data stream (more on this later).

You can then go ahead and inspect the derived data through the Kibana dashboard that has been created. Or you can create your own visualizations to explore the data in Kibana.

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

The prepared Kibana dashboard contains a heatmap and a line graph. The heatmap visualizes locations where taxis are currently requested, and it shows that the highest demand for taxis is Manhattan. Moreover, the JFK and LaGuardia airports are also spots on the map where substantially more rides are requested compared to their direct neighborhoods. The line graph visualizes the average trip duration to these two airports. The following image shows how it steadily increases throughout the day until it abruptly drops in the evening.

For this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the AWS CloudFormation template. For production workloads, it’s much more desirable to further tighten the security of your Elasticsearch domain, for instance, by using Amazon Cognito for Kibana access control.

Scaling the architecture to increase its throughput

For this post, the Kinesis data stream was deliberately underprovisioned so that the Java application is completely saturating the data stream. When you closely inspect the output of the Java application, you’ll notice that the “replay lag” is continuously increasing. This means that the producer cannot ingest events as quickly as it is required according to the specified speedup parameter.

You can dive deeper into the metrics of the data stream by accessing it through an Amazon CloudWatch Dashboard. You can then see that the WriteProvisionedThroughputExceeded metric is slightly increased: Roughly 0.4 percent of the records are not accepted into the stream as the respective requests are throttled. In other terms, the data stream is underprovisioned, in particular as the producer pauses the ingestion of new events when too many events are in flight.

To increase the throughput of the data stream, you can simply update the number of shards from 6 to 12 with a couple of clicks on the console and through an API call, respectively. For production environments, you might even want to automate this procedure. For details on how to automatically scale a Kinesis data stream, see the blog post Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling.

When the scaling operation of the stream finishes, you can observe how the “replay lag” decreases and more events are ingested into the stream.

However, as a direct result, more events need to be processed. So now the Kinesis Data Analytics for Java application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch. The metric reports the time difference between the oldest record currently read by the Kinesis Data Analytics for Java application and the latest record in the stream according to the ingestion time in milliseconds. So it indicates how much behind the processing is from the tip of the stream.

As these metrics show, 10 minutes after the scaling operation finishes, processing is already more than 3 minutes behind the latest event in the stream. Even worse, it steadily keeps falling behind, continuously widening this gap.

However, in contrast to Kinesis Data Streams, Kinesis Data Analytics for Java Applications natively supports auto scaling. After a couple of minutes, you can see the effect of the scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

However, notice how the millisBehindLatest metric spikes just before it starts to decline. This is caused by the way that scaling a Kinesis Data Analytics for Java application works today. To scale a running application, the internal state of the application is persisted into a so-called savepoint. This savepoint is exposed as a snapshot by Kinesis Data Analytics for Java Applications. Subsequently, the running instance of the application is terminated, and a new instance of the same application with more resources and a higher parallelism is created. The new instance of the application then populates its internal state from the snapshot and resumes the processing from where the now terminated instance left off.

Accordingly, the scaling operation causes a brief interruption of the processing, which explains the spike in metric. However, this operation is transparent to the producers and consumers. Producers can continue to write to the Kinesis data stream because they are nicely decoupled from the application. Likewise, consumers can still use Kibana to view their dashboards, although they might not see the latest data because it hasn’t yet been processed.

Let’s step back for a moment and review what you just did: You created a fully managed, highly available, scalable streaming architecture. You ingested and analyzed up to 25k events per second. You doubled the throughput of the architecture by scaling the Kinesis data stream and the Kinesis Data Analytics for Java application with a couple of clicks. You did all this while the architecture remained fully functional and kept receiving and processing events, not losing a single event. You also could have scaled the Elasticsearch cluster as seamlessly as the other components. But we’ll leave that as an exercise for the interested reader.

Try to imagine what it would have taken you to build something similar from scratch.

Prepare Flink applications for Kinesis Data Analytics for Java Applications

Now that you have seen the streaming application in action, let’s look at what is required to deploy and run a Flink application with Kinesis Data Analytics for Java Applications.

Similar to other deployment methods, the Flink application is first built and packaged into a fat JAR, which contains all the necessary dependencies for the application to run. The resulting fat JAR is then uploaded to Amazon S3. The location of the fat JAR on S3 and some additional configuration parameters are then used to create an application that can be executed by Kinesis Data Analytics for Java Applications. So instead of logging in to a cluster and directly submitting a job to the Flink runtime, you upload the respective fat JAR to S3. You then create a Kinesis Data Analytics for Java application that you can interact with using API calls, the console, and the AWS CLI, respectively.

Adapt the Flink configuration and runtime parameters

To obtain a valid Kinesis Data Analytics for Java application, the fat JAR of the Flink application must include certain dependencies. When you use Apache Maven to build your Flink application, you can simply add another dependency to the .pom file of your project.

<!—pom.xml ->
<project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...
</project>

You can then specify parameters that are passed to the resulting Kinesis Data Analytics for Java application when it is created or updated. These parameters are basically key-value pairs that are contained in a property map that is part of a property group.

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

You can then obtain the values of these parameters in the application code from the Kinesis Data Analytics for Java Applications runtime. For example, the following code snippet gets the name of the Kinesis data stream that the application should connect to from the FlinkApplicationProperties property group.

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");

You use the same mechanism to configure other properties for the Kinesis Data Analytics for Java application (for example, checkpointing and the parallelism of the application) that are usually specified as a parameter or configuration option directly to the Flink runtime.

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

With this configuration, the checkpointing and parallelism settings are left at their default. This enables checkpointing and auto scaling and sets the initial parallelism of the Kinesis Data Analytics for Java application to one. Moreover, the log level is increased to INFO and CloudWatch metrics are collected for every subtask of the application.

Build the Flink Kinesis Connector

When you are building a Flink application that reads data from a Kinesis data stream, you might notice that the Flink Kinesis Connector is not available from Maven central. You actually need to build it yourself. The following steps build the connector for any recent Apache Flink release. However, because Kinesis Data Analytics for Java Applications is based on Flink 1.6.2, you can use this specific version for now.

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

Note that the connector has already been built and stored on S3 by the AWS CloudFormation template. You can simply download the JAR file of the connector from there and put it in your local Maven repository using the following Maven command:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile flink-connector-kinesis_2.11-1.6.2.pom.xml

Integrate the Flink Elasticsearch sink with Amazon Elasticsearch Service

Beginning with the 1.6 release, Apache Flink comes with an Elasticsearch connector that supports the Elasticsearch APIs over HTTP. Therefore, it can natively talk to the endpoints that are provided by Amazon Elasticsearch Service.

You just need to decide how to authenticate requests against the public endpoint of the Elasticsearch cluster. You can whitelist individual IPs for access to the cluster. However, the recommended way of authenticating against the Amazon Elasticsearch Service endpoint is to add authentication information to AWS requests using IAM credentials and the Signature Version 4 signing process.

To extend the Flink Elasticsearch connector, which is not aware of the AWS specific signing process, you can use the open-source aws-signing-request-interceptor, which is available from Maven central. You just need to add an interceptor to the Elasticsearch sink that is called just before the request is sent to the Amazon Elasticsearch Service endpoint. The interceptor can then sign the request using the permission of the role that has been configured for the Kinesis Data Analytics for Java application.

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://...")));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    }
);

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    )
);

esSinkBuilder.build();

Note that the actual code in the GitHub repository is a bit more sophisticated because you need to obtain a serializable request interceptor. But the basic approach to sign requests remains the same.

Monitor and debug the Flink application

When running a Kinesis Data Analytics for Java application, you don’t get direct access to the cluster that runs Flink. This is because the underlying infrastructure is completely managed by the service. You merely interact with the service through an API. However, you can still obtain metrics and logging information through CloudWatch and CloudWatch Logs, respectively.

The Kinesis Data Analytics for Java application exposes a lot of operational metrics, ranging from metrics for the entire application down to metrics for individual processes of operators of the application. You can control which level of detail is adequate or required for your purposes. In fact, the metrics used in the previous section were all obtained through CloudWatch.

In addition to operational metrics, you can configure the Kinesis Data Analytics for Java application to write messages to CloudWatch Logs. This capability seamlessly integrates with common logging frameworks, such as Apache Log4j and the Simple Logging Facade for Java (SLF4J). So it is useful for debugging and identifying the cause of operational issues.

To enable logging for your Kinesis Data Analytics for Java application, just specify an existing CloudWatch log stream as a logging option when you start the application, as follows:

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

After the log messages are persisted into CloudWatch Logs, you can easily query and analyze them through CloudWatch Logs Insights

Conclusion

In this post, you not only built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics for Java Applications. You also scaled the different components while ingesting and analyzing up to 25k events per second in near-real time. In large parts, this scenario was enabled by using managed services, so you didn’t need to spend time on provisioning and configuring the underlying infrastructure.

The sources of the application and the AWS CloudFormation template used in this post are available from GitHub for your reference. You can dive into all the details of the Flink application and the configuration of the underlying services. I’m curious to see what you will build when you can focus on analyzing data in a streaming fashion rather than spending time on managing and operating infrastructure.

 


About the Author

Steffen Hausmann is a specialist solutions architect with AWS.

 

 

 

 

Improve clinical trial outcomes by using AWS technologies

Post Syndicated from Mayank Thakkar original https://aws.amazon.com/blogs/big-data/improve-clinical-trial-outcomes-by-using-aws-technologies/

We are living in a golden age of innovation, where personalized medicine is making it possible to cure diseases that we never thought curable. Digital medicine is helping people with diseases get healthier, and we are constantly discovering how to use the body’s immune system to target and eradicate cancer cells. According to a report published by ClinicalTrials.gov, the number of registered studies hit 293,000 in 2018, representing a 250x growth since 2000.

However, an internal rate of return (IRR) analysis by Endpoints News, using data from EvaluatePharma, highlights some interesting trends. A flourishing trend in pharma innovation is supported by strong growth in registered studies. However, the IRR shows a rapidly declining trend, from around 17 percent in 2000 to below the cost of capital in 2017 and projected to go to 0 percent by 2020.

This blog post is the first installment in a series that focuses on the end-to-end workflow of collecting, storing, processing, visualizing, and acting on clinical trials data in a compliant and secure manner. The series also discusses the application of artificial intelligence and machine learning technologies to the world of clinical trials. In this post, we highlight common architectural patterns that AWS customers use to modernize their clinical trials. These incorporate mobile technologies for better evidence generation, cost reduction, increasing quality, improving access, and making medicine more personalized for patients.

Improving the outcomes of clinical trials and reducing costs

Biotech and pharma organizations are feeling the pressure to use resources as efficiently as possible. This pressure forces them to search for any opportunity to streamline processes, get faster, and stay more secure, all while decreasing costs. More and more life sciences companies are targeting biologics, CAR-T, and precision medicine therapeutics, with focus shifting towards smaller, geographically distributed patient segments. This shift has resulted in an increasing mandate to capture data from previously unavailable, nontraditional sources. These sources include mobile devices, IoT devices, and in-home and clinical devices. Life sciences companies merge data from these sources with data from traditional clinical trials to build robust evidence around the safety and efficacy of a drug.

Early last year, the Clinical Trials Transformation Initiative (CTTI) provided recommendations about using mobile technologies for capturing holistic, high quality, attributable, real-world data from patients and for submission to the U.S. Food and Drug Administration (FDA). By using mobile technologies, life sciences companies can reduce barriers to trial participation and lower costs associated with conducting clinical trials. Global regulatory groups such as the FDA, Health Canada, and Medicines and Healthcare products Regulatory Agency (MHRA), among others, are also in favor of using mobile technologies. Mobile technologies can make patient recruitment more efficient, reach endpoints faster, and reduce the cost and time required to conduct clinical trials.

Improvised data ingestion using mobile technologies can speed up outcomes, reduce costs, and improve the accuracy of clinical trials. This is especially true when mobile data ingestion is supplemented with artificial intelligence and machine learning (AI/ML) technologies.

Together, they can usher in a new age of smart clinical trials.

At the same time, traditional clinical trial processes and technology designed for mass-marketed blockbuster drugs can’t effectively meet emerging industry needs. This leaves life sciences and pharmaceutical companies in need of assistance for evolving their clinical trial operations. These circumstances result in making clinical trials one of the largest areas of investment for bringing a new drug to market.

Using mobile technologies with traditional technologies in clinical trials can improve the outcomes of the trials and simultaneously reduce costs. Some of the use cases that the integration of various technologies enables include these:

  • Identifying and tracking participants in clinical trials
    • Identifying participants for clinical trials recruitment
    • Educating and informing patients participating in clinical trials
    • Implementing standardized protocols and sharing associated information to trial participants
    • Tracking adverse events and safety profiles
  • Integrating genomic and phenotypic data for identifying novel biomarkers
  • Integrating mobile data into clinical trials for better clinical trial management
  • Creation of a patient-control arm based on historical data
  • Stratifying cohorts based on treatment, claims, and registry datasets
  • Building a collaborative, interoperable network for data sharing and knowledge creation
  • Building compliance-ready infrastructure for clinical trial management

The AWS Cloud provides HIPAA eligible services and solutions. As an AWS customer, you can use these to build solutions for global implementation of mobile devices and sensors in trials, secure capture of streaming Internet of Things (IoT) data, and advanced analytics through visualization tools or AI/ML capabilities. Some of the use cases these services and solutions enable are finding and recruiting patients using smart analytics, facilitating global data management, and remote or in-patient site monitoring. Others include predicting lack of adherence, detecting adverse events, and accelerating trial outcomes along with optimizing trial costs.

Clinical Trials 2.0 (CT2.0) at AWS is geared toward facilitating wider adoption of cloud-native services to enable data ingestion from disparate sources, cost-optimized and reliable storage, and holistic analytics. At the same time, CT2.0 provides the granular access control, end-to-end security, and global scalability needed to conduct clinical trials more efficiently.

Reference architecture

One of the typical architectures for managing a clinical trial using mobile technologies is shown following. This architecture focuses on capturing real-time data from mobile sources and providing a way to process it.

* – Additional considerations such as data security, access control, and compliance need to be incorporated into the architecture and are discussed in the remainder of this post.

Managing a trial by using this architecture consists of the following five major steps.

Step 1: Collect data

Mobile devices, personal wearables, instruments, and smart-devices are extensively being used (or being considered) by global pharmaceutical companies in patient care and clinical trials to provide data for activity tracking, vital signs monitoring, and so on, in real-time. Devices like infusion pumps, personal use dialysis machines, and so on require tracking and alerting of device consumables and calibration status. Remote settings management is also a major use case for these kinds of devices. The end-user mobile devices used in the clinical trial emit a lot of telemetry data that requires real-time data capture, data cleansing, transformation, and analysis.

Typically, these devices are connected to an edge node or a smart phone. Such a connection provides sufficient computing resources to stream data to AWS IoT Core. AWS IoT Core can then be configured to write data to Amazon Kinesis Data Firehose in near real time. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. S3 provides online, flexible, cost efficient, pay-as-you-go storage that can replicate your data on three Availability Zones within an AWS Region. The edge node or smart phone can use the AWS IoT SDKs to publish and subscribe device data to AWS IoT Core with MQTT. This process uses the AWS method of authentication (called ‘SigV4’), X.509 certificate–based authentication, and customer-created token-based authentication (through custom authorizers). This authenticated approach enables you to map your choice of policies to each certificate and remotely control device or application access. You can also use the Kinesis Data Firehose encryption feature to enable server-side data encryption.

You can also capture additional data such as Case Report Forms (CRF), Electronic Medical Records (EMR), and medical images using Picture Archiving and Communication Systems (PACS). In addition, you can capture laboratory data (Labs) and other Patient Reported Outcomes data (ePRO). AWS provides multiple tools and services to effectively and securely connect to these data sources, enabling you to ingest data in various volumes, variety, and velocities. For more information about creating a HealthCare Data Hub and ingesting Digital Imaging and Communications in Medicine (DICOM) data, see the AWS Big Data Blog post Create a Healthcare Data Hub with AWS and Mirth Connect.

Step 2: Store data

After data is ingested from the devices and wearables used in the clinical trial, Kinesis Data Firehose is used to store the data on Amazon S3. This stored data serves as a raw copy and can later be used for historical analysis and pattern prediction. Using Amazon S3’s lifecycle policies, you can periodically move your data to reduced cost storage such as Amazon S3 Glacier for further optimizing their storage costs. Using Amazon S3 Intelligent Tiering can automatically optimize costs when data access patterns change, without performance impact or operational overhead by moving data between two access tiers—frequent access and infrequent access. You can also choose to encrypt data at rest and in motion using various encryption options available on S3.

Amazon S3 offers an extremely durable, highly available, and infinitely scalable data storage infrastructure, simplifying most data processing, backup, and replication tasks.

Step 3: Data processingfast lane

After collecting and storing a raw copy of the data, Amazon S3 is configured to publish events to AWS Lambda and invoke a Lambda function by passing the event data as a parameter. The Lambda function is used to extract the key performance indicators (KPIs) such as adverse event notifications, medication adherence, and treatment schedule management from the incoming data. You can use Lambda to process these KPIs and store them in Amazon DynamoDB, along with encryption at rest, which powers a near-real-time clinical trial status dashboard. This alerts clinical trial coordinators in real time so that appropriate interventions can take place.

In addition to this, using a data warehouse full of medical records, you can train and implement a machine learning model. This model can predict which patients are about to switch medications or might exhibit adherence challenges in the future. Such prediction can enable clinical trial coordinators to narrow in on those patients with mitigation strategies.

Step 4: Data processing—batch

For historical analysis and pattern prediction, the staged data (stored in S3) is processed in batches. A Lambda function is used to trigger the extract, transform, and load (ETL) process every time new data is added to the raw data S3 bucket. This Lambda function triggers an ETL process using AWS Glue, a fully managed ETL service that makes it easy for you to prepare and load your data for analytics. This approach helps in mining current and historical data to derive actionable insights, which is stored on Amazon S3.

From there, data is loaded on to Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering from AWS. You can also use Amazon Redshift Spectrum to extend data warehousing out to exabytes without loading any data to Amazon Redshift, as detailed in the Big Data blog post Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required. This enables you to provide an all-encompassing picture of the entire clinical trial to your clinical trial coordinators, enabling you to react and respond faster.

In addition to this, you can train and implement a machine learning model to identify patients who might be at risk for adherence challenges. This enables clinical trial coordinators to reinforce patient education and support.

Step 5: Visualize and act on data

After the data is processed and ready to be consumed, you can use Amazon QuickSight, a cloud-native business intelligence service from AWS that offers native Amazon Redshift connectivity. Amazon QuickSight is serverless and so can be rolled out to your audiences in hours. You can also use a host of third-party reporting tools, which can use AWS-supplied JDBC or ODBC drivers or open-source PostgreSQL drivers to connect with Amazon Redshift. These tools include TIBCO Spotfire Analytics, Tableau Server, Qlik Sense Enterprise, Looker, and others. Real-time data processing (step 3 preceding) combines with historical-view batch processing (step 4). Together, they empower contract research organizations (CROs), study managers, trial coordinators, and other entities involved in the clinical trial journey to make effective and informed decisions at a speed and frequency that was previously unavailable. Using Amazon QuickSight’s unique Pay-per-Session pricing model, you can optimize costs for your bursty usage models by paying only when users access the dashboards.

Using Amazon Simple Notification Service (Amazon SNS), real-time feedback based on incoming data and telemetry is sent to patients by using text messages, mobile push, and emails. In addition, study managers and coordinators can send Amazon SNS notifications to patients. Amazon SNS provides a fully managed pub/sub messaging for micro services, distributed systems, and serverless applications. It’s designed for high-throughput, push-based, many-to-many messaging. Alerts and notifications can be based on current data or a combination of current and historical data.

To encrypt messages published to Amazon SNS, you can follow the steps listed in the post Encrypting messages published to Amazon SNS with AWS KMS, on the AWS Compute Blog.   

Data security, data privacy, data integrity, and compliance considerations

At AWS, customer trust is our top priority. We deliver services to millions of active customers, including enterprises, educational institutions, and government agencies in over 190 countries. Our customers include financial services providers, healthcare providers, and governmental agencies, who trust us with some of their most sensitive information.

To facilitate this, along with the services mentioned earlier, you should also use AWS Identity and Access Management (IAM) service. IAM enables you to maintain segregation of access, fine-grained access control, and securing end user mobile and web applications. You can also use AWS Security Token Service (AWS STS) to provide secure, self-expiring, time-boxed, temporary security credentials to third-party administrators and service providers, greatly strengthening your security posture. You can use AWS CloudTrail to log IAM and STS API calls. Additionally, AWS IoT Device Management makes it easy to securely onboard, organize, monitor, and remotely manage IoT devices at scale.

With AWS, you can add an additional layer of security to your data at rest in the cloud. AWS provides scalable and efficient encryption features for services like Amazon EBS, Amazon S3, Amazon Redshift, Amazon SNSAWS Glue, and many more. Flexible key management options, including AWS Key Management Service, enable you to choose whether to have AWS manage the encryption keys or to keep complete control over their keys. In addition, AWS provides APIs for you to integrate encryption and data protection with any of the services that you develop or deploy in an AWS environment.

As a customer, you maintain ownership of your data, and select which AWS services can process, store, and host the content. Generally speaking, AWS doesn’t access or use customers’ content for any purpose without their consent. AWS never uses customer data to derive information for marketing or advertising.

When evaluating the security of a cloud solution, it’s important that you understand and distinguish between the security of the cloud and security in the cloud. The AWS Shared Responsibility Model details this relationship.

To assist you with your compliance efforts, AWS continues to add more services to the various compliance regulations, attestations, certifications, and programs across the world. To decide which services are suitable for you, see the services in scope page.

You can also use various services like, but not limited to, AWS CloudTrail, AWS Config, Amazon GuardDuty, and AWS Key Management Service (AWS KMS) to enhance your compliance and auditing efforts. Find more details in the AWS Compliance Solutions Guide.

Final thoughts

With the ever-growing interconnectivity and technological advances in the field of medical devices, mobile devices and sensors can improve numerous aspects of clinical trials. They can help in recruitment, informed consent, patient counseling, and patient communication management. They can also improve protocol and medication adherence, clinical endpoints measurement, and the process of alerting participants on adverse events. Smart sensors, smart mobile devices, and robust interconnecting systems can be central in conducting clinical trials.

Every biopharma organization conducting or sponsoring a clinical trial activity faces the conundrum of advancing their approach to trials while maintaining overall trial performance and data consistency. The AWS Cloud enables a new dimension for how data is collected, stored, and used for clinical trials. It thus addresses that conundrum as we march towards a new reality of how drugs are brought to market. The AWS Cloud abstracts away technical challenges such as scaling, security, and establishing a cost-efficient IT infrastructure. In doing so, it allows biopharma organizations to focus on their core mission of improving patent lives through the development of effective, groundbreaking treatments.

 


About the Author

Mayank Thakkar – Global Solutions Architect, AWS HealthCare and Life Sciences

 

 

 

 

Deven Atnoor, Ph.D. – Industry Specialist, AWS HealthCare and Life Sciences