All posts by Lorenzo Nicora

Amazon Managed Service for Apache Flink now supports Apache Flink version 1.18

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/amazon-managed-service-for-apache-flink-now-supports-apache-flink-version-1-18/

Apache Flink is an open source distributed processing engine, offering powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing and event time semantics. Apache Flink supports multiple programming languages, Java, Python, Scala, SQL, and multiple APIs with different level of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink, which offers a fully managed, serverless experience in running Apache Flink applications, now supports Apache Flink 1.18.1, the latest version of Apache Flink at the time of writing.

In this post, we discuss some of the interesting new features and capabilities of Apache Flink, introduced with the most recent major releases, 1.16, 1.17, and 1.18, and now supported in Managed Service for Apache Flink.

New connectors

Before we dive into the new functionalities of Apache Flink available with version 1.18.1, let’s explore the new capabilities that come from the availability of many new open source connectors.

OpenSearch

A dedicated OpenSearch connector is now available to be included in your projects, enabling an Apache Flink application to write data directly into OpenSearch, without relying on Elasticsearch compatibility mode. This connector is compatible with Amazon OpenSearch Service provisioned and OpenSearch Service Serverless.

This new connector supports SQL and Table APIs, working with both Java and Python, and the DataStream API, for Java only. Out of the box, it provides at-least-once guarantees, synchronizing the writes with Flink checkpointing. You can achieve exactly-once semantics using deterministic IDs and upsert method.

By default, the connector uses OpenSearch version 1.x client libraries. You can switch to version 2.x by adding the correct dependencies.

Amazon DynamoDB

Apache Flink developers can now use a dedicated connector to write data into Amazon DynamoDB. This connector is based on the Apache Flink AsyncSink, developed by AWS and now an integral part of the Apache Flink project, to simplify the implementation of efficient sink connectors, using non-blocking write requests and adaptive batching.

This connector also supports both SQL and Table APIs, Java and Python, and DataStream API, for Java only. By default, the sink writes in batches to optimize throughput. A notable feature of the SQL version is support for the PARTITIONED BY clause. By specifying one or more keys, you can achieve some client-side deduplication, only sending the latest record per key with each batch write. An equivalent can be achieved with the DataStream API by specifying a list of partition keys for overwriting within each batch.

This connector only works as a sink. You cannot use it for reading from DynamoDB. To look up data in DynamoDB, you still need to implement a lookup using the Flink Async I/O API or implementing a custom user-defined function (UDF), for SQL.

MongoDB

Another interesting connector is for MongoDB. In this case, both source and sink are available, for both the SQL and Table APIs and DataStream API. The new connector is now officially part of the Apache Flink project and supported by the community. This new connector replaces the old one provided by MongoDB directly, which only supports older Flink Sink and Source APIs.

As for other data store connectors, the source can either be used as a bounded source, in batch mode, or for lookups. The sink works both in batch mode and streaming, supporting both upsert and append mode.

Among the many notable features of this connector, one that’s worth mentioning is the ability to enable caching when using the source for lookups. Out of the box, the sink supports at-least-once guarantees. When a primary key is defined, the sink can support exactly-once semantics via idempotent upserts. The sink connector also supports exactly-once semantics, with idempotent upserts, when the primary key is defined.

New connector versioning

Not a new feature, but an important factor to consider when updating an older Apache Flink application, is the new connector versioning. Starting from Apache Flink version 1.17, most connectors have been externalized from the main Apache Flink distribution and follow independent versioning.

To include the right dependency, you need to specify the artifact version with the form: <connector-version>-<flink-version>

For example, the latest Kafka connector, also working with Amazon Managed Streaming for Apache Kafka (Amazon MSK), at the time of writing is version 3.1.0. If you are using Apache Flink 1.18, the dependency to use will be the following:

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

For Amazon Kinesis, the new connector version is 4.2.0. The dependency for Apache Flink 1.18 will be the following:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

In the following sections, we discuss more of the powerful new features now available in Apache Flink 1.18 and supported in Amazon Managed Service for Apache Flink.

SQL

