All posts by Steffen Hausmann

Streaming ETL with Apache Flink and Amazon Kinesis Data Analytics

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/streaming-etl-with-apache-flink-and-amazon-kinesis-data-analytics/

Most businesses generate data continuously in real time and at ever-increasing volumes. Data is generated as users play mobile games, load balancers log requests, customers shop on your website, and temperature changes on IoT sensors. You can capitalize on time-sensitive events, improve customer experiences, increase efficiency, and drive innovation by analyzing this data quickly. The speed at which you get these insights often depends on how quickly you can load data into data lakes, data stores, and other analytics tools. As data volume and velocity increase, it becomes more important not only to load the incoming data, but also to transform and analyze it in near-real time.

This post looks at how to use Apache Flink as a basis for sophisticated streaming extract-transform-load (ETL) pipelines. Apache Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Kinesis Data Analytics, which enables you to build and run sophisticated streaming applications quickly, easily, and with low operational overhead.

This post discusses the concepts that are required to implement powerful and flexible streaming ETL pipelines with Apache Flink and Kinesis Data Analytics. It also looks at code examples for different sources and sinks. For more information, see the GitHub repo. The repo also contains an AWS CloudFormation template so you can get started in minutes and explore the example streaming ETL pipeline.

Architecture for streaming ETL with Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It supports a wide range of highly customizable connectors, including connectors for Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, and Amazon Simple Storage Service (Amazon S3). Moreover, Apache Flink provides a powerful API to transform, aggregate, and enrich events, and supports exactly-once semantics. Apache Flink is therefore a good foundation for the core of your streaming architecture.

To deploy and run the streaming ETL pipeline, the architecture relies on Kinesis Data Analytics. Kinesis Data Analytics enables you to run Flink applications in a fully managed environment. The service provisions and manages the required infrastructure, scales the Flink application in response to changing traffic patterns, and automatically recovers from infrastructure and application failures. You can combine the expressive Flink API for processing streaming data with the advantages of a managed service by using Kinesis Data Analytics to deploy and run Flink applications. It allows you to build robust streaming ETL pipelines and reduces the operational overhead of provisioning and operating infrastructure.

The architecture in this post takes advantage of several capabilities that you can achieve when you run Apache Flink with Kinesis Data Analytics. Specifically, the architecture supports the following:

  • Private network connectivity – Connect to resources in your Amazon Virtual Private Cloud (Amazon VPC), in your data center with a VPN connection, or in a remote region with a VPC peering connection
  • Multiple sources and sinks – Read and write data from Kinesis data streams, Apache Kafka clusters, and Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters
  • Data partitioning – Determine the partitioning of data that is ingested into Amazon S3 based on information extracted from the event payload
  • Multiple Elasticsearch indexes and custom document IDs – Fan out from a single input stream to different Elasticsearch indexes and explicitly control the document ID
  • Exactly-once semantics – Avoid duplicates when ingesting and delivering data between Apache Kafka, Amazon S3, and Amazon Elasticsearch Service (Amazon ES)

The following diagram illustrates this architecture.

The remainder of this post discusses how to implement streaming ETL architectures with Apache Flink and Kinesis Data Analytics. The architecture persists streaming data from one or multiple sources to different destinations and is extensible to your needs. This post does not cover additional filtering, enrichment, and aggregation transformations, although that is a natural extension for practical applications.

This post also does not focus on how to build, deploy, and operate the Flink application with Kinesis Data Analytics. It is only relevant to know that you can create a Kinesis Data Analytics application by uploading the compiled Flink application jar file to Amazon S3 and specifying some additional configuration options with the service. You can then execute the Kinesis Data Analytics application in a fully managed environment. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics developer guide.

Exploring a streaming ETL pipeline in your AWS account

Before you consider the implementation details and operational aspects, you should get a first impression of the streaming ETL pipeline in action. To create the required resources, deploy the following AWS CloudFormation template:

The template creates a Kinesis data stream and an Amazon Elastic Compute Cloud (Amazon EC2) instance to replay a historic data set into the data stream. This post uses data based on the public dataset obtained from the New York City Taxi and Limousine Commission. Each event describes a taxi trip made in New York City and includes timestamps for the start and end of a trip, information on the boroughs the trip started and ended in, and various details on the fare of the trip. A Kinesis Data Analytics application then reads the events and persists them to Amazon S3 in Parquet format and partitioned by event time.

Connect to the instance by following the link next to ConnectToInstance in the output section of the CloudFormation template that you executed previously. You can then start replaying a set of taxi trips into the data stream with the following code:

$ java -jar /tmp/amazon-kinesis-replay-*.jar -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600 -streamName <Kinesis stream name>

You can obtain this command with the correct parameters from the output section of the AWS CloudFormation template. The output section also points you to the S3 bucket to which events are persisted and an Amazon CloudWatch dashboard that lets you monitor the pipeline.

For more information about enabling the remaining combinations of sources and sinks, for example, Apache Kafka and Elasticsearch, see the GitHub repo.

Building a streaming ETL pipeline with Apache Flink

Now that you have seen the pipeline in action, you can dive into the technical aspects of how to implement the functionality with Apache Flink and Kinesis Data Analytics.

Reading and writing to private resources

Kinesis Data Analytics applications can access resources on the public internet and resources in a private subnet that is part of your VPC. By default, a Kinesis Data Analytics application only enables access to resources that you can reach over the public internet. This works well for resources that provide a public endpoint, for example, Kinesis data streams or Amazon Elasticsearch Service.

If your resources are private to your VPC, either for technical or security-related reasons, you can configure VPC connectivity for your Kinesis Data Analytics application. For example, MSK clusters are private; you cannot access them from the public internet. You may run your own Apache Kafka cluster on premises that is not exposed to the public internet and is only accessible from your VPC through a VPN connection. The same is true for other resources that are private to your VPC, such as relational databases or AWS PrivateLink-powered endpoints.

To enable VPC connectivity, configure the Kinesis Data Analytics application to connect to private subnets in your VPC. Kinesis Data Analytics creates elastic network interfaces in one or more of the subnets provided in your VPC configuration for the application, depending on the parallelism of the application. For more information, see Configuring Kinesis Data Analytics for Java Applications to access Resources in an Amazon VPC.

The following screenshot shows an example configuration of a Kinesis Data Analytics application with VPC connectivity:

The application can then access resources that have network connectivity from the configured subnets. This includes resources that are not directly contained in these subnets, which you can reach over a VPN connection or through VPC peering. This configuration also supports endpoints that are available over the public internet if you have a NAT gateway configured for the respective subnets. For more information, see Internet and Service Access for a VPC-Connected Kinesis Data Analytics for Java application.

Configuring Kinesis and Kafka sources

Apache Flink supports various data sources, including Kinesis Data Streams and Apache Kafka. For more information, see Streaming Connectors on the Apache Flink website.

To connect to a Kinesis data stream, first configure the Region and a credentials provider. As a general best practice, choose AUTO as the credentials provider. The application will then use temporary credentials from the role of the respective Kinesis Data Analytics application to read events from the specified data stream. This avoids baking static credentials into the application. In this context, it is also reasonable to increase the time between two read operations from the data stream. When you increase the default of 200 milliseconds to 1 second, the latency increases slightly, but it facilitates multiple consumers reading from the same data stream. See the following code:

Properties properties = new Properties();
properties.setProperty(AWSConfigConstants.AWS_REGION, "<Region name>");
properties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
properties.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

This config is passed to the FlinkKinesisConsumer with the stream name and a DeserializationSchema. This post uses the TripEventSchema for deserialization, which specifies how to deserialize a byte array that represents a Kinesis record into a TripEvent object. See the following code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

For more information, see TripEventSchema.java and TripEvent.java on GitHub. Apache Flink provides other more generic serializers that can deserialize data into strings or JSON objects.
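For example, if you only need the raw record payload, you could fall back to Flink's built-in SimpleStringSchema instead of a custom schema. The following minimal sketch reuses the properties and environment from the previous snippets:

// Minimal sketch: read each Kinesis record as a plain string with Flink's built-in
// SimpleStringSchema instead of a custom DeserializationSchema.
DataStream<String> rawEvents = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new SimpleStringSchema(), properties)
);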

Apache Flink is not limited to reading from Kinesis data streams. If you configure the Kinesis Data Analytics application’s VPC settings correctly, Apache Flink can also read events from Apache Kafka and MSK clusters. Specify a comma-separated list of broker and port pairs to use for the initial connection to your cluster. This config is passed to the FlinkKafkaConsumer with the topic name and a DeserializationSchema to create a source that reads from the respective topic of the Apache Kafka cluster. See the following code:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<comma separated list of broker and port pairs>");

DataStream<TripEvent> events = env.addSource(
  new FlinkKafkaConsumer<>("<topic name>", new TripEventSchema(), properties)
);

The resulting DataStream contains TripEvent objects that have been deserialized from the data ingested into the data stream and Kafka topic, respectively. You can then use the data streams in combination with a sink to persist the events into their respective destination.

Persisting data in Amazon S3 with data partitioning

When you persist streaming data to Amazon S3, you may want to partition the data. You can substantially improve query performance of analytic tools by partitioning data because partitions that cannot contribute to a query can be pruned and therefore do not need to be read. For example, the right partitioning strategy can improve Amazon Athena query performance and cost by reducing the amount of data read for a query. You should choose to partition your data by the same attributes used in your application logic and query patterns. Furthermore, it is common when processing streaming data to include the time of an event, or event time, in your partitioning strategy. This contrasts with using the ingestion time or some other service-side timestamp that does not reflect the time an event occurred as accurately as event time.

For more information about taking data partitioned by ingestion time and repartitioning it by event time with Athena, see Analyze your Amazon CloudFront access logs at scale. However, you can directly partition the incoming data based on event time with Apache Flink by using the payload of events to determine the partitioning, which avoids an additional post-processing step. This capability is called data partitioning and is not limited to partitioning by time.

You can realize data partitioning with Apache Flink’s StreamingFileSink and BucketAssigner. For more information, see Streaming File Sink on the Apache Flink website.

When given a specific event, the BucketAssigner determines the corresponding partition prefix in the form of a string. See the following code:

public class TripEventBucketAssigner implements BucketAssigner<TripEvent, String> {
  public String getBucketId(TripEvent event, Context context) {
    return String.format("pickup_location=%03d/year=%04d/month=%02d",
        event.getPickupLocationId(),
        event.getPickupDatetime().getYear(),
        event.getPickupDatetime().getMonthOfYear()
    );
  }

  ...
}

The sink takes an argument for the S3 bucket as a destination path and a function that converts the TripEvent Java objects into a string. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

You can further customize the size of the objects you write to Amazon S3 and the frequency of object creation with a rolling policy. You can configure the policy to aggregate more events into fewer, larger objects at the cost of increased latency, or vice versa. Accepting this additional latency helps avoid a high number of small objects on Amazon S3, which can negatively impact the query performance of consumers reading the data. For more information, see DefaultRollingPolicy on the Apache Flink website.
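For example, a customized policy along the following lines rolls part files over every 15 minutes, after 5 minutes of inactivity, or once a part file reaches 128 MB. This is only a sketch with arbitrary thresholds, not a recommendation:

// Sketch of a customized rolling policy for the row-format StreamingFileSink.
// The thresholds below are arbitrary examples.
DefaultRollingPolicy<TripEvent, String> rollingPolicy = DefaultRollingPolicy
  .create()
  .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // roll over at least every 15 minutes
  .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))  // roll over after 5 minutes without new events
  .withMaxPartSize(128 * 1024 * 1024)                    // roll over when a part file reaches 128 MB
  .build();

You would then pass this policy to withRollingPolicy instead of the default used in the previous example.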

The number of output files that arrive in your S3 bucket per your rolling policy also depends on the parallelism of the StreamingFileSink and how you distribute events between Flink application operators. In the previous example, the Flink internal DataStream is partitioned by pickup location ID with the keyBy operator. The location ID is also used in the BucketAssigner as part of the prefix for objects that are written to Amazon S3. Therefore, the same node aggregates and persists all events with the same prefix, which results in particularly large objects on Amazon S3.

Apache Flink uses multipart uploads under the hood when writing to Amazon S3 with the StreamingFileSink. In case of failures, Apache Flink may not be able to clean up incomplete multipart uploads. To avoid unnecessary storage fees, set up the automatic cleanup of incomplete multipart uploads by configuring appropriate lifecycle rules on the S3 bucket. For more information, see Important Considerations for S3 on the Apache Flink website and Example 8: Lifecycle Configuration to Abort Multipart Uploads.
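If you prefer to set up this rule programmatically, a sketch with the AWS SDK for Java could look as follows. The bucket name, rule ID, and seven-day threshold are assumptions that you would adapt to your environment:

// Sketch: abort incomplete multipart uploads after 7 days with the AWS SDK for Java (v1).
// The bucket name, rule ID, and threshold are placeholder assumptions.
AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

BucketLifecycleConfiguration.Rule abortRule = new BucketLifecycleConfiguration.Rule()
  .withId("abort-incomplete-multipart-uploads")
  .withFilter(new LifecycleFilter())  // apply to all objects in the bucket
  .withAbortIncompleteMultipartUpload(
    new AbortIncompleteMultipartUpload().withDaysAfterInitiation(7))
  .withStatus(BucketLifecycleConfiguration.ENABLED);

s3.setBucketLifecycleConfiguration("<Bucket name>",
  new BucketLifecycleConfiguration().withRules(abortRule));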

Converting output to Apache Parquet

In addition to partitioning data before delivery to Amazon S3, you may want to compress it with a columnar storage format. Apache Parquet is a popular columnar format, which is well supported in the AWS ecosystem. It reduces the storage footprint and can substantially increase query performance and reduce cost.

The StreamingFileSink supports Apache Parquet and other bulk-encoded formats through a built-in BulkWriter factory. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forBulkFormat(
    new Path("s3://<bucket name>"),
    ParquetAvroWriters.forSpecificRecord(TripEvent.class)
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

For more information, see Bulk-encoded Formats on the Apache Flink website.

Persisting events works a bit differently when you use the Parquet conversion. When you enable Parquet conversion, you can only configure the StreamingFileSink with the OnCheckpointRollingPolicy, which commits completed part files to Amazon S3 only when a checkpoint is triggered. You therefore need to enable Apache Flink checkpoints in your Kinesis Data Analytics application to persist data to Amazon S3. The data only becomes visible to consumers when a checkpoint is triggered, so your delivery latency depends on how often your application checkpoints.

Moreover, whereas you previously just needed to generate a string representation of the data to write to Amazon S3, the ParquetAvroWriters expects an Apache Avro schema for events. For more information, see the GitHub repo. You can use and extend the schema in the repo if you want an example.

In general, it is highly desirable to convert data into Parquet if you want to work with and query the persisted data effectively. Although it requires some additional effort, the benefits of the conversion outweigh these additional complexities compared to storing raw data.

Fanning out to multiple Elasticsearch indexes and custom document IDs

Amazon ES is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch clusters. A popular use case is to stream application and network log data into Amazon ES. These logs are documents in Elasticsearch parlance, and you can create one for every event and store it in an Elasticsearch index.

The Elasticsearch sink that Apache Flink provides is flexible and extensible. You can specify an index based on the payload of each event. This is useful when the stream contains different event types and you want to store the respective documents in different Elasticsearch indexes, which matters because with newer Elasticsearch versions, a single index can no longer contain multiple types. With this capability, you can use a single sink, and hence a single application, to write into multiple indexes. See the following code:

SinkFunction<TripEvent> sink = AmazonElasticsearchSink.buildElasticsearchSink(
  "<Elasticsearch endpoint>",
  "<AWS region>",
  new ElasticsearchSinkFunction<TripEvent>() {
    public IndexRequest createIndexRequest(TripEvent element) {
      String type = element.getType().toString();
      String tripId = Long.toString(element.getTripId());

      return Requests.indexRequest()
        .index(type)
        .type(type)
        .id(tripId)
        .source(TripEventSchema.toJson(element), XContentType.JSON);
    }

    // required by the ElasticsearchSinkFunction interface; hands the request to the indexer
    @Override
    public void process(TripEvent element, RuntimeContext ctx, RequestIndexer indexer) {
      indexer.add(createIndexRequest(element));
    }
  }
);

events.addSink(sink);

You can also explicitly set the document ID when you send documents to Elasticsearch. If an event with the same ID is ingested into Elasticsearch multiple times, it is overwritten rather than creating duplicates. This enables your writes to Elasticsearch to be idempotent. In this way, you can obtain exactly-once semantics of the entire architecture, even if your data sources only provide at-least-once semantics.

The AmazonElasticsearchSink used above is an extension of the Elasticsearch sink that comes with Apache Flink. The sink adds support for signing requests with IAM credentials so you can use the strong IAM-based authentication and authorization that is available from the service. To this end, the sink picks up temporary credentials from the Kinesis Data Analytics environment in which the application is running. It uses the Signature Version 4 method to add authentication information to the request that is sent to the Elasticsearch endpoint.

Leveraging exactly-once semantics

You can obtain exactly-once semantics by combining an idempotent sink with at-least-once semantics, but that is not always feasible. For instance, if you want to replicate data from one Apache Kafka cluster to another or persist transactional CDC data from Apache Kafka to Amazon S3, you may not be able to tolerate duplicates in the destination, but neither of these sinks is idempotent.

Apache Flink natively supports exactly-once semantics. Kinesis Data Analytics implicitly enables exactly-once mode for checkpoints. To obtain end-to-end exactly-once semantics, you need to enable checkpoints for the Kinesis Data Analytics application and choose a connector that supports exactly-once semantics, such as the StreamingFileSink. For more information, see Fault Tolerance Guarantees of Data Sources and Sinks on the Apache Flink website.
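For reference, in a self-managed Flink environment you would configure this explicitly in code, as in the following sketch. Kinesis Data Analytics applies an equivalent configuration on your behalf through its application settings, so this is for illustration only:

// Sketch: explicit checkpoint configuration in a self-managed Flink environment.
// Kinesis Data Analytics configures checkpointing for you; the one-minute interval
// shown here mirrors the service default mentioned below.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);  // checkpoint every minute
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);   // leave time between checkpoints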

There are some side effects to using exactly-once semantics. For example, end-to-end latency increases for several reasons. First, you can only commit the output when a checkpoint is triggered. This is the same latency increase that occurs when you enable the Parquet conversion. The default checkpoint interval is 1 minute, which you can decrease. However, obtaining sub-second delivery latencies is difficult with this approach.

Also, the details of end-to-end exactly-once semantics are subtle. Although the Flink application may read in an exactly-once fashion from a data stream, duplicates may already be part of the stream, so you can only obtain at-least-once semantics of the entire application. For Apache Kafka as a source and sink, different caveats apply. For more information, see Caveats on the Apache Flink website.

Be sure that you understand all the details of the entire application stack before you take a hard dependency on exactly-once semantics. In general, if your application can tolerate at-least-once semantics, it’s a good idea to use that semantic instead of relying on stronger semantics that you don’t need.

Using multiple sources and sinks

One Flink application can read data from multiple sources and persist data to multiple destinations. This is interesting for several reasons. First, you can persist the data or different subsets of the data to different destinations. For example, you can use the same application to replicate all events from your on-premises Apache Kafka cluster to an MSK cluster. At the same time, you can deliver specific, valuable events to an Elasticsearch cluster.

Second, you can use multiple sinks to increase the robustness of your application. For example, your application that applies filters and enriches streaming data can also archive the raw data stream. If something goes wrong with your more complex application logic, Amazon S3 still has the raw data, which you can use to backfill the sink.
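As a sketch, assuming the S3 and Elasticsearch sinks constructed in the previous sections (referred to here as s3Sink and esSink) and a made-up getTripDistance accessor, archiving the raw stream while forwarding only a subset to Elasticsearch could look like this:

// Sketch: one source, two sinks. The raw stream is archived to Amazon S3 while only
// long trips are delivered to Elasticsearch. The predicate and accessor are made-up examples.
DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

events.keyBy(TripEvent::getPickupLocationId).addSink(s3Sink);         // archive everything

events.filter(trip -> trip.getTripDistance() > 10).addSink(esSink);   // deliver a subset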

However, there are some trade-offs. When you bundle many functionalities in a single application, you increase the blast radius of failures. If a single component of the application fails, the entire application fails and you need to recover it from the last checkpoint. This causes some downtime and increased delivery latency to all delivery destinations in the application. Also, a single large application is often harder to maintain and to change. You should strike a balance between adding functionality or creating additional Kinesis Data Analytics applications.

Operational aspects

When you run the architecture in production, you set out to execute a single Flink application continuously and indefinitely. It is crucial to implement monitoring and proper alarming to make sure that the pipeline is working as expected and that the processing can keep up with the incoming data. Ideally, the pipeline should adapt to changing throughput conditions and raise notifications if it fails to deliver data from the sources to the destinations.

Some aspects require specific attention from an operational perspective. The following section provides some ideas and further references on how you can increase the robustness of your streaming ETL pipelines.

Monitoring and scaling sources

The data stream and the MSK cluster, respectively, are the entry point to the entire architecture. They decouple the data producers from the rest of the architecture. To avoid any impact to data producers, which you often cannot control directly, you need to scale the input stream of the architecture appropriately and make sure that it can ingest messages at any time.

Kinesis Data Streams uses a throughput provisioning model based on shards. Each shard provides a certain read and write capacity. From the number of provisioned shards, you can derive the maximum throughput of the stream in terms of ingested and emitted events and data volume per second. For more information, see Kinesis Data Streams Quotas.

Kinesis Data Streams exposes metrics through CloudWatch that report on these characteristics and indicate whether the stream is over- or under-provisioned. You can use the IncomingBytes and IncomingRecords metrics to scale the stream proactively, or you can use the WriteProvisionedThroughputExceeded metrics to scale the stream reactively. Similar metrics exist for data egress, which you should also monitor. For more information, see Monitoring the Amazon Kinesis Data Streams with Amazon CloudWatch.

The following graph shows some of these metrics for the data stream of the example architecture. On average the Kinesis data stream receives 2.8 million events and 1.1 GB of data every minute.

You can even automate the scaling of your Kinesis data streams. For more information, see Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount.
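For illustration, the following sketch updates the shard count of a stream with the AWS SDK for Java. The stream name and target shard count are placeholders; in practice you would derive the target from the CloudWatch metrics mentioned above or rely on the automation described in the referenced post:

// Sketch: scale a Kinesis data stream by updating its shard count with the AWS SDK for Java (v1).
// The stream name and the target shard count are placeholders.
AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

kinesis.updateShardCount(new UpdateShardCountRequest()
  .withStreamName("<Kinesis stream name>")
  .withTargetShardCount(12)
  .withScalingType(ScalingType.UNIFORM_SCALING));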

Apache Kafka and Amazon MSK use a node-based provisioning model. Amazon MSK also exposes metrics through CloudWatch, including metrics that indicate how much data and how many events are ingested into the cluster. For more information, see Amazon MSK Metrics for Monitoring with CloudWatch.

You can also enable open monitoring with Prometheus for MSK clusters. It is a bit harder to know the total capacity of the cluster, and you often need benchmarking to know when you should scale. For more information about important metrics to monitor, see Monitoring Kafka on the Confluent website.

Monitoring and scaling the Kinesis Data Analytics application

The Flink application is the central core of the architecture. Kinesis Data Analytics executes it in a managed environment, and you want to make sure that it continuously reads data from the sources and persists data in the data sinks without falling behind or getting stuck.

When the application falls behind, it is often an indicator that it is not scaled appropriately. Two important metrics to track the progress of the application are millisBehindLatest (when the application is reading from a Kinesis data stream) and records-lag-max (when it is reading from Apache Kafka and Amazon MSK). These metrics not only indicate that data is read from the sources, but also whether data is read fast enough. If the values of these metrics are continuously growing, the application is continuously falling behind, which may indicate that you need to scale up the Kinesis Data Analytics application. For more information, see Kinesis Data Streams Connector Metrics and Application Metrics.
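One way to act on these metrics is a CloudWatch alarm on the maximum millisBehindLatest. The following sketch uses the AWS SDK for Java; the namespace, dimension names, threshold, and SNS topic ARN are assumptions that you should verify against the metrics and dimensions your application actually emits:

// Sketch: alarm when the application falls more than 1 minute behind for 5 consecutive minutes.
// Namespace, dimensions, threshold, and the SNS topic ARN are assumptions, not definitive values.
AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.defaultClient();

cloudWatch.putMetricAlarm(new PutMetricAlarmRequest()
  .withAlarmName("flink-application-falling-behind")
  .withNamespace("AWS/KinesisAnalytics")
  .withMetricName("millisBehindLatest")
  .withDimensions(new Dimension().withName("Application").withValue("<application name>"))
  .withStatistic(Statistic.Maximum)
  .withPeriod(60)
  .withEvaluationPeriods(5)
  .withThreshold(60_000.0)
  .withComparisonOperator(ComparisonOperator.GreaterThanThreshold)
  .withAlarmActions("<SNS topic ARN>"));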

The following graph shows the metrics for the example application in this post. During checkpointing, the maximum millisBehindLatest metric occasionally spikes up to 7 seconds. However, because the reported average of the metric is less than 1 second and the application immediately catches up to the tip of the stream again, it is not a concern for this architecture.

Although the lag of the application is one of the most important metrics to monitor, there are other relevant metrics that Apache Flink and Kinesis Data Analytics expose. For more information, see Monitoring Apache Flink Applications 101 on the Apache Flink website.

Monitoring sinks

You need to monitor your sinks closely to verify that they are receiving data and, depending on the sink type, are not running out of storage.

You can enable detailed metrics for your S3 buckets that track the number of requests and data uploaded into the bucket with 1-minute granularity. For more information, see Monitoring Metrics with Amazon CloudWatch. The following graph shows these metrics for the S3 bucket of the example architecture:

When the architecture persists data into a Kinesis data stream or a Kafka topic, it acts as a producer, so the same recommendations as for monitoring and scaling sources apply. For more information about operating and monitoring Amazon ES in production environments, see Amazon Elasticsearch Service Best Practices.

Handling errors

“Failures are a given and everything eventually fails over time,” so you should expect the application to fail at some point. For example, an underlying node of the infrastructure that Kinesis Data Analytics manages might fail, or intermittent timeouts on the network can prevent the application from reading from sources or writing to sinks. When this happens, Kinesis Data Analytics restarts the application and resumes processing by recovering from the latest checkpoint. Because the raw events are persisted in a data stream or Kafka topic, the application can reread the events that arrived between the last checkpoint and its recovery, and then continue standard processing.

These kinds of failures are rare and the application can gracefully recover without sacrificing processing semantics, including exactly-once semantics. However, other failure modes need additional attention and mitigation.

When an exception is thrown anywhere in the application code, for example, in the component that contains the logic for parsing events, the entire application crashes. As before, the application eventually recovers, but if the exception is from a bug in your code that a specific event always hits, it results in an infinite loop. After recovering from the failure, the application rereads the event, because it was not processed successfully before, and crashes again. The process starts again and repeats indefinitely, which effectively blocks the application from making any progress.

Therefore, you want to catch and handle exceptions in the application code to avoid crashing the application. If there is a persistent problem that you cannot resolve programmatically, you can use side outputs to redirect the problematic raw events to a secondary data stream, which you can persist to a dead letter queue or an S3 bucket for later inspection. For more information, see Side Outputs on the Apache Flink website.
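The following is a minimal sketch of this pattern. The OutputTag name, the assumed TripEventSchema.fromJson helper, and the deadLetterSink placeholder stand in for your actual parsing logic and sink, and rawEvents is assumed to be a DataStream<String> of unparsed records:

// Sketch: route events that fail to parse to a side output instead of crashing the application.
// The tag name, fromJson helper, and deadLetterSink are placeholders for your actual code.
final OutputTag<String> malformedEvents = new OutputTag<String>("malformed-events") {};

SingleOutputStreamOperator<TripEvent> parsedEvents = rawEvents.process(
  new ProcessFunction<String, TripEvent>() {
    @Override
    public void processElement(String raw, Context ctx, Collector<TripEvent> out) {
      try {
        out.collect(TripEventSchema.fromJson(raw));   // assumed parsing helper
      } catch (Exception e) {
        ctx.output(malformedEvents, raw);             // redirect the problematic raw event
      }
    }
  });

// Persist the problematic events, for example, to a dedicated S3 prefix for later inspection.
parsedEvents.getSideOutput(malformedEvents).addSink(deadLetterSink);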

When the application is stuck and cannot make any progress, it is at least visible in the metrics for application lag. If your streaming ETL pipeline filters or enriches events, failures may be much more subtle, and you may only notice them long after they have been ingested. For instance, due to a bug in the application, you may accidentally drop important events or corrupt their payload in unintended ways. Kinesis Data Streams stores events for up to 7 days and, although it is technically possible, Apache Kafka is often not configured to store events indefinitely either. If you don’t identify the corruption quickly enough, you risk losing information when the retention of the raw events expires.

To protect against this scenario, you can persist the raw events to Amazon S3 before you apply any additional transformations or processing to them. You can keep the raw events and reprocess or replay them into the stream if you need to. To integrate the functionality into the application, add a second sink that just writes to Amazon S3. Alternatively, use a separate application that only reads and persists the raw events from the stream, at the cost of running and paying for an additional application.

When to choose what

AWS provides many services that work with streaming data and can perform streaming ETL. Amazon Kinesis Data Firehose can ingest, process, and persist streaming data into a range of supported destinations. There is a significant overlap of the functionality between Kinesis Data Firehose and the solution in this post, but there are different reasons to use one or the other.

As a rule of thumb, use Kinesis Data Firehose whenever it fits your requirements. The service is built with simplicity and ease of use in mind. To use Kinesis Data Firehose, you just need to configure the service. You can use Kinesis Data Firehose for streaming ETL use cases with no code, no servers, and no ongoing administration. Moreover, Kinesis Data Firehose comes with many built-in capabilities, and its pricing model allows you to only pay for the data processed and delivered. If you don’t ingest data into Kinesis Data Firehose, you pay nothing for the service.

In contrast, the solution in this post requires you to create, build, and deploy a Flink application. Moreover, you need to think about monitoring and how to obtain a robust architecture that is not only tolerant against infrastructure failures but also resilient against application failures and bugs. However, this added complexity unlocks many advanced capabilities, which your use case may require. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics Developer Guide.

What’s next?

This post discussed how to build a streaming ETL pipeline with Apache Flink and Kinesis Data Analytics. It focused on how to build an extensible solution that addresses some advanced use cases for streaming ingest while maintaining low operational overhead. The solution allows you to quickly enrich, transform, and load your streaming data into your data lake, data store, or another analytical tool without the need for an additional ETL step. The post also explored ways to extend the application with monitoring and error handling.

You should now have a good understanding of how to build streaming ETL pipelines on AWS. You can start capitalizing on your time-sensitive events by using a streaming ETL pipeline that makes valuable information quickly accessible to consumers. You can tailor the format and shape of this information to your use case without adding the substantial latency of traditional batch-based ETL processes.

 


About the Author

Steffen Hausmann is a Specialist Solutions Architect for Analytics at AWS. He works with customers around the globe to design and build streaming architectures so that they can get value from analyzing their streaming data. He holds a doctorate degree in computer science from the University of Munich and in his free time, he tries to lure his daughters into tech with cute stickers he collects at conferences. You can follow his ruthless attempts on Twitter (@sthmmm).

 

 

Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

Stream processing facilitates the collection, processing, and analysis of real-time data and enables the continuous generation of insights and quick reactions to emerging situations. This capability is useful when the value of derived insights diminishes over time. Hence, the faster you can react to a detected situation, the more valuable the reaction is going to be. Consider, for instance, a streaming application that analyzes and blocks fraudulent credit card transactions while they occur. Compare that application to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a nice report for you to read the next morning.

It is quite common for the value of insights to diminish over time. Therefore, using stream processing can substantially improve the value of your analytics application. However, building and operating a streaming application that continuously receives and processes data is much more challenging than operating a traditional batch-oriented analytics application.

In this post, we discuss how you can use Apache Flink and Amazon Kinesis Data Analytics for Java Applications to address these challenges. We explore how to build a reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. We particularly focus on how to prepare and run Flink applications with Kinesis Data Analytics for Java Applications. To this end, we use an exemplary scenario that includes source code and AWS CloudFormation templates. You can follow along with this example using your own AWS account or adapt the code according to your specific requirements.

Challenges of running streaming applications

When you build a streaming application, the downstream systems naturally rely on a continuous and timely generation of output. Accordingly, there are much higher requirements on the availability of the streaming application. There is also much less time to address operational issues compared to a traditional batch-based approach. In a batch-processing scenario, when a job that runs once at the end of a business day fails, you can often restart the failed job and still complete the computation by the next morning, when the results are needed. In contrast, when a streaming application fails, downstream systems that consume the output might be affected within minutes, or even sooner, when the expected output no longer arrives in time.

Moreover, in case of failure, you can’t simply delete all intermediate results and restart a failed processing job, as it is commonly done in the batch-processing case. The output of a streaming job is continuously consumed by downstream systems. Output that has already been consumed cannot easily be retracted. Therefore, the entire processing pipeline is much more sensitive to duplicates that are introduced by an application that is restarted on failure. Furthermore, the computations of a streaming application often rely on some kind of internal state that can be corrupted or even lost when the application fails.

Last but not least, streaming applications often deal with a varying amount of throughput. Therefore, scaling the application according to the current load is highly desirable. When the load increases, the infrastructure that supports the streaming application must scale to keep the application from becoming overloaded, falling behind, and producing results that are no longer relevant. On the other hand, when the load decreases, the infrastructure should scale in again to remain cost effective by not provisioning more resources than are needed.

A reliable and scalable streaming architecture based on Flink and Kinesis Data Analytics for Java Applications

Apache Flink is an open-source project that is tailored to stateful computations over unbounded and bounded datasets. Flink addresses many of the challenges that are common when analyzing streaming data by supporting different APIs (including Java and SQL), rich time semantics, and state management capabilities. It can also recover from failures while maintaining exactly-once processing semantics. Therefore, Flink is well suited for analyzing streaming data with low latency.

In this post, we illustrate how to deploy, operate, and scale a Flink application with Kinesis Data Analytics for Java Applications. We use a scenario to analyze the telemetry data of a taxi fleet in New York City in near-real time to optimize the fleet operation. In this scenario, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into a Kinesis data stream as a simple JSON blob. From there, the data is processed by a Flink application, which is deployed to Kinesis Data Analytics for Java Applications. This application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

This scenario leads to the following architecture, which is separated into three stages for the ingestion, processing, and presentation of data.

Separating the different aspects of the infrastructure is a common approach in this domain and has several benefits over a more tightly coupled architecture.

First, the Kinesis data stream serves as a buffer that decouples the producers from the consumers. Taxis can persist the events that they generate into the data stream regardless of the condition of, for instance, the processing layer, which might be currently recovering from a node failure. Likewise, the derived data remains available through Kibana even if the ingestion or processing layer is currently unavailable due to some operational issues. Last but not least, all components can be scaled independently and can use infrastructure that is specifically tailored according to their individual requirements.

This architecture also allows you to experiment and adopt new technologies in the future. Multiple independent applications can concurrently consume the data stored in the Kinesis data stream. You can then test how a new version of an existing application performs with a copy of the production traffic. But you can also introduce a different tool and technology stack to analyze the data, again without affecting the existing production application. For example, it is common to persist the raw event data to Amazon S3 by adding a Kinesis Data Firehose delivery stream as a second consumer to the Kinesis data stream. This facilitates long-term archiving of the data, which you can then use to evaluate ad hoc queries or analyze historic trends.

All in all, separating the different aspects of the architecture into ingestion, processing, and presentation nicely decouples different components, making the architecture more robust. It furthermore allows you to choose different tools for different purposes and gives you a lot of flexibility to change or evolve the architecture over time.

For the rest of this post, we focus on using Apache Flink and Kinesis Data Analytics for Java Applications to identify areas that currently request a high number of taxi rides. We also derive the average trip duration to the New York City airports. But with this architecture, you also have the option to consume the incoming events using other tools, such as Apache Spark Structured Streaming and Kinesis Data Firehose, instead of, or in addition to, what is described here.

Let’s kick the tires!

To see the described architecture in action, execute the following AWS CloudFormation template in your own AWS account. The template first builds the Flink application that analyzes the incoming taxi trips, including the Flink Kinesis Connector that is required to read data from a Kinesis data stream. It then creates the infrastructure and submits the Flink application to Kinesis Data Analytics for Java Applications.

The entire process of building the application and creating the infrastructure takes about 20 minutes. After the AWS CloudFormation stack is created, the Flink application has been deployed as a Kinesis Data Analytics for Java application. It then waits for events in the data stream to arrive. Checkpointing is enabled so that the application can seamlessly recover from failures of the underlying infrastructure while Kinesis Data Analytics for Java Applications manages the checkpoints on your behalf. In addition, automatic scaling is configured so that Kinesis Data Analytics for Java Applications automatically allocates or removes resources and scales the application (that is, it adapts its parallelism) in response to changes in the incoming traffic.

To populate the Kinesis data stream, we use a Java application that replays a public dataset of historic taxi trips made in New York City into the data stream. The Java application has already been downloaded to an Amazon EC2 instance that was provisioned by AWS CloudFormation. You just need to connect to the instance and execute the JAR file to start ingesting events into the stream.

You can obtain all of the following commands, including their correct parameters, from the output section of the AWS CloudFormation template that you executed previously.

$ ssh ec2-user@«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-1.0.jar -stream «Kinesis data stream name» -region «AWS region» -speedup 3600

The speedup parameter determines how much faster the data is ingested into the Kinesis data stream relative to the actual occurrence of the historic events. With the given parameters, the Java application ingests an hour of historic data within one second. This results in a throughput of roughly 13k events and 6 MB of data per second, which completely saturates the Kinesis data stream (more on this later).

You can then go ahead and inspect the derived data through the Kibana dashboard that has been created. Or you can create your own visualizations to explore the data in Kibana.

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

The prepared Kibana dashboard contains a heatmap and a line graph. The heatmap visualizes locations where taxis are currently requested, and it shows that the highest demand for taxis is in Manhattan. Moreover, the JFK and LaGuardia airports are spots on the map where substantially more rides are requested compared to their direct neighborhoods. The line graph visualizes the average trip duration to these two airports. The following image shows how it steadily increases throughout the day until it abruptly drops in the evening.

For this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the AWS CloudFormation template. For production workloads, it’s much more desirable to further tighten the security of your Elasticsearch domain, for instance, by using Amazon Cognito for Kibana access control.

Scaling the architecture to increase its throughput

For this post, the Kinesis data stream was deliberately underprovisioned so that the Java application is completely saturating the data stream. When you closely inspect the output of the Java application, you’ll notice that the “replay lag” is continuously increasing. This means that the producer cannot ingest events as quickly as it is required according to the specified speedup parameter.

You can dive deeper into the metrics of the data stream by accessing it through an Amazon CloudWatch Dashboard. You can then see that the WriteProvisionedThroughputExceeded metric is slightly increased: Roughly 0.4 percent of the records are not accepted into the stream as the respective requests are throttled. In other terms, the data stream is underprovisioned, in particular as the producer pauses the ingestion of new events when too many events are in flight.

To increase the throughput of the data stream, you can simply update the number of shards from 6 to 12 with a couple of clicks on the console or through an API call. For production environments, you might even want to automate this procedure. For details on how to automatically scale a Kinesis data stream, see the blog post Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling.

When the scaling operation of the stream finishes, you can observe how the “replay lag” decreases and more events are ingested into the stream.

However, as a direct result, more events need to be processed. So now the Kinesis Data Analytics for Java application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch. The metric reports the time difference between the oldest record currently read by the Kinesis Data Analytics for Java application and the latest record in the stream according to the ingestion time in milliseconds. So it indicates how much behind the processing is from the tip of the stream.

As these metrics show, 10 minutes after the scaling operation finishes, processing is already more than 3 minutes behind the latest event in the stream. Even worse, it steadily keeps falling behind, continuously widening this gap.

However, in contrast to Kinesis Data Streams, Kinesis Data Analytics for Java Applications natively supports auto scaling. After a couple of minutes, you can see the effect of the scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

However, notice how the millisBehindLatest metric spikes just before it starts to decline. This is caused by the way that scaling a Kinesis Data Analytics for Java application works today. To scale a running application, the internal state of the application is persisted into a so-called savepoint. This savepoint is exposed as a snapshot by Kinesis Data Analytics for Java Applications. Subsequently, the running instance of the application is terminated, and a new instance of the same application with more resources and a higher parallelism is created. The new instance of the application then populates its internal state from the snapshot and resumes the processing from where the now terminated instance left off.

Accordingly, the scaling operation causes a brief interruption of the processing, which explains the spike in the metric. However, this operation is transparent to the producers and consumers. Producers can continue to write to the Kinesis data stream because they are nicely decoupled from the application. Likewise, consumers can still use Kibana to view their dashboards, although they might not see the latest data because it hasn’t yet been processed.

Let’s step back for a moment and review what you just did: You created a fully managed, highly available, scalable streaming architecture. You ingested and analyzed up to 25k events per second. You doubled the throughput of the architecture by scaling the Kinesis data stream and the Kinesis Data Analytics for Java application with a couple of clicks. You did all this while the architecture remained fully functional and kept receiving and processing events, not losing a single event. You also could have scaled the Elasticsearch cluster as seamlessly as the other components. But we’ll leave that as an exercise for the interested reader.

Try to imagine what it would have taken you to build something similar from scratch.

Prepare Flink applications for Kinesis Data Analytics for Java Applications

Now that you have seen the streaming application in action, let’s look at what is required to deploy and run a Flink application with Kinesis Data Analytics for Java Applications.

Similar to other deployment methods, the Flink application is first built and packaged into a fat JAR, which contains all the necessary dependencies for the application to run. The resulting fat JAR is then uploaded to Amazon S3. The location of the fat JAR on S3 and some additional configuration parameters are then used to create an application that can be executed by Kinesis Data Analytics for Java Applications. So instead of logging in to a cluster and directly submitting a job to the Flink runtime, you upload the respective fat JAR to S3. You then create a Kinesis Data Analytics for Java application that you can interact with using API calls, the console, and the AWS CLI, respectively.

Adapt the Flink configuration and runtime parameters

To obtain a valid Kinesis Data Analytics for Java application, the fat JAR of the Flink application must include certain dependencies. When you use Apache Maven to build your Flink application, you can simply add another dependency to the .pom file of your project.

<!-- pom.xml -->
<project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...
</project>

You can then specify parameters that are passed to the resulting Kinesis Data Analytics for Java application when it is created or updated. These parameters are basically key-value pairs that are contained in a property map that is part of a property group.

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

You can then obtain the values of these parameters in the application code from the Kinesis Data Analytics for Java Applications runtime. For example, the following code snippet gets the name of the Kinesis data stream that the application should connect to from the FlinkApplicationProperties property group.

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");

You use the same mechanism to configure other properties for the Kinesis Data Analytics for Java application (for example, checkpointing and the parallelism of the application) that are usually specified as a parameter or configuration option directly to the Flink runtime.

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

With this configuration, the checkpointing and parallelism settings are left at their default. This enables checkpointing and auto scaling and sets the initial parallelism of the Kinesis Data Analytics for Java application to one. Moreover, the log level is increased to INFO and CloudWatch metrics are collected for every subtask of the application.

Build the Flink Kinesis Connector

When you are building a Flink application that reads data from a Kinesis data stream, you might notice that the Flink Kinesis Connector is not available from Maven central. You actually need to build it yourself. The following steps build the connector for any recent Apache Flink release. However, because Kinesis Data Analytics for Java Applications is based on Flink 1.6.2, you can use this specific version for now.

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

Note that the connector has already been built and stored on S3 by the AWS CloudFormation template. You can simply download the JAR file of the connector from there and put it in your local Maven repository using the following Maven command:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile=flink-connector-kinesis_2.11-1.6.2.pom.xml

Integrate the Flink Elasticsearch sink with Amazon Elasticsearch Service

Beginning with the 1.6 release, Apache Flink comes with an Elasticsearch connector that supports the Elasticsearch APIs over HTTP. Therefore, it can natively talk to the endpoints that are provided by Amazon Elasticsearch Service.

You just need to decide how to authenticate requests against the public endpoint of the Elasticsearch cluster. You can whitelist individual IPs for access to the cluster. However, the recommended way of authenticating against the Amazon Elasticsearch Service endpoint is to add authentication information to AWS requests using IAM credentials and the Signature Version 4 signing process.

To extend the Flink Elasticsearch connector, which is not aware of the AWS specific signing process, you can use the open-source aws-signing-request-interceptor, which is available from Maven central. You just need to add an interceptor to the Elasticsearch sink that is called just before the request is sent to the Amazon Elasticsearch Service endpoint. The interceptor can then sign the request using the permission of the role that has been configured for the Kinesis Data Analytics for Java application.

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://..."));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    }
);

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    )
);

esSinkBuilder.build();

Note that the actual code in the GitHub repository is a bit more sophisticated because you need to obtain a serializable request interceptor. But the basic approach to sign requests remains the same.

Monitor and debug the Flink application

When running a Kinesis Data Analytics for Java application, you don’t get direct access to the cluster that runs Flink. This is because the underlying infrastructure is completely managed by the service. You merely interact with the service through an API. However, you can still obtain metrics and logging information through CloudWatch and CloudWatch Logs, respectively.

The Kinesis Data Analytics for Java application exposes a lot of operational metrics, ranging from metrics for the entire application down to metrics for individual processes of operators of the application. You can control which level of detail is adequate or required for your purposes. In fact, the metrics used in the previous section were all obtained through CloudWatch.

In addition to operational metrics, you can configure the Kinesis Data Analytics for Java application to write messages to CloudWatch Logs. This capability seamlessly integrates with common logging frameworks, such as Apache Log4j and the Simple Logging Facade for Java (SLF4J). So it is useful for debugging and identifying the cause of operational issues.

To enable logging for your Kinesis Data Analytics for Java application, specify an existing CloudWatch log stream as a logging option when you start the application. You can then emit log messages through your logging framework as usual:

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

After the log messages are persisted into CloudWatch Logs, you can easily query and analyze them through CloudWatch Logs Insights.

Conclusion

In this post, you not only built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics for Java Applications. You also scaled the different components while ingesting and analyzing up to 25k events per second in near-real time. In large parts, this scenario was enabled by using managed services, so you didn’t need to spend time on provisioning and configuring the underlying infrastructure.

The sources of the application and the AWS CloudFormation template used in this post are available from GitHub for your reference. You can dive into all the details of the Flink application and the configuration of the underlying services. I’m curious to see what you will build when you can focus on analyzing data in a streaming fashion rather than spending time on managing and operating infrastructure.

 


About the Author

Steffen Hausmann is a specialist solutions architect with AWS.