In Apache Flink SQL, users can provide hints to join queries that can be used to suggest the optimizer to have an effect in the query plan. In particular, in streaming applications, lookup joins are used to enrich a table, representing streaming data, with data that is queried from an external system, typically a database. Since version 1.16, several improvements have been introduced for lookup joins, allowing you to adjust the behavior of the join and improve performance:

  • Lookup cache is a powerful feature, allowing you to cache in-memory the most frequently used records, reducing the pressure on the database. Previously, lookup cache was specific to some connectors. Since Apache Flink 1.16, this option has become available to all connectors internally supporting lookup (FLIP-221). As of this writing, JDBC, Hive, and HBase connectors support lookup cache. Lookup cache has three available modes: FULL, for a small dataset that can be held entirely in memory, PARTIAL, for a large dataset, only caching the most recent records, or NONE, to completely disable cache. For PARTIAL cache, you can also configure the number of rows to buffer and the time-to-live.
  • Async lookup is another feature that can greatly improve performance. Async lookup provides in Apache Flink SQL a functionality similar to Async I/O available in the DataStream API. It allows Apache Flink to emit new requests to the database without blocking the processing thread until responses to previous lookups have been received. Similarly to Async I/O, you can configure async lookup to enforce ordering or allow unordered results, or adjust the buffer capacity and the timeout.
  • You can also configure a lookup retry strategy in combination with PARTIAL or NONE lookup cache, to configure the behavior in case of a failed lookup in the external database.

All these behaviors can be controlled using a LOOKUP hint, like in the following example, where we show a lookup join using async lookup:

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address
FROM Orders AS O 
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = O.customer_id

PyFlink

In this section, we discuss new improvements and support in PyFlink.

Python 3.10 support

Apache Flink newest versions introduced several improvements for PyFlink users. First and foremost, Python 3.10 is now supported, and Python 3.6 support has been completely removed (FLINK-29421). Managed Service for Apache Flink currently uses Python 3.10 runtime to run PyFlink applications.

Getting closer to feature parity

From the perspective of the programming API, PyFlink is getting closer to Java on every version. The DataStream API now supports features like side outputs and broadcast state, and gaps on windowing API have been closed. PyFlink also now supports new connectors like Amazon Kinesis Data Streams directly from the DataStream API.

Thread mode improvements

PyFlink is very efficient. The overhead of running Flink API operators in PyFlink is minimal compared to Java or Scala, because the runtime actually runs the operator implementation in the JVM directly, regardless of the language of your application. But when you have a user-defined function, things are slightly different. A line of Python code as simple as lambda x: x + 1, or as complex as a Pandas function, must run in a Python runtime.

By default, Apache Flink runs a Python runtime on each Task Manager, external to the JVM. Each record is serialized, handed to the Python runtime via inter-process communication, deserialized, and processed in the Python runtime. The result is then serialized and handed back to the JVM, where it’s deserialized. This is the PyFlink PROCESS mode. It’s very stable but it introduces an overhead, and in some cases, it may become a performance bottleneck.

Since version 1.15, Apache Flink also supports THREAD mode for PyFlink. In this mode, Python user-defined functions are run within the JVM itself, removing the serialization/deserialization and inter-process communication overhead. THREAD mode has some limitations; for example, THREAD mode cannot be used for Pandas or UDAFs (user-defined aggregate functions, consisting of many input records and one output record), but can substantially improve performance of a PyFlink application.

With version 1.16, the support of THREAD mode has been substantially extended, also covering the Python DataStream API.

THREAD mode is supported by Managed Service for Apache Flink, and can be enabled directly from your PyFlink application.

Apple Silicon support

If you use Apple Silicon-based machines to develop PyFlink applications, developing for PyFlink 1.15, you have probably encountered some of the known Python dependency issues on Apple Silicon. These issues have been finally resolved (FLINK-25188). These limitations did not affect PyFlink applications running on Managed Service for Apache Flink. Before version 1.16, if you wanted to develop a PyFlink application on a machine using M1, M2, or M3 chipset, you had to use some workarounds, because it was impossible to install PyFlink 1.15 or earlier directly on the machine.

Unaligned checkpoint improvements

Apache Flink 1.15 already supported Incremental Checkpoints and Buffer Debloating. These features can be used, particularly in combination, to improve checkpoint performance, making checkpointing duration more predictable, especially in the presence of backpressure. For more information about these features, see Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints.

With versions 1.16 and 1.17, several changes have been introduced to improve stability and performance.

Handling data skew

Apache Flink uses watermarks to support event-time semantics. Watermarks are special records, normally injected in the flow from the source operator, that mark the progress of event time for operators like event time windowing aggregations. A common technique is delaying watermarks from the latest observed event time, to allow events to be out of order, at least to some degree.

However, the use of watermarks comes with a challenge. When the application has multiple sources, for example it receives events from multiple partitions of a Kafka topic, watermarks are generated independently for each partition. Internally, each operator always waits for the same watermark on all input partitions, practically aligning it on the slowest partition. The drawback is that if one of the partitions is not receiving data, watermarks don’t progress, increasing the end-to-end latency. For this reason, an optional idleness timeout has been introduced in many streaming sources. After the configured timeout, watermark generation ignores any partition not receiving any record, and watermarks can progress.

You can also face a similar but opposite challenge if one source is receiving events much faster than the others. Watermarks are aligned to the slowest partition, meaning that any windowing aggregation will wait for the watermark. Records from the fast source have to wait, being buffered. This may result in buffering an excessive volume of data, and an uncontrollable growth of operator state.

To address the issue of faster sources, starting with Apache Flink 1.17, you can enable watermark alignment of source splits (FLINK-28853). This mechanism, disabled by default, makes sure that no partitions progress their watermarks too fast, compared to other partitions. You can bind together multiple sources, like multiple input topics, assigning the same alignment group ID, and configuring the duration of the maximal drift from the current watermark. If one specific partition is receiving events too fast, the source operator pauses consuming that partition until the drift is reduced below the configured threshold.

You can enable it for each source separately. All you need is to specify an alignment group ID, which will bind together all sources that have the same ID, and the duration of the maximal drift from the current minimal watermark. This will pause consuming from the source subtask that are advancing too fast, until the drift is lower than the threshold specified.

The following code snippet shows how you can set up watermark alignment of source splits on a Kafka source emitting bounded-out-of-orderness watermarks:

KafkaSource<Event> kafkaSource = ...
DataStream<Event> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness( Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source"));

This feature is only available with FLIP-217 compatible sources, supporting watermark alignment of source splits. As of writing, among major streaming source connectors, only Kafka source supports this feature.

Direct support for Protobuf format

The SQL and Table APIs now directly support Protobuf format. To use this format, you need to generate the Protobuf Java classes from the .proto schema definition files and include them as dependencies in your application.

The Protobuf format only works with the SQL and Table APIs and only to read or write Protobuf-serialized data from a source or to a sink. Currently, Flink doesn’t directly support Protobuf to serialize state directly and it doesn’t support schema evolution, as it does for Avro, for example. You still need to register a custom serializer with some overhead for your application.

Keeping Apache Flink open source

Apache Flink internally relies on Akka for sending data between subtasks. In 2022, Lightbend, the company behind Akka, announced a license change for future Akka versions, from Apache 2.0 to a more restrictive license, and that Akka 2.6, the version used by Apache Flink, would not receive any further security update or fix.

Although Akka has been historically very stable and doesn’t require frequent updates, this license change represented a risk for the Apache Flink project. The decision of the Apache Flink community was to replace Akka with a fork of the version 2.6, called Apache Pekko (FLINK-32468). This fork will retain the Apache 2.0 license and receive any required updates by the community. In the meantime, the Apache Flink community will consider whether to remove the dependency on Akka or Pekko completely.

State compression

Apache Flink offers optional compression (default: off) for all checkpoints and savepoints. Apache Flink identified a bug in Flink 1.18.1 where the operator state couldn’t be properly restored when snapshot compression is enabled. This could result in either data loss or inability to restore from checkpoint. To resolve this, Managed Service for Apache Flink has backported the fix that will be included in future versions of Apache Flink.

In-place version upgrades with Managed Service for Apache Flink

If you are currently running an application on Managed Service for Apache Flink using Apache Flink 1.15 or older, you can now upgrade it in-place to 1.18 without losing the state, using the AWS Command Line Interface (AWS CLI), AWS CloudFormation or AWS Cloud Development Kit (AWS CDK), or any tool that uses the AWS API.

The UpdateApplication API action now supports updating the Apache Flink runtime version of an existing Managed Service for Apache Flink application. You can use UpdateApplication directly on a running application.

Before proceeding with the in-place update, you need to verify and update the dependencies included in your application, making sure they are compatible with the new Apache Flink version. In particular, you need to update any Apache Flink library, connectors, and possibly Scala version.

Also, we recommend testing the updated application before proceeding with the update. We recommend testing locally and in a non-production environment, using the target Apache Flink runtime version, to ensure no regressions were introduced.

And finally, if your application is stateful, we recommend taking a snapshot of the running application state. This will enable you to roll back to the previous application version.

When you’re ready, you can now use the UpdateApplication API action or update-application AWS CLI command to update the runtime version of the application and point it to the new application artifact, JAR, or zip file, with the updated dependencies.

For more detailed information about the process and the API, refer to In-place version upgrade for Apache Flink. The documentation includes a step by step instructions and a video to guide you through the upgrade process.

Conclusions

In this post, we examined some of the new features of Apache Flink, supported in Amazon Managed Service for Apache Flink. This list is not comprehensive. Apache Flink also introduced some very promising features, like operator-level TTL for the SQL and Table API [FLIP-292] and Time Travel [FLIP-308], but these are not yet supported by the API, and not really accessible to users yet. For this reason, we decided not to cover them in this post.

With the support of Apache Flink 1.18, Managed Service for Apache Flink now supports the latest released Apache Flink version. We have seen some of the interesting new features and new connectors available with Apache Flink 1.18 and how Managed Service for Apache Flink helps you upgrade an existing application in place.

You can find more details about recent releases from the Apache Flink blog and release notes:

If you are new to Apache Flink, we recommend our guide to choosing the right API and language and following the getting started guide to start using Managed Service for Apache Flink.


About the Authors

Lorenzo NicoraLorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco MorilloFrancisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers, helping them design real-time analytics architectures using AWS services, supporting Amazon MSK and Amazon Managed Service for Apache Flink.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 2

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints-part-2/

This post is a continuation of a two-part series. In the first part, we delved into Apache Flink‘s internal mechanisms for checkpointing, in-flight data buffering, and handling backpressure. We covered these concepts in order to understand how buffer debloating and unaligned checkpoints allow us to enhance performance for specific conditions in Apache Flink applications.

In Part 1, we introduced and examined how to use buffer debloating to improve in-flight data processing. In this post, we focus on unaligned checkpoints. This feature has been available since Apache Flink 1.11 and has received many improvements since then. Unaligned checkpoints help, under specific conditions, to reduce checkpointing time for applications suffering temporary backpressure, and can be now enabled in Amazon Managed Service for Apache Flink applications running Apache Flink 1.15.2 through a support ticket.

Even though this feature might improve performance for your checkpoints, if your application is constantly failing because of checkpoints timing out, or is suffering from having constant backpressure, you may require a deeper analysis and redesign of your application.

Aligned checkpoints

As discussed in Part 1, Apache Flink checkpointing allows applications to record state in case of failure. We’ve already discussed how checkpoints, when triggered by the job manager, signal all source operators to snapshot their state, which is then broadcasted as a special record called a checkpoint barrier. This process achieves exactly-once consistency for state in a distributed streaming application through the alignment of these barriers.

Let’s walk through the process of aligned checkpoints in a standard Apache Flink application. Remember that Apache Flink distributes the workload horizontally: each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks based on its parallelism.

Barrier alignment

The alignment of checkpoint barriers is crucial for achieving exactly-once consistency in Apache Flink applications during checkpoint runs. To recap, when a job manager triggers a checkpoint, all sub-tasks of source operators receive a signal to initiate the checkpoint process. Each sub-task independently snapshots its state to the state backend and broadcasts a special record known as a checkpoint barrier to all outgoing streams.

When an application operates with a parallelism higher than 1, multiple instances of each task—referred to as sub-tasks—enable parallel message consumption and processing. A sub-task can receive distinct partitions of the same stream from different upstream sub-tasks, such as after a stream repartitioning with keyBy or rebalance operations. To maintain exactly-once consistency, all sub-tasks must wait for the arrival of all checkpoint barriers before taking a snapshot of the state. The following diagram illustrates the checkpoint barriers flow.

Checkpoint Barriers flow in the Buffer Queues

This phase is called checkpoint alignment. During alignment, the sub-task stops processing records from the partitions from which it has already received barriers, as shown in the following figure.

The first Barrier reaches the sub-task: Checkpointing Alignment begins

However, it continues to process partitions that are behind the barrier.

Processing continues only for partitions behind the barrier

When barriers from all upstream partitions have arrived, the sub-task takes a snapshot of its state.

Barrier alignment complete: snapshot state

Then it broadcasts the barrier downstream.

Emit Barriers downstream, and continue processing

The time a sub-task spends waiting for all barriers to arrive is measured by the checkpoint Alignment Duration metric, which can be observed in the Apache Flink UI.

If the application experiences backpressure, an increase in this metric could lead to longer checkpoint durations and even checkpoint failures due to timeouts. This is where unaligned checkpoints become a viable option to potentially enhance checkpointing performance.

Unaligned checkpoints

Unaligned checkpoints address situations where backpressure is not just a temporary spike, but results in timeouts for aligned checkpoints, due to barrier queuing within the stream. As discussed in Part 1, checkpoint barriers can’t overtake regular records. Therefore, significant backpressure can slow down the movement of barriers across the application, potentially causing checkpoint timeouts.

The objective of unaligned checkpoints is to enable barrier overtaking, allowing barriers to move swiftly from source to sink even when the data flow is slower than anticipated.

Building on what we saw in Part 1 concerning checkpoints and what aligned checkpoints are, let’s explore how unaligned checkpoints modify the checkpointing mechanism.

Upon emission, each source’s checkpoint barrier is injected into the stream flowing across sub-tasks. It travels from the source output network buffer queue into the input network buffer queue of the subsequent operator.

Upon the arrival of the first barrier in the input network buffer queue, the operator initially waits for barrier alignment. If the specified alignment timeout expires because not all barriers have reached the end of the input network buffer queue, the operator switches to unaligned checkpoint mode.

The alignment timeout can be set programmatically by env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30)), but modifying the default is not recommended in Apache Flink 1.15.

Checkpoint barriers flow in the buffer queues

The operator waits until all checkpoint barriers are present in the input network buffer queue before triggering the checkpoint. Unlike aligned checkpoints, the operator doesn’t need to wait for all barriers to reach the queue’s end, allowing the operator to have in-flight data from the buffer that hasn’t been processed before checkpoint initiation.

All barriers are in the input queues

After all barriers have arrived in the input network buffer queue, the operator advances the barrier to the end of the output network buffer queue. This enhances checkpointing speed because the barrier can smoothly traverse the application from source to sink, independent of the application’s end-to-end latency.

Barriers can overtake in-flight messages

After forwarding the barrier to the output network buffer queue, the operator initiates the snapshot of in-flight data between the barriers in the input and output network buffer queues, along with the snapshot of the state.

Although processing is momentarily paused during this process, the actual writing to the remote persistent state storage occurs asynchronously, preventing potential bottlenecks.

Snapshot state and in-flight messages

The local snapshot, encompassing in-flight messages and state, is saved asynchronously in the remote persistent state store, while the barrier continues its journey through the application.

Processing continues

When to use unaligned checkpoints

Remember, barrier alignment only occurs between partitions coming from different sub-tasks of the same operator. Therefore, if an operator is experiencing temporary backpressure, enabling unaligned checkpoints may be beneficial. This way, the application doesn’t have to wait for all barriers to reach the operator before performing the snapshot of state or moving the barrier forward.

Temporary backpressure could arise from the following:

  • A surge in data ingestion
  • Backfilling or catching up with historical data
  • Increased message processing time due to delayed external systems

Another scenario where unaligned checkpoints prove advantageous is when working with exactly-once sinks. Utilizing the two-phase commit sink function for exactly-once sinks, unaligned checkpoints can expedite checkpoint runs, thereby reducing end-to-end latency.

When not to use unaligned checkpoints

Unaligned checkpoints won’t reduce the time required for savepoints (called snapshots in the Amazon Managed Service for Apache Flink implementation) because savepoints exclusively utilize aligned checkpoints. Furthermore, because Apache Flink doesn’t permit concurrent unaligned checkpoints, savepoints won’t occur simultaneously with unaligned checkpoints, potentially elongating savepoint durations.

Unaligned checkpoints won’t fix any underlying issue in your application design. If your application is suffering from persistent backpressure or constant checkpointing timeouts, this might indicate data skewness or underprovisioning, which may require improving and tuning the application.

Using unaligned checkpoints with buffer debloating

One alternative for reducing the risks associated with an increased state size is to combine unaligned checkpoints with buffer debloating. This approach results in having less in-flight data to snapshot and store in the state, along with less data to be used for recovery in case of failure. This synergy facilitates enhanced performance and efficient checkpoint runs, leading to smaller checkpointing sizes and faster recovery times. When testing the use of unaligned checkpoints, we recommend doing so with buffer debloating to prevent the state size from increasing.

Limitations

Unaligned checkpoints are subject to the following limitations:

  • They provide no benefit for operators with a parallelism of 1.
  • They only improve performance for operators where barrier alignment would have occurred. This alignment happens only if records are coming from different sub-tasks of the same operator, for example, through repartitioning or keyBy operations.
  • Operators receiving input from multiple sources or participating in joins might not experience improvements, because the operator would be receiving data from different operators in those cases.
  • Although checkpoint barriers can surpass records in the network’s buffer queue, this won’t occur if the sub-task is currently processing a message. If processing a message takes too much time (for example, a flat-map operation emitting numerous records for each input record), barrier handling will be delayed.
  • As we have seen, savepoints always use aligned checkpoints. If the savepoints of your applications are slow due to barrier alignment, unaligned checkpoints will not help.
  • Additional limitations affect watermarks, message ordering, and broadcast state in recovery. For more details, refer to Limitations.

Considerations

Considerations for implementing unaligned checkpoints:

  • Unaligned checkpoints introduce additional I/O to checkpoint storage
  • Checkpoints encompass not only operator state but also in-flight data within network buffer queues, leading to increased state size

Recommendations

We offer the following recommendations:

  • Consider enabling unaligned checkpoints only if both of the following conditions are true:
  • Checkpoints are timing out.
  • The average checkpoint Async Duration of any operator is more than 50% of the total checkpoint duration for the operator (sum of Sync Duration + Async Duration).
  • Consider enabling buffer debloating first, and evaluate whether it solves the problem of checkpoints timing out.
  • If buffer debloating doesn’t help, consider enabling unaligned checkpoints along with buffer debloating. Buffer debloating mitigates the drawbacks of unaligned checkpoints, reducing the amount of in-flight data.
  • If unaligned checkpoints and buffer debloating together don’t improve checkpoint alignment duration, consider testing unaligned checkpoints alone.

Decision flow

Finally, but most importantly, always test unaligned checkpoints in a non-production environment first, running some comparative performance testing with a realistic workload, and verify that unaligned checkpoints actually reduce checkpoint duration.

Conclusion

This two-part series explored advanced strategies for optimizing checkpointing within your Amazon Managed Service for Apache Flink applications. By harnessing the potential of buffer debloating and unaligned checkpoints, you can unlock significant performance improvements and streamline checkpoint processes. However, it’s important to understand when these techniques will provide improvements and when they will not. If you believe your application may benefit from checkpoint performance improvement, you can enable these features in your Amazon Managed Service For Apache Flink version 1.15 applications. We recommend first enabling buffer debloating and testing the application. If you are still not seeing the expected outcome, enable buffer debloating with unaligned checkpoints. This way, you can immediately reduce the state size and the additional I/O to state backends. Lastly, you may try using unaligned checkpoints by itself, bearing in mind the considerations we’ve mentioned.

With a deeper understanding of these techniques and their applicability, you are better equipped to maximize the efficiency of checkpoints and mitigate the effect of backpressure in your Apache Flink application.


About the Authors

Lorenzo NicoraLorenzo Nicora works as Senior Streaming Solution Architect helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco MorilloFrancisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 1

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/part-1-optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints/

This post is the first of a two-part series regarding checkpointing mechanisms and in-flight data buffering. In this first part, we explain some of the fundamental Apache Flink internals and cover the buffer debloating feature. In the second part, we focus on unaligned checkpoints.

Apache Flink is an open-source distributed engine for stateful processing over unbounded datasets (streams) and bounded datasets (batches). Amazon Managed Service for Apache Flink, formerly known as Amazon Kinesis Data Analytics, is the AWS service offering fully managed Apache Flink.

Apache Flink is designed for stateful processing at scale, for high throughput and low latency. It scales horizontally, distributing processing and state across multiple nodes, and is designed to withstand failures without compromising the exactly-once consistency it provides.

Internally, Apache Flink uses clever mechanisms to maintain exactly-once state consistency, while also optimizing for throughput and reduced latency. The default behavior works well for most use cases. Recent versions introduced two functionalities that can be optionally enabled to improve application performance under particular conditions: buffer debloating and unaligned checkpoints.

Buffer debloating and unaligned checkpoints can be enabled on Amazon Managed Service for Apache Flink version 1.15.

To understand how these functionalities can help and when to use them, we need to dive deep into some of the fundamental internal mechanisms of Apache Flink: checkpointing, in-flight data buffering, and backpressure.

Maintaining state consistency through failures with checkpointing

Apache Flink checkpointing periodically saves the internal application state for recovering in case of failure. Each of the distributed components of an application asynchronously snapshots its state to an external persistent datastore. The challenge is taking snapshots guaranteeing exactly-once consistency. A naïve “stop-the-world, take a snapshot” implementation would never meet the high throughput and low latency goals Apache Flink has been designed for.

Let’s walk through the process of checkpointing in a simple streaming application.

As shown in the following figure, Apache Flink distributes the work horizontally. Each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks, based on its parallelism. The application is coordinated by a job manager. Checkpoints are periodically initiated by the job manager, sending a signal to all source operators’ sub-tasks.

Checkpoint initiated by the Job Manager

On receiving the signal, each source sub-task independently snapshots its state (for example, the offsets of the Kafka topic it is consuming) to a persistent storage, and then broadcasts a special record called checkpoint barrier (“CB” in the following diagrams) to all outgoing streams. Checkpoint barriers work similarly to watermarks in Apache Flink, flowing in-bands, along with normal records. A barrier does not overtake normal records and is not overtaken.

Source operators emit checkpoint bariers

When a downstream operator’s sub-task receives all checkpoint barriers from all input channels, it starts snapshotting its state.

A sub-task does not pause processing while saving its state to the remote, persistent state backend. This is a two-phase operation. First, the sub-task takes a snapshot of the state, on the local file system or in memory, depending on application configuration. This operation is blocking but very fast. When the snapshot is complete, it restarts processing records, while the state is asynchronously saved to the external, persistent state store. When the state is successfully saved to the state store, the sub-task acknowledges to the job manager that its checkpointing is complete.

The time a sub-task spends on the synchronous and asynchronous parts of the checkpoint is measured by Sync Duration and Async Duration metrics, shown by the Apache Flink UI. It is then asynchronously sent to the backend. After the fast snapshot, the sub-task restarts processing messages. The backend notifies the sub-task when the state has been successfully saved. The sub-task, in turn, sends an acknowledgment to the job manager that checkpointing is complete.

Sub-tasks acknowledge checkpoint completion

Checkpoint barriers propagate through all operators, down to the sinks. When all sink sub-tasks have acknowledged the checkpoint to the job manager, the checkpoint is declared complete and can be used to recover the application, for example in case of failure.

Sink operators acknowledge checkpoint is complete

Checkpoint barrier alignment

A sub-task may receive different partitions of the same stream from different upstream sub-tasks, for example when a stream is repartitioned with a keyBy or a rebalance. Each upstream sub-task will emit a checkpoint barrier independently. To maintain exactly-once consistency, the sub-task must wait for the barriers to arrive on all input partitions before taking a snapshot of its state.

This phase is called checkpoint alignment. During the alignment, the sub-task stops processing records from the partitions it already received the barrier from and continues processing the partitions that are behind the barrier.

After the barriers from all upstream partitions have arrived, the sub-task takes the snapshot of its state and then broadcasts the barrier downstream.

The time spent by a sub-task while aligning barriers is measured by the Checkpoint Alignment Duration metric, shown by the Apache Flink UI.

Checkpoint barrier alignment

In-flight data buffering

To optimize for throughput, Apache Flink tries to keep each sub-task always busy. This is achieved by transmitting records over the network in blocks and by buffering in-flight data. Note that this is data transmission optimization; Flink operators always process records one at the time.

Data is handed over between sub-tasks in units called network buffers. A network buffer has a fixed size, in bytes.

Sub-tasks also buffer in-flight input and output data. These buffers are called network buffer queues. Each queue is composed of multiple network buffers. Each sub-task has an input network buffer queue for each upstream sub-task and an output network buffer queue for each downstream sub-task.

Each record emitted by the sub-task is serialized, put into network buffers, and published to the output network buffer queue. To use all the available space, multiple messages can be packed into a single network buffer or split across subsequent network buffers.

A separate thread sends full network buffers over the network, where they are stored in the destination sub-task’s input network buffer queue.

When the destination sub-task thread is free, it deserializes the network buffers, rebuilds the records, and processes them one at a time.

Network Buffer Queue

Backpressure

If a sub-task can’t keep up with processing records at the same pace they are received, the input queue fills up. When the input queue is full, the upstream sub-task stops sending data.

Data accumulates in the sender’s output queue. When this is also full, the sender sub-task stops processing records, accumulating received data in its own input queue, and the effects propagates upstream.

This is the backpressure that Apache Flink uses to control the internal flow, preventing slow operators from being overwhelmed by slowing down the upstream flow. Backpressure is a safety mechanism to maximize the application throughput. It can be temporary, in case of an unexpected peak of ingested data, for example. If not temporary, it is usually the symptom—not the cause—that the application is not designed correctly or it has insufficient resources to process the workload.

Full Network Buffer Queue generates backpressure

In-flight buffering and checkpoint barriers

As checkpoint barriers flow with normal records, they also flow in the network buffers, through the input and output queues. In normal conditions, barriers don’t overtake records, and they are never overtaken. If records are queueing up due to backpressure, checkpoint barriers are also stuck in the queue, taking longer time to propagate from the sources to the sinks, delaying the completion of the checkpoint.

In the second part of this series, we will see how unaligned checkpoints can let barriers overtake records under specific conditions. For now, let’s see how we can optimize the size of input and output queues with buffer debloating.

Buffer debloating to optimize in-flight data

The default network buffer queue size is a good compromise for most applications. You can modify this size, but it applies to all sub-tasks, and it may be difficult to optimize this one-size-fits-all across different operators.

Longer queues support bigger throughout, but they may slow down checkpoint barriers that have to go through longer queues, causing longer End to End Checkpoint Duration. Ideally, the amount of in-flight data should be adjusted based on the actual throughput.

In version 1.14, Apache Flink introduced buffer debloating, which can be enabled to adjust in-flight data of each sub-task, based on the current throughput the sub-task is processing, and periodically reassess and readjust it.

How buffer debloating helps your application

Consider a streaming application, ingesting records from a streaming source and publishing the results to a streaming destination after some transformations. Under normal conditions, the application is sized to process the incoming throughput smoothly. Our destination has limited capacity, for example a Kafka topic throttled via quotas, sufficient to handle the normal throughput, with some margin.

In-flight data buffering under normal throughput

Imagine that the ingestion throughput has occasional peaks. These peaks exceed the limits of the streaming destination (throughput quota of the Kafka topic), which starts throttling.

Full in-flight data buffer to the sink backpressure the preceding operator

Because the sink can’t process the full throughput, in-flight data accumulates upstream of the sink, causing backpressure on the upstream operator. The effect eventually propagates up to the source, and the source starts lagging behind the most recent record in the source stream.

Backpressure propagates upstream, up to the source operator

As long this is a temporary condition, backpressure and lagging are not a problem per se, as long as the application is able to catch up when the peak has finished.

Unfortunately, accumulating in-flight data also slows down the propagation of the checkpoint barriers. Checkpoint End to End Duration goes up, and checkpoints may eventually time out.

Full in-flight data buffers slow down checkpoint barrier propagation, under backpressure

The situation is even worse if the sink uses two-phase commit for exactly-once guarantees. For example, KafkaSink uses Kafka transactions committed on checkpoints. If checkpoints become too slow, transactions are committed later, significantly increasing the latency of any downstream consumer using a read-committed isolation level.

Slow checkpoints under backpressure may also cause a vicious cycle. A slowed-down application eventually crashes, and recovers from the last checkpoint that is quite old. This causes a long reprocessing that, in turn, induces more backpressure and even slower checkpoints.

In this case, buffer debloating can help by adjusting the amount of in-flight data based on the throughput each sub-task is actually processing. When a sub-task is throttled by backpressure, the amount of in-flight data is reduced, also reducing the time checkpoint barriers take to go through all operators. Checkpoint End to End Duration goes down, and checkpoints do not time out.

Buffer debloating internals

Buffer debloating estimates the throughput a sub-task is capable of processing, assuming no idling, and limits the upstream in-flight data buffers to contain just enough data to be processed in 1 second (by default).

For efficiency, network buffers in the queues are fixed. Buffer debloating caps the usable size of each network buffer, making it smaller when the sub-task is processing slowly.

Buffer debloating speed up barrier propagation, reducing the volume of in-flight data

The benefits of less in-flight data depends on whether Apache Flink is using standard checkpoint alignment, the default behavior described so far, or unaligned checkpoints. We will examine unaligned checkpoints in the second part of this series, but let’s see the effect of buffer debloating, briefly.

  • With aligned checkpoints (default behavior) – Less in-flight data makes checkpoint barrier propagation faster, ultimately reducing the end-to-end checkpoint duration but also making it more predictable
  • With unaligned checkpoints (optional) – Less in-flight data reduces the amount of in-flight records stored with the checkpoint, ultimately reducing the checkpoint size

What buffer debloating does not do

Note that the problem we are trying to solve is slow checkpointing (or excessive checkpointing size, with unaligned checkpoints). Buffer debloating helps making checkpointing faster.

Buffer debloating does not remove backpressure. Backpressure is the internal protective mechanism that Apache Flink uses when some part of the application is not able to cope with the incoming throughput. To reduce backpressure, you have to work on other aspects of the application. When backpressure is only temporary, for example under peak conditions, the only way of removing it would be sizing the end-to-end system for the peak, rather than normal workload. But this could be impossible or too expensive.

Buffer debloating helps reduce and keep checkpoint duration stable under exceptional and temporary conditions. If an application experiences backpressure under its normal workload, or checkpoints are too slow under normal conditions, you should investigate the implementation of your application to understand the root cause.

When the automatic throughput prediction fails

Buffer debloating doesn’t have any particular drawback, but in corner cases, the mechanism may incorrectly estimate the throughput, and the resulting amount of in-flight data may not be optimal.

Estimating the throughput is complex when an operator receives data from multiple upstream operators, connected streams or unions, with very different throughput. It may also take time to adjust to a sudden spike, causing a temporary suboptimal buffering.

  • Too small in-flight data may reduce the throughput the sub-task can process (it will be idling), causing more backpressure upstream
  • Too large buffers may slow down checkpointing and increase the checkpoint size (with unaligned checkpoints)

Conclusion

The checkpointing mechanism makes Apache Flink fault tolerant, providing exactly-once state consistency. In-flight data buffering and backpressure control the data flow within the distributed streaming application maximize the throughput. Apache Flink default behaviors and configurations are good for most workloads.

The effectiveness of buffer debloating depends on the characteristics of the workload and the application. The general recommendation is to test the functionality in a non-production environment with a realistic workload to verify it actually helps with your use case.

You can request to enable buffer debloating on your Amazon Managed Service for Apache Flink application.

Under particular conditions, the combined effect of backpressure and in-flight data buffering may slow down checkpointing, increase checkpointing size (with unaligned checkpoints), and even cause checkpoints to fail. In these cases, enabling unaligned checkpointing may help reduce checkpoint duration or size.

In the second part of this series, we will understand better unaligned checkpoints and how they can help your application checkpointing efficiently in presence of backpressure, especially in combination with buffer debloating.


About the Authors

Lorenzo NicoraLorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco MorilloFrancisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.