All posts by Lorenzo Nicora

Achieve full control over your data encryption using customer managed keys in Amazon Managed Service for Apache Flink

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/achieve-full-control-over-your-data-encryption-using-customer-managed-keys-in-amazon-managed-service-for-apache-flink/

Encryption of both data at rest and in transit is a non-negotiable feature for most organizations. Furthermore, organizations operating in highly regulated and security-sensitive environments—such as those in the financial sector—often require full control over the cryptographic keys used for their workloads.

Amazon Managed Service for Apache Flink makes it straightforward to process real-time data streams with robust security features, including encryption by default to help protect your data in transit and at rest. The service removes the complexity of managing the key lifecycle and controlling access to the cryptographic material.

If you need to retain full control over your key lifecycle and access, Managed Service for Apache Flink now supports the use of customer managed keys (CMKs) stored in AWS Key Management Service (AWS KMS) for encrypting application data.

This feature helps you manage your own encryption keys and key policies, so you can meet strict compliance requirements and maintain complete control over sensitive data. With CMK integration, you can take advantage of the scalability and ease of use that Managed Service for Apache Flink offers, while meeting your organization’s security and compliance policies.

In this post, we explore how the CMK functionality works with Managed Service for Apache Flink applications, the use cases it unlocks, and key considerations for implementation.

Data encryption in Managed Service for Apache Flink

In Managed Service for Apache Flink, there are multiple aspects where data should be encrypted:

  • Data at rest directly managed by the service – Durable application storage (checkpoints and snapshots) and running application state storage (disk volumes used by RocksDB state backend) are automatically encrypted
  • Data in transit internal to the Flink cluster – Automatically encrypted using TLS/HTTPS
  • Data in transit to and at rest in external systems that your Flink application accesses – For example, an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic accessed through the Kafka connector, or an endpoint called through a custom AsyncIO operator. Encryption depends on the external service, user settings, and code

For data at rest managed by the service, checkpoints, snapshots, and running application state storage are encrypted by default using AWS owned keys. If your security policies require you to directly control the encryption keys, you can use a CMK held in AWS KMS.

Key components and roles

To understand how CMKs work in Managed Service for Apache Flink, we first need to introduce the components and roles involved in managing and running an application using CMK encryption:

  • Customer managed key (CMK):
    • Resides in AWS KMS within the same AWS account as your application
    • Has an attached key policy that defines access permissions and usage rights to other components and roles
    • Encrypts both durable application storage (checkpoints and snapshots) and running application state storage
  • Managed Service for Apache Flink application:
    • The application whose storage you want to encrypt using the CMK
    • Has an attached AWS Identity and Access Management (IAM) execution role that grants permissions to access external services
    • The execution role doesn’t have to provide any specific permissions to use the CMK for encryption operations
  • Key administrator:
    • Manages the CMK lifecycle (creation, rotation, policy updates, and so on)
    • Can be an IAM user or IAM role, and used by a human operator or by automation
    • Requires administrative access to the CMK
    • Permissions are defined by the attached IAM policies and the key policy
  • Application operator:
    • Manages the application lifecycle (start/stop, configuration updates, snapshot management, and so on)
    • Can be an IAM user or IAM role, and used by a human operator or by automation
    • Requires permissions to manage the Flink application and use the CMK for encryption operations
    • Permissions are defined by the attached IAM policies and the key policy

The following diagram illustrates the solution architecture.

Actors

Enabling CMK following the principle of least privilege

When deploying applications in production environments or handling sensitive data, you should follow the principle of least privilege. CMK support in Managed Service for Apache Flink has been designed with this principle in mind, so each component receives only the minimum permissions necessary to function.

For detailed information about the permissions required by the application operator and key policy configurations, refer to Key management in Amazon Managed Service for Apache Flink. Although these policies might appear complex at first glance, this complexity is intentional and necessary. For more details about the requirements for implementing the most restrictive key management possible while maintaining functionality, refer to Least-privilege permissions.

For this post, we highlight some important points about CMK permissions:

  • Application execution role – Requires no additional permissions to use a CMK. You don’t need to change the permissions of an existing application; the service handles CMK operations transparently during runtime.
  • Application operator permissions – The operator is the user or role who controls the application lifecycle. For the permissions required to operate an application that uses CMK encryption, refer to Key management in Amazon Managed Service for Apache Flink. In addition to these permissions, an operator normally has permissions on actions with the kinesisanalytics prefix. It is a best practice to restrict these permissions to a specific application by specifying the application ARN in the Resource element. The operator must also have the iam:PassRole permission to pass the service execution role to the application.

To simplify managing the permissions of the operator, we recommend creating two separate IAM policies, to be attached to the operator’s role or user:

  • A base operator policy defining the basic permissions to operate the application lifecycle without a CMK
  • An additional CMK operator policy that adds permissions to operate the application with a CMK

The following IAM policy example illustrates the permissions that should be included in the base operator policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowManagedFlinkOperations",
      "Effect": "Allow",
      "Action": "kinesisanalytics:*",
      "Resource": "arn:aws:kinesisanalytics:<region>:<account-id>:application/MyApplication"
    },
    {
      "Sid": "AllowPassingServiceExecutionRole",
      "Effect": "Allow",
      "Action": [
        "iam:PassRole"
      ],
      "Resource": "arn:aws:iam::<account-id>:role/MyApplicationRole"
    }
  ]
}
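If you generate this policy from automation, a small helper can fill in the placeholders. The following Python sketch is only an illustration: the function name and its parameters are hypothetical, and the Sid values are written with alphanumeric characters only. It builds the same two statements shown above and prints them as JSON.

```python
import json


def build_base_operator_policy(region: str, account_id: str,
                               application_name: str,
                               execution_role_name: str) -> dict:
    """Build the base operator policy, scoped to one application and one role.

    Hypothetical helper: it only assembles the JSON document shown in this post.
    """
    app_arn = (f"arn:aws:kinesisanalytics:{region}:{account_id}"
               f":application/{application_name}")
    role_arn = f"arn:aws:iam::{account_id}:role/{execution_role_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lifecycle operations, restricted to a single application
                "Sid": "AllowManagedFlinkOperations",
                "Effect": "Allow",
                "Action": "kinesisanalytics:*",
                "Resource": app_arn,
            },
            {
                # Needed to pass the service execution role to the application
                "Sid": "AllowPassingServiceExecutionRole",
                "Effect": "Allow",
                "Action": ["iam:PassRole"],
                "Resource": role_arn,
            },
        ],
    }


policy = build_base_operator_policy(
    "us-east-1", "123456789012", "MyApplication", "MyApplicationRole")
print(json.dumps(policy, indent=2))
```

The account ID and Region in the usage example are placeholders. The helper does not call any AWS API; attaching the resulting document to the operator is left to your own tooling.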

Refer to Application lifecycle operator (API caller) permissions for the permissions to be included with the additional CMK operator policy.

Separating these two policies has an additional benefit of simplifying the process of setting up an application for the CMK, due to the dependencies we illustrate in the following section.

Dependencies between the key policy and CMK operator policy

If you carefully observe the operator’s permissions and the key policy explained in Create a KMS key policy, you will notice some interdependencies, illustrated by the following diagram.

Dependencies

In particular, we highlight the following:

  • CMK key policy dependencies – The CMK policy requires references to both the application Amazon Resource Name (ARN) and the key administrator or operator IAM roles or users. This policy must be defined at key creation time by the key administrator.
  • IAM policy dependencies – The operator’s IAM policy must reference both the application ARN and the CMK key itself. The operator role is responsible for various tasks, including configuring the application to use the CMK.

To properly follow the principle of least privilege, each component requires the others to exist before it can be correctly configured. This necessitates a carefully orchestrated deployment sequence.

In the following section, we demonstrate the precise order required to resolve these dependencies while maintaining security best practices.

Sequence of operations to create a new application with a CMK

When deploying a new application that uses CMK encryption, we recommend following this sequenced approach to resolve dependency conflicts while maintaining security best practices:

  1. Create the operator IAM role or user with a base policy that includes application lifecycle permissions. Do not include CMK permissions at this stage, because the key doesn’t exist yet.
  2. The operator creates the application using the default AWS owned key. Keep the application in a stopped state to prevent data creation—there should be no data at rest to encrypt during this phase.
  3. Create the key administrator IAM role or user, if not already available, with permissions to create and manage KMS keys. Refer to Using IAM policies with AWS KMS for detailed permission requirements.
  4. The key administrator creates the CMK in AWS KMS. At this point, you have the required components for the key policy: application ARN, operator IAM role or user ARN, and key administrator IAM role or user ARN.
  5. Create and attach to the operator an additional IAM policy that includes the CMK-specific permissions. See Application lifecycle operator (API caller) permissions for the complete operator policy definition.
  6. The operator can now modify the application configuration using the UpdateApplication action, to enable CMK encryption, as illustrated in the following section.
  7. The application is now ready to run with all data at rest encrypted using your CMK.
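The reasoning behind this sequence can be sketched as a dependency graph. The following Python example is a simplified model, not part of any AWS tooling: the component names are illustrative labels we made up, and the edges reflect our reading of the dependencies described in this post. A topological sort then yields one valid creation order.

```python
from graphlib import TopologicalSorter

# Each entry maps a component to the components that must exist before it.
# The names are illustrative labels, not AWS resource types.
dependencies = {
    "operator_with_base_policy": set(),
    "application": {"operator_with_base_policy"},
    "key_administrator": set(),
    # The key policy references the application, the operator, and the key administrator
    "cmk_with_key_policy": {"application", "operator_with_base_policy", "key_administrator"},
    # The CMK operator policy references the application ARN and the key
    "operator_cmk_policy": {"application", "cmk_with_key_policy"},
    "enable_cmk_on_application": {"operator_cmk_policy"},
}

# static_order() returns the components so that every dependency comes first
order = list(TopologicalSorter(dependencies).static_order())
for step, component in enumerate(order, start=1):
    print(step, component)
```

Several orders can satisfy the same graph (for example, the key administrator can be created before or after the application), which is why the sequence above is a recommendation rather than the only possibility.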

Enable the CMK with UpdateApplication

You can configure a Managed Service for Apache Flink application to use a CMK using the AWS Management Console, the AWS API, AWS Command Line Interface (AWS CLI), or infrastructure as code (IaC) tools like the AWS Cloud Development Kit (AWS CDK) or AWS CloudFormation templates.

When setting up CMK encryption in a production environment, you will probably use an automation tool rather than the console. These tools eventually use the AWS API under the hood, and the UpdateApplication action of the kinesisanalyticsv2 API in particular. In this post, we analyze the additions to the API that you can use to control the encryption configuration.

An additional top-level block ApplicationEncryptionConfigurationUpdate has been added to the UpdateApplication request payload. With this block, you can enable and disable the CMK.

You must add the following block to the UpdateApplication request:

{
  "ApplicationEncryptionConfigurationUpdate": {
    "KeyTypeUpdate": "CUSTOMER_MANAGED_KEY",
    "KeyIdUpdate": "arn:aws:kms:us-east-1:123456789012:key/01234567-99ab-cdef-0123-456789abcdef"
  }
}

The KeyIdUpdate value can be the key ARN, key ID, key alias name, or key alias ARN.

Disable the CMK

Similarly, the following request disables the CMK, switching back to the default AWS owned key:

{
  "ApplicationEncryptionConfigurationUpdate": {
    "KeyTypeUpdate": "AWS_OWNED_KEY"
  }
}
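If you assemble these request blocks programmatically, a small helper can keep the two variants consistent. The following Python sketch is an assumption-laden illustration: the function name is hypothetical, and the validation rules simply mirror the two snippets in this post (a key identifier when enabling the CMK, none when switching back). It only builds the block; how you merge it into the full UpdateApplication request depends on your tooling.

```python
from typing import Optional

# Key types that appear in the snippets in this post
VALID_KEY_TYPES = {"CUSTOMER_MANAGED_KEY", "AWS_OWNED_KEY"}


def encryption_update_block(key_type: str, key_id: Optional[str] = None) -> dict:
    """Return the ApplicationEncryptionConfigurationUpdate block (hypothetical helper)."""
    if key_type not in VALID_KEY_TYPES:
        raise ValueError(f"Unsupported key type: {key_type}")
    if key_type == "CUSTOMER_MANAGED_KEY" and not key_id:
        raise ValueError("A key identifier is required to enable a CMK")
    if key_type == "AWS_OWNED_KEY" and key_id:
        # This helper's own rule, mirroring the snippet that omits KeyIdUpdate
        raise ValueError("Omit the key identifier when switching to the AWS owned key")
    update = {"KeyTypeUpdate": key_type}
    if key_id:
        update["KeyIdUpdate"] = key_id
    return {"ApplicationEncryptionConfigurationUpdate": update}


enable = encryption_update_block(
    "CUSTOMER_MANAGED_KEY",
    "arn:aws:kms:us-east-1:123456789012:key/01234567-99ab-cdef-0123-456789abcdef",
)
disable = encryption_update_block("AWS_OWNED_KEY")
print(enable)
print(disable)
```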

Enable the CMK with CreateApplication

Theoretically, you can enable the CMK directly when you first create the application using the CreateApplication action.

A top-level block ApplicationEncryptionConfiguration has been added to the CreateApplication request payload, with a syntax similar to UpdateApplication.

However, due to the interdependencies described in the previous section, you will most often create an application with the default AWS owned key and later use UpdateApplication to enable the CMK.

If you omit ApplicationEncryptionConfiguration when you create the application, the default behavior is using the AWS owned key, for backward compatibility.

Sample CloudFormation templates to create IAM roles and the KMS key

The process you use to create the roles and key and configure the application to use the CMK will vary, depending on the automation you use and your approval and security processes. Any automation example we can provide will likely not fit your processes or tooling.

However, the following GitHub repository provides some example CloudFormation templates to generate some of the IAM policies and the KMS key with the correct key policy:

  • IAM policy for the key administrator – Allows managing the key
  • Base IAM policy for the operator – Allows managing the normal application lifecycle operations without the CMK
  • CMK IAM policy for the operator – Provides additional permissions required to manage the application lifecycle when the CMK is enabled
  • KMS key policy – Allows the application to encrypt and decrypt the application state and the operator to manage the application operations

CMK operations

We have described the process of creating a new Managed Service for Apache Flink application with CMK. Let’s now examine other common operations you can perform.

Changes to the encryption key become effective when the application is restarted. If you update the configuration of a running application, this causes the application to restart and the new key to be used immediately. Conversely, if you change the key of a READY (not running) application, the new key is not actually used until the application is restarted.

Enable a CMK on an existing application

If you have an application running with an AWS owned key, the process is similar to what we described for creating new applications. In this case, you already have a running application state and older snapshots that are encrypted using the AWS owned key.

Also, if you have a running application, you probably already have an operator role with an IAM policy that you can use to control the application lifecycle.

The sequence of steps to enable a CMK on an existing and running application is as follows:

  1. If you don’t already have one, create a key administrator IAM role or user with permissions to create and manage keys in AWS KMS. See Using IAM policies with AWS KMS for more details about the permissions required to manage keys.
  2. The key administrator creates the CMK. The key policy references the application ARN, the operator’s ARN, and the key administrator’s role or user ARN.
  3. Create an additional IAM policy that allows the use of the CMK and attach this policy to the operator. Alternatively, modify the operator’s existing IAM policy by adding these permissions.
  4. Finally, the operator can update the application and enable the CMK.

The following diagram illustrates the process that occurs when you execute an UpdateApplication action on the running application to enable a CMK.

Enabling CMK on an existing application

The workflow consists of the following steps:

  1. When you update the application to set up the CMK, the following happens:
    1. The running application state, which at this point is encrypted with the AWS owned key, is saved in a snapshot while the application is stopped. This snapshot is encrypted with the default AWS owned key. The running application state storage is volatile and is destroyed when the application is stopped.
    2. The application is redeployed, restoring the snapshot into the running application state.
    3. The running application state storage is now encrypted with the CMK.
  2. New snapshots created from this point on are encrypted using the CMK.
  3. You will probably want to delete all the old snapshots, including the one created automatically by the UpdateApplication that enabled the CMK, because they are all encrypted using the AWS owned key.
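To pick the old snapshots for cleanup, one possible approach is to filter them by creation time. The following Python sketch assumes you already have a list of snapshot descriptions with SnapshotName and SnapshotCreationTimestamp fields, and that you recorded the moment the application was back in RUNNING with the CMK (the cmk_active_since value, a name we introduce here). Both the function and the cutoff approach are assumptions; verify the selection before deleting anything.

```python
from datetime import datetime, timezone


def snapshots_created_before(snapshots: list, cmk_active_since: datetime) -> list:
    """Return the names of snapshots created before the CMK became active.

    Hypothetical helper: it treats every snapshot older than the cutoff as
    encrypted with the previous key, including the automatic one taken
    during the update.
    """
    return [
        snapshot["SnapshotName"]
        for snapshot in snapshots
        if snapshot["SnapshotCreationTimestamp"] < cmk_active_since
    ]


# Sample data for illustration only
snapshots = [
    {"SnapshotName": "nightly-1",
     "SnapshotCreationTimestamp": datetime(2025, 1, 10, tzinfo=timezone.utc)},
    {"SnapshotName": "auto-update",
     "SnapshotCreationTimestamp": datetime(2025, 1, 15, 9, 0, tzinfo=timezone.utc)},
    {"SnapshotName": "nightly-2",
     "SnapshotCreationTimestamp": datetime(2025, 1, 16, tzinfo=timezone.utc)},
]
cmk_active_since = datetime(2025, 1, 15, 9, 5, tzinfo=timezone.utc)

old_snapshots = snapshots_created_before(snapshots, cmk_active_since)
print(old_snapshots)
```

The helper only selects candidates. Keeping deletion as a separate, explicit step makes it easier to review the list first.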

Rotate the encryption key

As with any cryptographic key, it’s a best practice to rotate the key periodically for enhanced security. Managed Service for Apache Flink does not support AWS KMS automatic key rotation, so you have two primary options for rotating your CMK.

Option 1: Create a new CMK and update the application

The first approach involves creating an entirely new KMS key and then updating your application configuration to use the new key. This method provides a clean separation between the old and new encryption keys, making it easier to track which data was encrypted with which key version.

Let’s assume you have a running application using CMK#1 (the current key) and want to rotate to CMK#2 (the new key) for enhanced security:

  • Prerequisites and preparation – Before initiating the key rotation process, you must update the operator’s IAM policy to include permissions for both CMK#1 and CMK#2. This dual-key access supports uninterrupted operation during the transition period. After the application configuration has been successfully updated and verified, you can safely remove all permissions to CMK#1.
  • Application update process – The UpdateApplication operation used to configure CMK#2 automatically triggers an application restart. This restart mechanism makes sure both the application’s running state and any newly created snapshots are encrypted using the new CMK#2, providing immediate security benefits from the updated encryption key.
  • Important security considerations – Existing snapshots, including the automatic snapshot created during the CMK update process, remain encrypted with the original CMK#1. For complete security hygiene and to minimize your cryptographic footprint, consider deleting these older snapshots after verifying that your application is functioning correctly with the new encryption key.

This approach provides a clean separation between old and new encrypted data while maintaining application availability throughout the key rotation process.

Option 2: Rotate the key material of the existing CMK

The second option is to rotate the cryptographic material within your existing KMS key. For a CMK used for Managed Service for Apache Flink, we recommend using on-demand key material rotation.

The benefit of this approach is simplicity: no change is required to the application configuration nor to the operator’s IAM permissions.

Important security considerations

The Managed Service for Apache Flink application uses the new key material only after the next application restart. To make the new key material effective immediately after the rotation, you need to either stop and start the application, using snapshots to preserve the application state, or execute an UpdateApplication, which also forces a stop-and-restart. After the restart, you should consider deleting the old snapshots, including the one taken automatically in the last stop-and-restart.

Switch back to the AWS owned key

At any time, you can decide to switch back to using an AWS owned key. The application state is still encrypted, but using the AWS owned key instead of your CMK.

If you are using the UpdateApplication API or AWS CLI command to switch back to the AWS owned key, you must explicitly pass ApplicationEncryptionConfigurationUpdate, setting the key type to AWS_OWNED_KEY as shown in the following snippet:

{
  "ApplicationEncryptionConfigurationUpdate": {
    "KeyTypeUpdate": "AWS_OWNED_KEY"
  }
}

When you execute UpdateApplication to switch off the CMK, the operator must still have permissions on the CMK. After the application is successfully running using the AWS owned key, you can safely remove any CMK-related permissions from the operator’s IAM policy.

Test the CMK in development environments

In a production environment—or an environment containing sensitive data—you should follow the principle of least privilege and apply the restrictive permissions described so far.

However, if you want to experiment with CMKs in a development setting, such as using the console, strictly following the production process might become cumbersome. In these environments, the roles of key administrator and operator are often filled by the same person.

For testing purposes in development environments, you might want to use a permissive key policy like the following, so you can freely experiment with CMK encryption:

{
  "Version": "2012-10-17",
  "Id": "key-policy-permissive-for-dev-only",
  "Statement": [
    {
      "Sid": "Allow any KMS action to Admin",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<account-id>:role/Admin"
      },
      "Action": "kms:*",
      "Resource": "*"
    },
    {
      "Sid": "Allow any KMS action to Managed Flink",
      "Effect": "Allow",
      "Principal": { 
        "Service": [
          "kinesisanalytics.amazonaws.com",
          "infrastructure.kinesisanalytics.amazonaws.com"
        ]
      },
      "Action": [
        "kms:DescribeKey",
        "kms:Decrypt",
        "kms:GenerateDataKey",
        "kms:GenerateDataKeyWithoutPlaintext",
        "kms:CreateGrant"
      ],
      "Resource": "*"
    }
  ]
}

This policy must never be used in an environment containing sensitive data, and especially not in production.

Common caveats and pitfalls

As discussed earlier, this feature is designed to maximize security and promote best practices such as the principle of least privilege. However, this focus can introduce some corner cases you should be aware of.

The CMK must be enabled for the service to encrypt and decrypt snapshots and running state

With AWS KMS, you can disable a key at any time. If you disable the CMK while the application is running, it might cause unpredictable failures. For example, an application will not be able to restore a snapshot if the CMK used to encrypt that snapshot has been disabled. If you attempt to roll back an UpdateApplication that changed the CMK, and the previous key has since been disabled, you might not be able to restore from an old snapshot. Similarly, you might not be able to restart the application from an older snapshot if the corresponding CMK is disabled.

If you encounter these scenarios, the solution is to reenable the required key and retry the operation.

The operator requires permissions to all keys involved

To perform an action on the application (such as Start, Stop, UpdateApplication, or CreateApplicationSnapshot), the operator must have permissions for all CMKs involved in that operation. AWS owned keys don’t require explicit permission.

Some operations implicitly involve two CMKs—for example, when switching from one CMK to another, or when switching from a CMK to an AWS owned key by disabling the CMK. In these cases, the operator must have permissions for both keys for the operation to succeed.

The same rule applies when rolling back an UpdateApplication action that involved multiple CMKs.
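As a quick pre-flight check, you can work out which CMKs an encryption change involves before running it. The following Python sketch is a simplified model with a hypothetical function name; it represents the AWS owned key as None because that key needs no explicit permission.

```python
from typing import Optional


def cmks_requiring_permissions(current_key: Optional[str],
                               target_key: Optional[str]) -> set:
    """Return the CMKs the operator needs permissions on for a key change.

    None stands for the AWS owned key, which requires no explicit permission.
    """
    return {key for key in (current_key, target_key) if key is not None}


# Switching from one CMK to another involves both keys
print(cmks_requiring_permissions("CMK#1", "CMK#2"))
# Switching back to the AWS owned key still involves the current CMK
print(cmks_requiring_permissions("CMK#1", None))
# Enabling a CMK for the first time involves only the new key
print(cmks_requiring_permissions(None, "CMK#1"))
```

The same function applies to a rollback: swap the arguments, and the set of keys involved is unchanged.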

A new encryption key takes effect only after restart

A new encryption key is only used after the application is restarted. This is important when you rotate the key material for a CMK. Rotating the key material in AWS KMS doesn’t require updating the Managed Flink application’s configuration. However, you must restart the application as a separate step after rotating the key. If you don’t restart the application, it will continue to use the old encryption key for its running state and snapshots until the next restart.

For this reason, it is recommended not to enable automatic key rotation for the CMK. When automatic rotation is enabled, AWS KMS might rotate the key material at any time, but your application will not start using the new key until it is next restarted.

CMKs are only supported with Flink runtime 1.20 or later

CMKs are only supported when you are using the Flink runtime 1.20 or later. If your application is currently using an older runtime, you should upgrade to Flink 1.20 first. Managed Service for Apache Flink makes it straightforward to upgrade your existing application using the in-place version upgrade.

Conclusion

Managed Service for Apache Flink provides robust security by enabling encryption by default, protecting both the running state and persistently saved state of your applications. For organizations that require full control over their encryption keys (often due to regulatory or internal policy needs), the ability to use a CMK integrated with AWS KMS offers a new level of assurance.

By using CMKs, you can tailor encryption controls to your specific compliance requirements. However, this flexibility comes with the need for careful planning: the CMK feature is intentionally designed to enforce the principle of least privilege and strong role separation, which can introduce complexity around permissions and operational processes.

In this post, we reviewed the key steps for enabling CMKs on existing applications, creating new applications with a CMK, and managing key rotation. Each of these processes gives you greater control over your data security but also requires attention to access management and operational best practices.

To get started with CMKs and for more comprehensive guidance, refer to Key management in Amazon Managed Service for Apache Flink.


About the authors

Lorenzo Nicora


Lorenzo works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open-source technologies extensively and contributed to several projects, including Apache Flink, and is the maintainer of the Flink Prometheus connector.

Sofia Zilberman


Sofia works as a Senior Streaming Solutions Architect at AWS, helping customers design and optimize real-time data pipelines using open-source technologies like Apache Flink, Kafka, and Apache Iceberg. With experience in both streaming and batch data processing, she focuses on making data workflows efficient, observable, and high-performing.

Deep dive into the Amazon Managed Service for Apache Flink application lifecycle – Part 2

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/part-2-deep-dive-into-the-amazon-managed-service-for-apache-fink-application-lifecycle/

In Part 1 of this series, we discussed fundamental operations to control the lifecycle of your Amazon Managed Service for Apache Flink application. If you are using higher-level tools such as AWS CloudFormation or Terraform, the tool will execute these operations for you. However, understanding the fundamental operations and what the service automatically does can provide some level of Mechanical Sympathy to confidently implement a more robust automation.

In the first part of this series, we focused on the happy paths. In an ideal world, failures don’t happen, and every change you deploy works perfectly. However, the real world is less predictable. Quoting Werner Vogels, Amazon’s CTO, “Everything fails, all the time.”

In this post, we explore failure scenarios that can happen during normal operations or when you deploy a change or scale the application, and how to monitor operations to detect and recover when something goes wrong.

The less happy path

A robust automation must be designed to handle failure scenarios, in particular during operations. To do that, we need to understand how Apache Flink can deviate from the happy path. Due to the nature of Flink as a stateful stream processing engine, detecting and resolving failure scenarios requires different techniques compared to other long-running applications, such as microservices or short-lived serverless functions (such as AWS Lambda).

Flink’s behavior on runtime errors: The fail-and-restart loop

When a Flink job encounters an unexpected error at runtime (an unhandled exception), the normal behavior is to fail, stop the processing, and restart from the latest checkpoint. Checkpoints allow Flink to support data consistency and no data loss in case of failure. Also, because Flink is designed for stream processing applications, which run continuously, if the error happens again, the default behavior is to keep restarting, hoping the problem is transient and the application will eventually recover the normal processing.

In some cases, however, the problem is not transient. For example, you might deploy a code change that contains a bug, causing the job to fail as soon as it starts processing data, or the expected schema might not match the records in the source, causing deserialization or processing errors. The same scenario might also happen if you mistakenly change a configuration in a way that prevents a connector from reaching the external system. In these cases, the job is stuck in a fail-and-restart loop indefinitely, or until you actively force-stop it.

When this happens, the Managed Service for Apache Flink application status might be RUNNING, but the underlying Flink job is actually failing and restarting. The AWS Management Console gives you a hint, indicating that the application might need attention (see the following screenshot).

Application needs attention

In the following sections, we learn how to monitor the application and job status, to automatically react to this situation.
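To make the idea of a fail-and-restart loop concrete, the following Python sketch flags a loop from a series of samples of a cumulative job restart counter. This is only an illustration under our own assumptions: the function name, the source of the samples, and the threshold of three consecutive increases are all hypothetical and need tuning for a real application.

```python
def looks_like_restart_loop(restart_counts: list, min_increases: int = 3) -> bool:
    """Return True if the restart counter grew in `min_increases` consecutive samples.

    `restart_counts` holds periodic samples of a cumulative restart counter,
    oldest first. The threshold is an assumption, not a service default.
    """
    consecutive = 0
    for previous, current in zip(restart_counts, restart_counts[1:]):
        if current > previous:
            consecutive += 1
            if consecutive >= min_increases:
                return True
        else:
            # The counter was stable for one interval: the job stayed up
            consecutive = 0
    return False


print(looks_like_restart_loop([0, 0, 0, 0]))     # stable job
print(looks_like_restart_loop([0, 1, 1, 2]))     # isolated restarts
print(looks_like_restart_loop([5, 5, 6, 7, 8]))  # restarts in every recent interval
```

Requiring consecutive increases, rather than any increase, avoids reacting to a single transient failure that Flink recovers from on its own.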

When starting or updating the application goes wrong

To understand the failure modes, let’s review what happens automatically when you start the application, or when the application restarts after you issue an UpdateApplication command, as we explored in Part 1 of this series. The following diagram illustrates what happens when an application starts.

Application start process

The workflow consists of the following steps:

  1. Managed Service for Apache Flink provisions a cluster dedicated to your application.
  2. The code and configuration are submitted to the Job Manager node.
  3. The code in the main() method of your application runs, defining the dataflow of your application.
  4. Flink deploys to the Task Manager nodes the subtasks that make up your job.
  5. The job and application status change to RUNNING. However, the subtasks only start initializing at this point.
  6. Subtasks restore their state, if applicable, and initialize any resources. For example, a Kafka connector’s subtask initializes the Kafka client and subscribes to the topic.
  7. When all subtasks are successfully initialized, they change to RUNNING status and the job starts processing data.

To new Flink users, it can be confusing that a RUNNING status doesn’t necessarily imply the job is healthy and processing data.

When something goes wrong during the process of starting (or restarting) the application, depending on the phase when the problem arises, you might observe two different types of failure modes:

  • (a) A problem prevents the application code from being deployed – Your application might encounter this failure scenario if the deployment fails as soon as the code and configuration are passed to the Job Manager (step 2 of the process), for example if the application code package is malformed. A typical error is when the JAR is missing a mainClass or if mainClass points to a class that doesn’t exist. This failure mode might also happen if the code of your main() method throws an unhandled exception (step 3). In these cases, the application fails to change to RUNNING, and reverts to READY after the attempt.
  • (b) The application is started, but the job is stuck in a fail-and-restart loop – A problem might occur later in the process, after the application status has changed to RUNNING. For example, after the Flink job has been deployed to the cluster (step 4 of the process), a component might fail to initialize (step 6). This might happen when a connector is misconfigured, or a problem prevents it from connecting to the external system. For example, a Kafka connector might fail to connect to the Kafka cluster because of the connector’s misconfiguration or networking issues. Another possible scenario is when the Flink job successfully initializes, but it throws an exception as soon as it starts processing data (step 7). When this happens, Flink reacts to a runtime error and might get stuck in a fail-and-restart loop.

The following diagram illustrates the sequence of application status, including the two failure scenarios just described.

Application statuses, with failure scenarios

Troubleshooting

We have examined what can go wrong during operations, in particular when you update a RUNNING application or restart an application after changing its configuration. In this section, we explore how we can act on these failure scenarios.

Roll back a change

When you deploy a change and realize something is not quite right, you normally want to roll back the change and put the application back in working order, until you investigate and fix the problem. Managed Service for Apache Flink provides a graceful way to revert (roll back) a change, also restarting the processing from the point where it was stopped before the faulty change was applied, providing consistency and no data loss.

In Managed Service for Apache Flink, there are two types of rollbacks:

  • Automatic – During an automatic rollback (also called system rollback), if enabled, the service automatically detects when the application fails to restart after a change, or when the job starts but immediately falls into a fail-and-restart loop. In these situations, the rollback process automatically restores the application configuration version before the last change was applied and restarts the application from the snapshot taken when the change was deployed. See Improve the resilience of Amazon Managed Service for Apache Flink application with system-rollback feature for more details. This feature is disabled by default. You can enable it as part of the application configuration.
  • Manual – A manual rollback API operation is like a system rollback, but it’s initiated by the user. If the application is running but you observe something not behaving as expected after applying a change, you can trigger the rollback operation using the RollbackApplication API action or the console. Manual rollback is possible when the application is RUNNING or UPDATING.

Both rollbacks work similarly, restoring the configuration version before the change and restarting with the snapshot taken before the change. This prevents data loss and brings you back to a version of the application that was working. Also, this uses the code package that was saved at the time you created the previous configuration version (the one you are rolling back to), so there is no inconsistency between code, configuration, and snapshot, even if in the meantime you have replaced or deleted the code package from the Amazon Simple Storage Service (Amazon S3) bucket.

Implicit rollback: Update with an older configuration

A third way to roll back a change is to simply update the configuration, bringing it back to what it was before the last change. This creates a new configuration version, and requires the correct version of the code package to be available in the S3 bucket when you issue the UpdateApplication command.

Why is there a third option when the service provides system rollback and the managed RollbackApplication action? Because most high-level infrastructure-as-code (IaC) frameworks such as Terraform use this strategy, explicitly overwriting the configuration. It is important to understand this possibility even though you will probably use the managed rollback if you implement your automation based on the low-level actions.

The following are two important caveats to consider for this implicit rollback:

  • You will normally want to restart the application from the snapshot that was taken before the faulty change was deployed. If the application is currently RUNNING and healthy, this is not the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT), but rather the previous one. You must set the restart from RESTORE_FROM_CUSTOM_SNAPSHOT and select the correct snapshot.
  • UpdateApplication only works if the application is RUNNING and healthy, and the job can be gracefully stopped with a snapshot. Conversely, if the application is stuck in a fail-and-restart loop, you must force-stop it first, change the configuration while the application is READY, and later start the application from the snapshot that was taken before the faulty change was deployed.
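As a minimal sketch of the first caveat, assuming you call the API through the AWS SDK for Python (boto3), the following helper builds the arguments for an UpdateApplication request that points back to the previous code package and restarts from an explicitly selected snapshot. The function name and all argument values are hypothetical; the request keys follow our understanding of the kinesisanalyticsv2 UpdateApplication request and should be checked against the API reference.

```python
def build_implicit_rollback_request(app_name, current_version_id, bucket_arn,
                                    previous_file_key, snapshot_name):
    """Build UpdateApplication arguments for an implicit rollback.

    Hypothetical helper: it restores the previous code package location and
    asks the service to restart from a specific snapshot, not the latest one.
    """
    return {
        "ApplicationName": app_name,
        # UpdateApplication requires the version you are updating from
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "ApplicationCodeConfigurationUpdate": {
                "CodeContentUpdate": {
                    "S3ContentLocationUpdate": {
                        "BucketARNUpdate": bucket_arn,
                        # The previous package must still exist in the bucket
                        "FileKeyUpdate": previous_file_key,
                    }
                }
            }
        },
        "RunConfigurationUpdate": {
            "ApplicationRestoreConfiguration": {
                "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
                "SnapshotName": snapshot_name,
            }
        },
    }
```

With a boto3 client created as boto3.client("kinesisanalyticsv2"), you would pass the result as client.update_application(**request). Keeping the request construction separate from the call makes it easy to review which snapshot and code package the rollback will use.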

Force-stop the application

In normal scenarios, you stop the application gracefully, with the automatic snapshot creation. However, this might not be possible in some scenarios, such as if the Flink job is stuck in a fail-and-restart loop. This might happen, for example, if an external system the job uses stops working, or because the AWS Identity and Access Management (IAM) configuration was erroneously modified, removing permissions required by the job.

When the Flink job gets stuck in a fail-and-restart loop after a faulty change, your first option should be using RollbackApplication, which automatically restores the previous configuration and starts from the correct snapshot. In the rare cases you can’t stop the application gracefully or use RollbackApplication, the last resort is force-stopping the application. Force-stop uses the StopApplication command with Force=true. You can also force-stop the application from the console.

When you force-stop an application, no snapshot is taken (if that were possible, you would have been able to gracefully stop). When you restart the application, you can either skip restoring from a snapshot (SKIP_RESTORE_FROM_SNAPSHOT) or use a snapshot that was previously taken, scheduled using Snapshot Manager, or manually, using the console or CreateApplicationSnapshot API action.
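The recovery options described in this section could be sketched as follows, assuming a boto3 kinesisanalyticsv2 client is passed in as client. The helper names are hypothetical, and the sketch deliberately leaves out waiting for the application to reach READY between the force-stop and the restart.

```python
def request_rollback(client, app_name):
    """First option: ask the service to roll back to the previous version."""
    detail = client.describe_application(ApplicationName=app_name)
    version_id = detail["ApplicationDetail"]["ApplicationVersionId"]
    client.rollback_application(
        ApplicationName=app_name, CurrentApplicationVersionId=version_id)
    return version_id


def force_stop(client, app_name):
    """Last resort: stop without taking a snapshot."""
    client.stop_application(ApplicationName=app_name, Force=True)


def restore_configuration(snapshot_name=None):
    """Choose how to restart after a force-stop."""
    if snapshot_name is None:
        # Start with empty state
        return {"ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"}
    # Start from a snapshot taken earlier (scheduled or manual)
    return {
        "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
        "SnapshotName": snapshot_name,
    }


def start_after_force_stop(client, app_name, snapshot_name=None):
    """Start the application once it is back in READY status."""
    client.start_application(
        ApplicationName=app_name,
        RunConfiguration={
            "ApplicationRestoreConfiguration": restore_configuration(snapshot_name)
        },
    )
```

Making the snapshot name an explicit argument forces the caller to decide between restarting with no state and restarting from a known snapshot, instead of relying on a default.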

We strongly recommend setting up scheduled snapshots for all production applications that you can’t afford to restart with no state.

Monitoring Apache Flink application operations

Effective monitoring of your Apache Flink applications during and after operations is crucial to verify the outcome of the operation and allow lifecycle automation to raise alarms or react, in case something goes wrong.

The main indicators you can use during operations include the fullRestarts metric (available in Amazon CloudWatch) and the application, job, and task status.

Monitoring the outcome of an operation

The simplest way to detect the outcome of an operation, such as StartApplication or UpdateApplication, is to use the ListApplicationOperations API command. This command returns a list of the most recent operations of a specific application, including maintenance events that force an application restart.

For example, to retrieve the status of the most recent operation, you can use the following command:

aws kinesisanalyticsv2 list-application-operations \
    --application-name MyApplication \
    | jq '.ApplicationOperationInfoList | sort_by(.StartTime) | last'

The output will be similar to the following code:

{
  "Operation": "UpdateApplication",
  "OperationId": "12abCDeGghIlM",
  "StartTime": "2025-08-06T09:24:22+01:00",
  "EndTime": "2025-08-06T09:26:56+01:00",
  "OperationStatus": "IN_PROGRESS"
}

OperationStatus will follow the same logic as the application status reported by the console and by DescribeApplication. This means it might not detect a failure during the operator initialization or while the job starts processing data. As we have learned, these failures might put the application in a fail-and-restart loop. To detect these scenarios using your automation, you must use other techniques, which we cover in the rest of this section.
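If your automation is written in Python, the same lookup could be sketched as follows, assuming a boto3 kinesisanalyticsv2 client is passed in as client. The function name is hypothetical, and the sketch only inspects the first page of results.

```python
def latest_operation(client, app_name):
    """Return the most recently started operation, or None if there is none.

    Only the first page of results is considered; a complete implementation
    would follow NextToken when it is present in the response.
    """
    response = client.list_application_operations(ApplicationName=app_name)
    operations = response.get("ApplicationOperationInfoList", [])
    if not operations:
        return None
    # boto3 returns StartTime as a datetime, so it can be compared directly
    return max(operations, key=lambda op: op["StartTime"])
```

Your automation can then inspect the OperationStatus field of the returned entry, keeping in mind the limitation described above: a successful operation does not prove that the job is processing data.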

Detecting the fail-and-restart loop using the fullRestarts metric

The simplest way to detect whether the application is stuck in a fail-and-restart loop is using the fullRestarts metric, available in CloudWatch Metrics. This metric counts the number of restarts of the Flink job after you started the application with a StartApplication command or restarted with UpdateApplication.

In a healthy application, the number of full restarts should ideally be zero. A single full restart might be acceptable during deployment or planned maintenance; multiple restarts normally indicate some issue. We recommend not triggering an alarm on a single restart, or even a couple of consecutive restarts.

The alarm should only be triggered when the application is stuck in a fail-and-restart loop. This implies checking whether several restarts have happened over a relatively short period of time. Deciding the period is not trivial, because the time the Flink job takes to restart from a checkpoint depends on the size of the application state. However, if the state of your application is smaller than several GB per KPU, you can safely assume the application should start in less than a minute.

The goal is creating a CloudWatch alarm that triggers when fullRestarts keeps increasing over a time period sufficient for multiple restarts. For example, assuming your application restarts in less than 1 minute, you can create a CloudWatch alarm that relies on the DIFF math expression of the fullRestarts metric. The following screenshot shows an example of the alarm details.

CloudWatch Alarm on fullRestarts

This example is a conservative alarm, only triggering if the application keeps restarting for over 5 minutes. This means you detect the problem after at least 5 minutes. You might consider reducing the time to detect the failure earlier. However, be careful not to trigger an alarm after just one or two restarts. Occasional restarts might happen, for example during normal maintenance (patching) that is managed by the service, or for a transient error of an external system. Flink is designed to recover from these conditions with minimal downtime and no data loss.
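To make the alarm logic concrete, the following sketch reproduces the idea in plain Python: given fullRestarts values sampled once per period (for example, once per minute), it reports a loop only when the counter increased in several consecutive periods. The function name and the default threshold are assumptions for illustration, not the exact semantics of a CloudWatch alarm.

```python
def is_stuck_in_restart_loop(full_restarts_samples, consecutive_periods=5):
    """Return True if fullRestarts kept increasing for N consecutive periods.

    full_restarts_samples: counter values, one per period, oldest first.
    """
    # Equivalent of a DIFF expression: change between adjacent samples
    diffs = [later - earlier for earlier, later in
             zip(full_restarts_samples, full_restarts_samples[1:])]
    streak = 0
    for diff in diffs:
        # A period without a new restart breaks the streak
        streak = streak + 1 if diff > 0 else 0
        if streak >= consecutive_periods:
            return True
    return False
```

For example, a single restart followed by stable samples, such as [0, 0, 1, 1, 1, 1, 1, 1], does not count as a loop, whereas [0, 1, 2, 3, 4, 5] does with the default threshold of five periods.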

Detecting whether the job is up and running: Monitoring application, job, and task status

We have discussed how you have different statuses: the status of the application, job, and subtask. In Managed Service for Apache Flink, the application and job status change to RUNNING when the subtasks are successfully deployed on the cluster. However, the job is not really running and processing data until all the subtasks are RUNNING.

Observing the application status during operations

The application status is visible on the console, as shown in the following screenshot.

Screenshot: Application status

In your automation, you can poll the DescribeApplication API action to observe the application status. The following command shows how to use the AWS Command Line Interface (AWS CLI) and jq command to extract the status string of an application:

aws kinesisanalyticsv2 describe-application \
    --application-name <your-application-name> \
    | jq -r '.ApplicationDetail.ApplicationStatus'
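A polling loop around DescribeApplication could look like the following sketch, assuming a boto3 kinesisanalyticsv2 client is passed in as client. The function name, the default timeout, and the polling interval are hypothetical values you would tune for your application.

```python
import time


def wait_for_application_status(client, app_name, target="RUNNING",
                                timeout_seconds=600, poll_seconds=10,
                                clock=time.monotonic, sleep=time.sleep):
    """Poll DescribeApplication until the application reaches target status.

    Raises TimeoutError if the status is not reached within timeout_seconds.
    clock and sleep are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        detail = client.describe_application(ApplicationName=app_name)
        status = detail["ApplicationDetail"]["ApplicationStatus"]
        if status == target:
            return status
        if clock() >= deadline:
            raise TimeoutError(
                f"{app_name} is {status}, not {target}, after {timeout_seconds}s")
        sleep(poll_seconds)
```

Remember that an application status of RUNNING only tells you the job was deployed; it doesn’t confirm that all subtasks are running.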

Observing job and subtask status

Managed Service for Apache Flink gives you access to the Flink Dashboard, which provides useful information for troubleshooting, including the status of all subtasks. The following screenshot, for example, shows a healthy job where all subtasks are RUNNING.

Job and Task status

In the following screenshot, we can see a job where subtasks are failing and restarting.

Job status: failing

In your automation, when you start the application or deploy a change, you want to be sure the job is eventually up and running and processing data. This happens when all the subtasks are RUNNING. Note that waiting for the job status to become RUNNING after an operation is not completely safe. A subtask might still fail and cause the job to restart after it was reported as RUNNING.

After you execute a lifecycle operation, your automation can poll the subtasks status waiting for one of two events:

  • All subtasks report RUNNING – This indicates the operation was successful and your Flink job is up and running.
  • Any subtask reports FAILING or CANCELED – This indicates something went wrong, and the application is likely stuck in a fail-and-restart loop. You need to intervene, for example, force-stopping the application and then rolling back the change.

If you are restarting from a snapshot and the state of your application is quite big, you might observe that subtasks report INITIALIZING status for longer. During the initialization, Flink restores the state of the operator before changing to RUNNING.

The Flink REST API exposes the state of the subtasks, and can be used in your automation. In Managed Service for Apache Flink, this requires three steps:

  1. Generate a pre-signed URL to access the Flink REST API using the CreateApplicationPresignedUrl API action.
  2. Make a GET request to the /jobs endpoint of the Flink REST API to retrieve the job ID.
  3. Make a GET request to the /jobs/<job-id> endpoint to retrieve the status of the subtasks.
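Once your automation has the JSON returned by step 3, it needs to decide between the two events described earlier. The following sketch assumes the response contains a vertices list whose entries carry a tasks map of subtask counts per status, which is our understanding of the Flink REST API job details payload; verify the field names against your Flink version. The function name and the returned labels are hypothetical.

```python
# Subtask statuses treated as a failure signal (an assumption; adjust as needed)
FAILURE_STATUSES = {"FAILED", "FAILING", "CANCELED"}


def classify_job(job_details):
    """Classify a parsed /jobs/<job-id> response.

    Returns "RUNNING" when every subtask is RUNNING, "FAILED" when any
    subtask is in a failure status, and "PENDING" otherwise.
    """
    vertices = job_details.get("vertices", [])
    if not vertices:
        return "PENDING"
    all_running = True
    for vertex in vertices:
        counts = vertex.get("tasks", {})
        if any(counts.get(status, 0) > 0 for status in FAILURE_STATUSES):
            return "FAILED"
        total = sum(counts.values())
        if total == 0 or counts.get("RUNNING", 0) != total:
            # Some subtasks are still being deployed or initialized
            all_running = False
    return "RUNNING" if all_running else "PENDING"
```

A "PENDING" result simply means the automation should keep polling, for example while subtasks are still INITIALIZING after restoring a large state.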

The following GitHub repository provides a shell script to retrieve the status of the tasks of a given Managed Service for Apache Flink application.

Monitoring subtasks failure while the job is running

The approach of polling the Flink REST API can be used in your automation, immediately after an operation, to observe whether the operation was eventually successful.

We strongly recommend against continuously polling the Flink REST API while the job is running to detect failures. This operation is resource intensive, and might degrade performance or cause errors.

To monitor for suspicious subtask status changes during normal operations, we recommend using CloudWatch Logs instead. The following CloudWatch Logs Insights query extracts all subtask state transitions:

fields @timestamp, message
| parse message /^(?<task>.+) switched from (?<fromStatus>[A-Z]+) to (?<toStatus>[A-Z]+)\./
| filter ispresent(task) and ispresent(fromStatus) and ispresent(toStatus)
| display @timestamp, task, fromStatus, toStatus
| limit 10000
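If you post-process log messages in your own code, the same pattern can be expressed as a Python regular expression. This is a sketch: the function name is hypothetical and the sample message in the usage note is invented to match the pattern, not copied from a real log.

```python
import re

# Same pattern as the Logs Insights query, using Python named groups
TRANSITION = re.compile(
    r"^(?P<task>.+) switched from (?P<fromStatus>[A-Z]+) to (?P<toStatus>[A-Z]+)\."
)


def parse_transition(message):
    """Return task, fromStatus, and toStatus as a dict, or None if no match."""
    match = TRANSITION.match(message)
    return match.groupdict() if match else None
```

For example, parse_transition("Source: Kafka (1/2) switched from INITIALIZING to RUNNING.") returns a dictionary with task "Source: Kafka (1/2)", fromStatus "INITIALIZING", and toStatus "RUNNING", while unrelated messages return None.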

How Managed Service for Apache Flink minimizes processing downtime

We have seen how Flink is designed for strong consistency. To guarantee exactly-once state consistency, Flink temporarily stops the processing to deploy any changes, including scaling. This downtime is required for Flink to take a consistent copy of the application state and save it in a savepoint. After the change is deployed, the job is restarted from the savepoint, and there is no data loss. In Managed Service for Apache Flink, updates are fully managed. When snapshots are enabled, UpdateApplication automatically stops the job and uses snapshots (based on Flink’s savepoints) to retain the state.

Flink guarantees no data loss. However, your business requirements or Service Level Objectives (SLOs) might also impose a maximum delay for the data received by downstream systems, or end-to-end latency. This delay is affected by the processing downtime, or the time the job doesn’t process data to allow Flink to deploy the change.

With Flink, some processing downtime is unavoidable. However, Managed Service for Apache Flink is designed to minimize the processing downtime when you deploy a change.

We have seen how the service runs your application in a dedicated cluster, for complete isolation. When you issue UpdateApplication on a RUNNING application, the service prepares a new cluster with the required amount of resources. This operation might take some time. However, this doesn’t affect the processing downtime, because the service keeps the job running and processing data on the original cluster until the last possible moment, when the new cluster is ready. At this point, the service stops your job with a savepoint and restarts it on the new cluster.

During this operation, you are only charged for the number of KPUs of a single cluster.

The following diagram illustrates the difference between the duration of the update operation, or the time the application status is UPDATING, and the processing downtime, observable from the job status, visible in the Flink Dashboard.

Downtime

You can observe this process, keeping both the application console and Flink Dashboard open, when you update the configuration of a running application, even with no changes. The Flink Dashboard will become temporarily unavailable when the service switches to the new cluster. Additionally, you can’t use the script we provided to check the job status for this purpose. Even though the cluster keeps serving the Flink Dashboard until it’s torn down, the CreateApplicationPresignedUrl action doesn’t work while the application is UPDATING.

The processing downtime (the time the job is not running on either cluster) depends on the time the job takes to stop with a savepoint (snapshot) and restore the state in the new cluster. This time largely depends on the size of the application state. Data skew might also affect the savepoint time due to the barrier alignment mechanism. For a deep dive into Flink’s barrier alignment mechanism, refer to Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints, keeping in mind that savepoints are always aligned.

In your automation, you normally want to wait until the job is back up and running and processing data, and you normally want to set a timeout. If both the application and job don’t return to RUNNING within this timeout, something probably went wrong and you might want to raise an alarm or force a rollback. This timeout should consider the entire update operation duration.

Conclusion

In this post, we discussed possible failure scenarios when you deploy a change or scale your application. We showed how Managed Service for Apache Flink rollback functionalities can seamlessly bring you back to a safe place after a change went wrong. We also explored how you can automate monitoring operations to observe application, job, and subtask status, and how to use the fullRestarts metric to detect when the job is in a fail-and-restart loop.

For more information, see Run a Managed Service for Apache Flink application, Implement fault tolerance in Managed Service for Apache Flink, and Manage application backups using Snapshots.


About the authors

Lorenzo Nicora

Lorenzo works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open-source technologies extensively and contributed to several projects, including Apache Flink, and is the maintainer of the Flink Prometheus connector.

Felix John

Felix is a Global Solutions Architect and data streaming expert at AWS, based in Germany. He focuses on supporting global automotive & manufacturing customers on their cloud journey. Outside of his professional life, Felix enjoys playing Floorball and hiking in the mountains.

Deep dive into the Amazon Managed Service for Apache Flink application lifecycle – Part 1

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/part-1-deep-dive-into-the-amazon-managed-service-for-apache-fink-application-lifecycle/

Apache Flink is an open source framework for stream and batch processing applications. It excels in handling real-time analytics, event-driven applications, and complex data processing with low latency and high throughput. Flink is designed for stateful computation with exactly-once consistency guarantees for the application state.

Amazon Managed Service for Apache Flink is a fully managed stream processing service that you can use to run Apache Flink jobs at scale without worrying about managing clusters and provisioning resources. You can focus on implementing your application using your integrated development environment (IDE) of choice, and build and package the application using standard build and continuous integration and delivery (CI/CD) tools.

With Managed Service for Apache Flink, you can control the application lifecycle through simple AWS API actions. You can use the API to start and stop the application, and to apply any changes to the code, runtime configuration, and scale. The service takes care of managing the underlying Flink cluster, giving you a serverless experience. You can implement automation such as CI/CD pipelines with tools that can interact with the AWS API or AWS Command Line Interface (AWS CLI).

You can control the application using the AWS Management Console, AWS CLI, AWS SDK, and tools using the AWS API, such as AWS CloudFormation or Terraform. The service is not prescriptive on the automation tool you use to deploy and orchestrate the application.

Paraphrasing Jackie Stewart, the famous racing driver, you don’t need to understand how to operate a Flink cluster to use Managed Service for Apache Flink, but some Mechanical Sympathy will help you implement a robust and reliable automation.

In this two-part series, we explore what happens during an application’s lifecycle. This post covers core concepts and the application workflow during normal operations. In Part 2, we look at potential failures, how to detect them through monitoring, and ways to quickly resolve issues when they occur.

Definitions

Before examining the application lifecycle steps, we need to clarify the usage of certain terms in the context of Managed Service for Apache Flink:

  • Application – The main resource you create, control, and run in Managed Service for Apache Flink is an application.
  • Application code package – For each Managed Service for Apache Flink application, you implement the application code package (application artifact) of the Flink application code you want to run. This code is compiled and packaged along with dependencies into a JAR or a ZIP file, which you upload to an Amazon Simple Storage Service (Amazon S3) bucket.
  • Configuration – Each application has a configuration that contains the information to run it. The configuration points to the application code package in the S3 bucket and defines the parallelism, which will also determine the application resources, in terms of KPUs. It also defines security, networking, and runtime properties, which are passed to your application code at runtime.
  • Job – When you start the application, Managed Service for Apache Flink creates a dedicated cluster for you and runs your application code as a Flink job.

The following diagram shows the relationship between these concepts.

Concepts

There are two additional important concepts: checkpoints and savepoints, the mechanisms Flink uses to guarantee state consistency across failures and operations. In Managed Service for Apache Flink, both checkpoints and savepoints are fully managed.

  • Checkpoints – These are controlled by the application configuration and enabled by default with a period of 1 minute. In Managed Service for Apache Flink, checkpoints are used when a job automatically restarts after a runtime failure. They are not durable and are deleted when the application is stopped or updated and when the application automatically scales.
  • Savepoints – These are called snapshots in Managed Service for Apache Flink, and are used to persist the application state when the application is deliberately restarted by the user, due to an update or an automatic scaling event. Snapshots can be triggered by the user. Snapshots (if enabled) are also automatically used to save and restore the application state when the application is stopped and restarted, for example to deploy a change or automatically scale. Automatic use of snapshots is enabled in the application configuration (enabled by default when you create an application using the console).

Lifecycle of an application in Managed Service for Apache Flink

Starting with the happy path, a typical lifecycle of a Managed Service for Apache Flink application comprises the following steps:

  1. Create and configure a new application.
  2. Start the application.
  3. Deploy a change (update the runtime configuration, update the application code, change the parallelism to scale up or down).
  4. Stop the application.

Starting, stopping, and updating the application use snapshots (if enabled) to retain application state consistency during operations. We recommend enabling snapshots on every production and staging application, to support the persistence of the application state across operations.

In Managed Service for Apache Flink, the application lifecycle is controlled through the console, API actions in the kinesisanalyticsv2 API, or equivalent actions in the AWS CLI and SDK. On top of these fundamental operations, you can build your own automation using different tools, directly using low-level actions or using higher level infrastructure-as-code (IaC) tooling such as AWS CloudFormation or Terraform.

In this post, we refer to the low-level API actions used at each step. Any higher-level IaC tooling will use a combination of these operations. Understanding these operations is fundamental to designing a robust automation.

The following diagram summarizes the application lifecycle, showing typical operations and application statuses.

Application statuses

The status of your application, READY, STARTING, RUNNING, UPDATING, and so on, can be observed on the console and using the DescribeApplication API action.
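When your automation records the statuses it observes while polling, it can check them against the transitions it expects. The following sketch encodes only the happy-path transitions named in this post; the service reports additional statuses that are not modeled here, and the names used are hypothetical.

```python
# Happy-path transitions described in this post (deliberately incomplete)
EXPECTED_TRANSITIONS = {
    "READY": {"STARTING"},
    "STARTING": {"RUNNING"},
    "RUNNING": {"UPDATING", "STOPPING"},
    "UPDATING": {"RUNNING"},
    "STOPPING": {"READY"},
}


def first_unexpected_transition(observed):
    """Return the first (previous, current) pair that is not expected.

    observed: statuses in the order they were polled. Repeated statuses
    from consecutive polls are ignored. Returns None if all are expected.
    """
    for previous, current in zip(observed, observed[1:]):
        if current == previous:
            continue
        if current not in EXPECTED_TRANSITIONS.get(previous, set()):
            return (previous, current)
    return None
```

An unexpected pair, such as an application going from STARTING back to READY, is a signal for the automation to stop and investigate instead of proceeding with the next step.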

In the following sections, we analyze each lifecycle operation in more detail.

Create and configure the application

The first step is creating a new Managed Service for Apache Flink application, including defining the application configuration. You can do this in a single step using the CreateApplication action, or by creating the basic application configuration and then updating the configuration before starting it using UpdateApplication. The latter approach is what you do when you create an application from the console.

In this phase, the developer packages the application they have implemented in a JAR file (for Java) or ZIP file (for Python) and uploads it to an S3 bucket the user has previously created. The bucket name and the path to the application code package are part of the configuration you define.

When UpdateApplication or CreateApplication is invoked, Managed Service for Apache Flink takes a copy of the application code package (JAR or ZIP file) referred to by the configuration. The configuration is rejected if the file the configuration points to doesn’t exist.

The following diagram illustrates this workflow.

Create application
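As an illustration of the single-step approach, the following sketch builds the arguments for a CreateApplication request with boto3, with snapshots enabled. The function name and every value in the usage note are hypothetical, and the request keys reflect our understanding of the kinesisanalyticsv2 API, so check them against the API reference before use.

```python
def build_create_application_request(app_name, runtime, role_arn,
                                     bucket_arn, file_key, parallelism=1):
    """Build CreateApplication arguments for a Flink application.

    Hypothetical helper: runtime is a service runtime identifier supplied
    by the caller, and file_key is the path of the JAR or ZIP in the bucket.
    """
    return {
        "ApplicationName": app_name,
        "RuntimeEnvironment": runtime,
        "ServiceExecutionRole": role_arn,
        "ApplicationConfiguration": {
            "ApplicationCodeConfiguration": {
                "CodeContent": {
                    "S3ContentLocation": {
                        "BucketARN": bucket_arn,
                        "FileKey": file_key,
                    }
                },
                "CodeContentType": "ZIPFILE",
            },
            # Recommended so state is retained across stop, start, and update
            "ApplicationSnapshotConfiguration": {"SnapshotsEnabled": True},
            "FlinkApplicationConfiguration": {
                "ParallelismConfiguration": {
                    "ConfigurationType": "CUSTOM",
                    "Parallelism": parallelism,
                }
            },
        },
    }
```

With a boto3 client, you would call client.create_application(**request), for example with a runtime such as "FLINK-1_20" (an assumed value; use a runtime the service currently supports).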

Simply updating the application code package in the S3 bucket doesn’t trigger an update. You need to run UpdateApplication to make the new file visible to the service and trigger the update, even when you overwrite the code package with the same name.

Start the application

Managed Service for Apache Flink provisions resources when the application is actually running, and you only pay for the resources of running applications. You explicitly control when to start the application by issuing a StartApplication command.

Managed Service for Apache Flink is designed for high availability and runs your application in a dedicated Flink cluster. When you start the application, Managed Service for Apache Flink deploys a dedicated cluster and deploys and runs the Flink job based on the configuration you defined.

When you start the application, the status of the application moves from READY, to STARTING, and then RUNNING.

The following diagram illustrates this workflow.

Start application

Managed Service for Apache Flink supports both streaming mode, the default for Apache Flink, and batch mode:

  • Streaming mode – In streaming mode, after an application is successfully started and goes into RUNNING status, it keeps running until you stop it explicitly. From this point on, the behavior on failure is automatically restarting the job from the latest checkpoint, so there is no data loss. We discuss more details about this failure scenario later in this post.
  • Batch mode – A Flink application running in batch mode behaves differently. After you start it, it goes into RUNNING status, and the job continues running until it completes the processing. At that point the job will gracefully stop, and the Managed Service for Apache Flink application goes back to READY status.

This post focuses on streaming applications only.

Update the application

In Managed Service for Apache Flink, you handle the following changes by updating the application configuration, using the console or the UpdateApplication API action:

  • Application code changes, replacing the package (JAR or ZIP file) with one containing a new version
  • Runtime properties changes
  • Scaling, which implies changing parallelism and resources (KPU) changes
  • Operational parameter changes, such as checkpoint, logging level, and monitoring setup
  • Networking configuration changes

When you modify the application configuration, Managed Service for Apache Flink creates a new configuration version, identified by a version ID number, automatically incremented at every change.

Update the code package

We mentioned how the service takes a copy of the code package (JAR or ZIP file) when you update the application configuration. The copy is associated with the new application configuration version that has been created. The service uses its own copy of the code package to start the application. You can safely replace or delete the code package after you have updated the configuration. The new package is not taken into account until you update the application configuration again.

Update a READY (not running) application

If you update an application in READY status, nothing special happens beyond creating the new configuration version that will be used the next time you start the application. However, in production, you will normally update the configuration of an application in RUNNING status to apply a change. Managed Service for Apache Flink automatically handles the operations required to update the application with no data loss.

Update a RUNNING application

To understand what happens when you update a running application, you need to remember that Flink is designed for strong consistency and exactly-once state consistency. To maintain these features when a change is applied, Flink must stop the data processing, take a copy of the application state, restart the job with the changes, and restore the state, before processing can restart.

This is a standard Flink behavior, and applies to any changes, whether it’s code changes, runtime configuration changes, or new parallelism to scale up and down. Managed Service for Apache Flink automatically orchestrates this process for you. If snapshots are enabled, the service will take a snapshot before stopping the processing and restart from the snapshot when the change is deployed. This way, the change can be deployed with zero data loss.

If snapshots are disabled, the service restarts the job with the change, but the state will be empty, like the first time you started the application. This might cause data loss. You normally don’t want this to happen, particularly in production applications.

Let’s explore a practical example, illustrated by the following diagram. For instance, when you want to deploy a code change, the following steps typically happen (in this example, we assume that snapshots are enabled, which they should be in a production application):

  1. Make changes to the application code.
  2. The build process creates the application package (JAR or ZIP file), either manually or using CI/CD automation.
  3. Upload the new application package to an S3 bucket.
  4. Update the application configuration pointing to the new application package.
  5. As soon as you successfully update the configuration, Managed Service for Apache Flink starts the operation for updating the application. The application status changes to UPDATING. The Flink job is stopped, taking a snapshot of the application state.
  6. After the changes have been applied, the application is restarted using the new configuration, which in this case includes the new application code, and the job restores the state from the snapshot. When the process is complete, the application status goes back to RUNNING.

Update application

The process is similar for changes to the application configuration. For example, you can scale the application by changing the parallelism in the application configuration. This causes the application to be redeployed with the new parallelism and an amount of resources (CPU, memory, local storage) based on the new number of KPU.
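As a rough sketch, the two kinds of update described above could map to UpdateApplication requests like the following. The helper names, the application name, and the S3 values are hypothetical. The dictionary keys follow the UpdateApplication request shape as exposed by the AWS SDK for Python (boto3), and the code only builds the request without calling AWS.

```python
def code_update_request(app_name, current_version_id, bucket_arn, file_key):
    """Build an UpdateApplication request pointing to a new application package."""
    return {
        "ApplicationName": app_name,
        # Must match the application's current version ID
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "ApplicationCodeConfigurationUpdate": {
                "CodeContentUpdate": {
                    "S3ContentLocationUpdate": {
                        "BucketARNUpdate": bucket_arn,
                        "FileKeyUpdate": file_key,
                    }
                }
            }
        },
    }


def parallelism_update_request(app_name, current_version_id, parallelism):
    """Build an UpdateApplication request that only changes the parallelism."""
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "FlinkApplicationConfigurationUpdate": {
                "ParallelismConfigurationUpdate": {
                    "ConfigurationTypeUpdate": "CUSTOM",
                    "ParallelismUpdate": parallelism,
                }
            }
        },
    }


# Hypothetical values, for illustration only
request = code_update_request(
    "my-flink-app", 7, "arn:aws:s3:::my-artifacts-bucket", "my-app-1.1.jar"
)
# With boto3 installed and credentials configured, the request could be sent as:
#   boto3.client("kinesisanalyticsv2").update_application(**request)
```

In both cases the service goes through the same stop, snapshot, and restart sequence. Only the content of the configuration update differs.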

Update the application’s IAM role

The application configuration contains a reference to an AWS Identity and Access Management (IAM) role. In the unlikely case you want to use a different role, you can update the application configuration using UpdateApplication. The process is the same as described earlier.

More commonly, you want to modify the existing IAM role to add or remove permissions. This operation doesn’t go through the Managed Service for Apache Flink application lifecycle and can be done at any time, with no application stop and restart. IAM changes take effect immediately, potentially causing a failure if, for example, you inadvertently remove a required permission. In this case, the Flink job’s response might vary, depending on the affected component.

Stop the application

You can stop a running Managed Service for Apache Flink application using the StopApplication action or the console. The service gracefully stops the application. The status changes from RUNNING to STOPPING, and finally to READY.

When snapshots are enabled, the service will take a snapshot of the application state when it is stopped, as shown in the following diagram.

Stop application

After you stop the application, any resource previously provisioned to run your application is reclaimed. You incur no cost while the application is not running (READY).
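Automation that stops or updates an application usually needs to wait for these status transitions. The following is a minimal sketch of a polling helper. The function name, the timeout values, and the simulated status sequence are assumptions for illustration. The status source is injected so the example runs without calling AWS.

```python
import time


def wait_for_status(get_status, target, timeout_s=600, poll_s=5, sleep=time.sleep):
    """Poll get_status() until it returns target, or raise TimeoutError."""
    waited = 0
    while True:
        status = get_status()
        if status == target:
            return status
        if waited >= timeout_s:
            raise TimeoutError(f"still {status} after {waited}s, expected {target}")
        sleep(poll_s)
        waited += poll_s


# Simulated statuses observed after issuing StopApplication
statuses = iter(["RUNNING", "STOPPING", "STOPPING", "READY"])
final = wait_for_status(lambda: next(statuses), "READY", sleep=lambda s: None)
print(final)  # prints READY

# With boto3, get_status could be something like:
#   lambda: client.describe_application(ApplicationName=name)[
#       "ApplicationDetail"]["ApplicationStatus"]
```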

Start the application from a snapshot

Sometimes, you might want to stop a production application and restart it later, restarting the processing from the point it was stopped. Managed Service for Apache Flink supports starting the application from a snapshot. The snapshot saves not only the application state, but also the point in the source—the offsets in a Kafka topic, for example—where the application stopped consuming.

When snapshots are enabled, Managed Service for Apache Flink automatically takes a snapshot when you stop the application. This snapshot can be used when you restart the application.

The StartApplication API command has three restore options:

  • RESTORE_FROM_LATEST_SNAPSHOT: Restore from the latest snapshot.
  • RESTORE_FROM_CUSTOM_SNAPSHOT: Restore from a custom snapshot (you need to specify which one).
  • SKIP_RESTORE_FROM_SNAPSHOT: Skip restoring from the snapshot. The application will start with no state, as the very first time you ran it.

When you start the application for the very first time, no snapshot is available yet. Regardless of the restore option you choose, the application will start without restoring a snapshot, with empty state.

The process of starting the application from a snapshot is visualized in the following diagram.

Start application with snapshot

In production, you normally want to restore from the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT). This will automatically use the snapshot the service created when you last stopped the application.
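To make the three restore options concrete, here is a small sketch that builds a StartApplication request for each of them. The helper name and the application and snapshot names are hypothetical. The keys follow the StartApplication request shape as exposed by boto3, and nothing is sent to AWS.

```python
RESTORE_TYPES = {
    "RESTORE_FROM_LATEST_SNAPSHOT",
    "RESTORE_FROM_CUSTOM_SNAPSHOT",
    "SKIP_RESTORE_FROM_SNAPSHOT",
}


def start_request(app_name, restore_type="RESTORE_FROM_LATEST_SNAPSHOT", snapshot_name=None):
    """Build a StartApplication request with the chosen restore option."""
    if restore_type not in RESTORE_TYPES:
        raise ValueError(f"unknown restore type: {restore_type}")
    restore = {"ApplicationRestoreType": restore_type}
    if restore_type == "RESTORE_FROM_CUSTOM_SNAPSHOT":
        # A custom restore needs to say which snapshot to use
        if not snapshot_name:
            raise ValueError("snapshot_name is required for a custom snapshot")
        restore["SnapshotName"] = snapshot_name
    return {
        "ApplicationName": app_name,
        "RunConfiguration": {"ApplicationRestoreConfiguration": restore},
    }


latest = start_request("my-flink-app")
older = start_request(
    "my-flink-app", "RESTORE_FROM_CUSTOM_SNAPSHOT", snapshot_name="nightly-snapshot"
)
# With boto3: boto3.client("kinesisanalyticsv2").start_application(**latest)
```

Defaulting to RESTORE_FROM_LATEST_SNAPSHOT mirrors the recommendation for production applications. The other two options have to be requested explicitly.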

Snapshots are based on Flink’s savepoint mechanism and maintain the exactly-once consistency of the internal state. Also, the risk of reprocessing duplicate records from the source is minimized because the snapshot is taken synchronously while the Flink job is stopped.

Start the application from an older snapshot

In Managed Service for Apache Flink, you can schedule taking periodic snapshots of a running production application, for example using the Snapshot Manager. Taking a snapshot from a running application doesn’t stop the processing and only introduces a minimal overhead (comparable to checkpointing). With the second option, RESTORE_FROM_CUSTOM_SNAPSHOT, you can restart the application back in time, using a snapshot older than the one taken on the last StopApplication.

Because the source positions—for example, the offsets in a Kafka topic—are also restored with the snapshot, the application will revert to the point the application was processing when the snapshot was taken. This will also restore the state at that exact point, providing consistency.

When you start an application from an older snapshot, there are two important considerations:

  • Only restore snapshots taken within the source system retention period – If you restore a snapshot older than the source retention, data loss might occur, and the application behavior is unpredictable.
  • Restarting from an older snapshot will likely generate duplicate output – This is often not a problem when the end-to-end system is designed to be idempotent. However, this might cause problems if you are using a Flink transactional connector, such as File System sink or Kafka sink with exactly-once guarantees enabled. Because these sinks are designed to guarantee no duplicates (preventing them at any cost), they might prevent your application from restarting from an older snapshot. There are workarounds to this operational problem, but they depend on the specific use case and are beyond the scope of this post.
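The first consideration can be turned into a simple guard in your automation. The sketch below assumes you know the retention period of the source (for example, the Kafka topic retention) and the creation time of the snapshot, which the service reports when you list the application snapshots. The function name and the safety margin are assumptions.

```python
from datetime import datetime, timedelta, timezone


def within_source_retention(snapshot_created, source_retention, now=None,
                            safety_margin=timedelta(hours=1)):
    """Return True if the snapshot is recent enough for the source to still
    hold the data that follows the restored positions.

    snapshot_created and now must be timezone-aware datetimes.
    """
    now = now or datetime.now(timezone.utc)
    snapshot_age = now - snapshot_created
    return snapshot_age < source_retention - safety_margin


now = datetime(2025, 1, 10, tzinfo=timezone.utc)
recent = datetime(2025, 1, 9, tzinfo=timezone.utc)
stale = datetime(2025, 1, 1, tzinfo=timezone.utc)

print(within_source_retention(recent, timedelta(days=7), now=now))  # True
print(within_source_retention(stale, timedelta(days=7), now=now))   # False
```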

Understanding what happens when you start your application

We have learned the fundamental operations in the lifecycle of an application. In Managed Service for Apache Flink, these operations are controlled by a few API actions, such as StartApplication, UpdateApplication, and StopApplication. The service controls every operation for you. You don’t have to provision or manage Flink clusters. However, a better understanding of what happens during the lifecycle will give you sufficient Mechanical Sympathy to recognize potential failure modes and implement more robust automation.

Let’s see in detail what happens when you issue a StartApplication command on an application in READY (not running). When you issue an UpdateApplication command on a RUNNING application, the application is first stopped with a snapshot, and then restarted with the new configuration, with a process identical to what we are going to see.

Composition of a Flink cluster

To understand what happens when you start the application, we need to introduce a couple of additional concepts. A Flink cluster is composed of two types of nodes:

  • A single Job Manager, which acts as a coordinator
  • One or more Task Managers, which do the actual data processing

In Managed Service for Apache Flink, you can see the cluster nodes in the Flink Dashboard, which you can access from the console.

Flink decomposes the data processing defined by your application code into one or more subtasks, which are distributed across the Task Manager nodes, as illustrated in the following diagram.

Component of a Flink cluster

Remember, in Managed Service for Apache Flink, you don’t need to worry about provisioning and configuring the cluster. The service provides a dedicated cluster for your application. The total amount of vCPU, memory, and local storage of Task Managers matches the number of KPU you configured.

Starting your Managed Service for Apache Flink application

Now that we’ve discussed how a Flink cluster is composed, let’s explore what happens when you issue a StartApplication command, or when the application restarts after a change has been deployed with an UpdateApplication command.

The following diagram illustrates the process. Everything is carried out automatically for you.

Start application process

The workflow consists of the following steps:

  1. A dedicated cluster, with the amount of resources you requested, based on the number of KPU, is provisioned for your application.
  2. The application code, runtime properties, and other configurations such as the application parallelism are passed to the Job Manager node, the coordinator of the cluster.
  3. The Java or Python code in the main() method of your application is executed. This generates the logical graph of operators of your application (called dataflow). Based on the dataflow you defined and the application parallelism, Flink generates the subtasks, the actual nodes Flink will execute to process your data.
  4. Flink then distributes the job’s subtasks across Task Managers, the actual worker nodes of the cluster.
  5. When the previous step succeeds, the Flink job status and the Managed Service for Apache Flink application status change to RUNNING. However, the job is still not completely running and processing data. All subtasks must be initialized.
  6. Each subtask independently restores its state, if starting from a snapshot, and initializes runtime resources. For example, Flink’s Kafka source connector restores the partition assignments and offsets from the savepoint (snapshot), establishes a connection to the Kafka cluster, and subscribes to the Kafka topic. From this step onward, a Flink job will stop and restart from its last checkpoint when encountering any unhandled error. If the problem causing the error is not transient, the job keeps stopping and restarting from the same checkpoint in a loop.
  7. When all subtasks are successfully initialized and change to RUNNING status, the Flink job starts processing data and is now properly running.

Conclusion

In this post, we discussed how the lifecycle of a Managed Service for Apache Flink application is controlled by simple AWS API commands, or the equivalent using the AWS SDK or AWS CLI. If you are using high-level automation tools such as AWS CloudFormation or Terraform, the low-level actions are also abstracted away for you. The service handles the complexity of operating the Flink cluster and orchestrating the Flink job lifecycle.

However, with a better understanding of how Flink works and what the service does for you, you can implement more robust automation and troubleshoot failures.

In Part 2, we continue examining failure scenarios that can happen during normal operations or when you deploy a change or scale the application, and how to monitor operations to detect and recover when something goes wrong.


About the authors

Lorenzo Nicora

Lorenzo works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open-source technologies extensively and contributed to several projects, including Apache Flink, and is the maintainer of the Flink Prometheus connector.

Felix John

Felix is a Global Solutions Architect and data streaming expert at AWS, based in Germany. He focuses on supporting global automotive & manufacturing customers on their cloud journey. Outside of his professional life, Felix enjoys playing Floorball and hiking in the mountains.

Process millions of observability events with Apache Flink and write directly to Prometheus

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/process-millions-of-observability-events-with-apache-flink-and-write-directly-to-prometheus/

AWS recently announced support for a new Apache Flink connector for Prometheus. The new connector, contributed by AWS to the Flink open source project, adds Prometheus and Amazon Managed Service for Prometheus as a new destination for Flink.

In this post, we explain how the new connector works. We also show how you can manage your Prometheus metrics data cardinality by preprocessing raw data with Flink to build real-time observability with Amazon Managed Service for Prometheus and Amazon Managed Grafana.

Amazon Managed Service for Prometheus is a secure, serverless, scalable, Prometheus-compatible monitoring service. You can use the same open source Prometheus data model and query language that you use today to monitor the performance of your workloads without having to manage the underlying infrastructure. Flink connectors are software components that move data into and out of an Amazon Managed Service for Apache Flink application. You can use the new connector to send processed data to an Amazon Managed Service for Prometheus destination starting with Flink version 1.19. With Amazon Managed Service for Apache Flink, you can transform and analyze data in real time. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

Observability beyond compute

In an increasingly connected world, the boundary of systems extends beyond compute assets, IT infrastructure, and applications. Distributed assets such as Internet of Things (IoT) devices, connected cars, and end-user media streaming devices are an integral part of business operations in many sectors. The ability to observe every asset of your business is key to detecting potential issues early, improving the experience of your customers, and protecting the profitability of the business.

Metrics and time series

It is helpful to think of observability as three pillars: metrics, logs, and traces. The most relevant pillar for distributed devices, like IoT, is metrics. This is because metrics can capture measurements from sensors or counting of specific events emitted by the device.

Metrics are series of samples of a given measurement at specific times. For example, in the case of a connected vehicle, they can be the readings from the electric motor RPM sensor. Metrics are normally represented as time series, or sequences of discrete data points in chronological order. Metrics’ time series are normally associated with dimensions, also called labels or tags, to help with classifying and analyzing the data. In the case of a connected vehicle, labels might be something like the following:

  • Metric name – For example, “Electric Motor RPM”
  • Vehicle ID – A unique identifier of the vehicle, like the Vehicle Identification Number (VIN)

Prometheus as a specialized time series database

Prometheus is a popular solution for storing and analyzing metrics. Prometheus defines a standard interface for storing and querying time series. Commonly used in combination with visualization tools like Grafana, Prometheus is optimized for real-time dashboards and real-time alerting.

Often considered mainly for observing compute resources, like containers or applications, Prometheus is actually a specialized time series database that can effectively be used to observe different types of distributed assets, including IoT devices.

Amazon Managed Service for Prometheus is a serverless, Prometheus-compatible monitoring service. See What is Amazon Managed Service for Prometheus? to learn more about Amazon Managed Service for Prometheus.

Effectively processing observability events, at scale

Handling observability data at scale is challenging because of the number of assets and unique metrics, especially when observing massively distributed devices. The main reasons are the following:

  • High cardinality – Each device emits multiple metrics or types of events, each to be tracked independently.
  • High frequency – Devices might emit events very frequently, multiple times per second. This might result in a large volume of raw data. This aspect in particular represents the main difference from observing compute resources, which are usually scraped at longer intervals.
  • Events arrive at irregular intervals and out of order – Unlike compute assets that are usually scraped at regular intervals, we often see delays of transmission or temporarily disconnected devices, which cause events to arrive at irregular intervals. Concurrent events from different devices might follow different paths and arrive at different times.
  • Lack of contextual information – Devices often transmit over channels with limited bandwidth, such as GPRS or Bluetooth. To optimize communication, events seldom contain contextual information, such as device model or user detail. However, this information is required for effective observability.
  • Derive metrics from events – Devices often emit specific events when specific facts happen, for example, when the vehicle ignition is turned on or off, or when a warning is emitted by the onboard computer. These are not direct metrics. However, counts and rates of these events are valuable metrics that can be derived from them.

Effectively extracting value from raw events requires processing. Processing might happen on read, when you query the data, or upfront, before storing.

Storing and analyzing raw events

The common approach with observability events, and with metrics in particular, is “storing first.” You can simply write the raw metrics into Prometheus. Processing, such as grouping, aggregating, and calculating derived metrics, happens “on query,” when data is extracted from Prometheus.

This approach might become particularly inefficient when you’re building real-time dashboards or alerting, and your data has very high cardinality or high frequency. Because the time series database is continuously queried, a large volume of data is repeatedly extracted from the storage and processed. The following diagram illustrates this workflow.

Process on query

Preprocessing raw observability events

Preprocessing raw events before storing shifts the work left, as illustrated in the following diagram. This increases the efficiency of real-time dashboards and alerts, allowing the solution to scale.

Pre-process

Apache Flink for preprocessing observability events

Preprocessing raw observability events requires a processing engine that allows you to do the following:

  • Enrich events efficiently, looking up reference data and adding new dimensions to the raw events. For example, adding the vehicle model based on the vehicle ID. Enrichment allows adding new dimensions to the time series, enabling analysis otherwise impossible.
  • Aggregate raw events over time windows, to reduce frequency. For example, if a vehicle emits an engine temperature measurement every second, you can emit a single sample with the average over 5 seconds. Prometheus can efficiently aggregate frequent samples on read. However, ingesting data with a frequency much higher than what is useful for dashboarding and real-time alerting is not an efficient use of Prometheus ingestion throughput and storage.
  • Aggregate raw events over dimensions, to reduce cardinality. For example, aggregating some measurement per vehicle model.
  • Calculate derived metrics applying arbitrary logic. For example, counting the number of warning events emitted by each vehicle. This also enables analysis otherwise impossible using only Prometheus and Grafana.
  • Support event-time semantics, to aggregate over time events from different sources.
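The time-window aggregation in the list above can be illustrated with a few lines of plain Python. This is only a sketch of the idea, not Flink code: it assigns each sample to a 5-second tumbling window based on its event time and emits one average per series and window. The function name and the sample data are hypothetical.

```python
from collections import defaultdict


def tumbling_window_average(samples, window_ms=5000):
    """Average samples per (series key, window start).

    samples: iterable of (series_key, event_time_ms, value).
    """
    accumulators = defaultdict(lambda: [0.0, 0])  # [sum, count]
    for key, event_time_ms, value in samples:
        # Align the event time to the start of its tumbling window
        window_start = event_time_ms - (event_time_ms % window_ms)
        acc = accumulators[(key, window_start)]
        acc[0] += value
        acc[1] += 1
    return {k: total / count for k, (total, count) in accumulators.items()}


# One engine temperature reading per second from a single vehicle
samples = [
    ("vehicle-1", 0, 80.0),
    ("vehicle-1", 1000, 82.0),
    ("vehicle-1", 2000, 84.0),
    ("vehicle-1", 3000, 86.0),
    ("vehicle-1", 4000, 88.0),
    ("vehicle-1", 5000, 90.0),
]
averages = tumbling_window_average(samples)
print(averages[("vehicle-1", 0)])     # 84.0
print(averages[("vehicle-1", 5000)])  # 90.0
```

Six raw samples become two output samples. Because windows are assigned from the event time, the result does not depend on the order in which the samples arrive.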

Such a preprocessing engine must also be able to scale and process the large volume of input raw events, and to process data with low latency (normally subsecond or single-digit seconds) to enable real-time dashboards and alerting. To address these requirements, we see many customers using Flink.

Apache Flink meets the aforementioned requirements. Flink is a framework and distributed stream processing engine, designed to perform computations at in-memory speed and at scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless experience, allowing you to run your Flink applications without managing infrastructure or clusters.

Amazon Managed Service for Apache Flink can process the ingested raw events. The resulting metrics, with lower cardinality and frequency, and additional dimensions, can be written to Prometheus for a more effective visualization and analysis. The following diagram illustrates this workflow.

Amazon Managed Service for Apache Flink, Amazon Managed Prometheus and Grafana

Integrating Apache Flink and Prometheus

The new Flink Prometheus connector allows Flink applications to seamlessly write preprocessed time series data to Prometheus. No intermediate component is needed, and there is no requirement to implement a custom integration. The connector is designed to scale, using the ability of Flink to scale horizontally, and optimizing the writes to a Prometheus backend using a Remote-Write interface.

Example use case

AnyCompany is a car rental company managing a fleet of hundreds of thousands of hybrid connected vehicles in multiple regions. Each vehicle continuously transmits measurements from several sensors. Each sensor emits a sample every second or more frequently. Vehicles also communicate warning events when the onboard computer detects a problem. The following diagram illustrates the workflow.

Example use case: connected cars

AnyCompany is planning to use Amazon Managed Service for Prometheus and Amazon Managed Grafana to visualize vehicle metrics and set up custom alerts.

However, building a real-time dashboard based on raw data, as transmitted by the vehicles, might be complicated and inefficient. Each vehicle might have hundreds of sensors, each of them resulting in a separate time series to display. Additionally, AnyCompany wants to monitor the behavior of different vehicle models. Unfortunately, the events transmitted by the vehicles only contain the VIN. The model can be inferred by looking up (joining) some reference data.

To overcome these challenges, AnyCompany has built a preprocessing stage based on Amazon Managed Service for Apache Flink. This stage has the following capabilities:

  • Enrich the raw data by adding the vehicle model, and looking up reference data based on the vehicle identification.
  • Reduce the cardinality, aggregating the results per vehicle model, available after the enrichment step.
  • Reduce the frequency of the raw metrics to reduce write bandwidth, aggregating over time windows of a few seconds.
  • Calculate derived metrics based on multiple raw metrics. For example, determine whether a vehicle is in motion when either the internal combustion engine or the electric motor is rotating.

The result of preprocessing is a set of more actionable metrics. A dashboard built on these metrics can, for example, help determine whether the last software update released over-the-air to all vehicles of a specific model in specific regions is causing issues.

Using the Flink Prometheus connector, the preprocessor application can write directly to Amazon Managed Service for Prometheus, without intermediate components.

Nothing prevents you from choosing to write raw metrics with full cardinality and frequency to Prometheus, allowing you to drill down to the single vehicle. The Flink Prometheus connector is designed to scale by batching and parallelizing writes.

Solution overview

The following GitHub repository contains a fictional end-to-end example covering this use case. The following diagram illustrates the architecture of this example.

Example architecture

The workflow consists of the following steps:

  1. Vehicles, radio transmission, and ingestion of IoT events have been abstracted away, and replaced by a data generator that produces raw events for a hundred thousand fictional vehicles. For simplicity, the data generator is itself an Amazon Managed Service for Apache Flink application.
  2. Raw vehicle events are sent to a stream storage service. In this example, we use Amazon Managed Streaming for Apache Kafka (Amazon MSK).
  3. The core of the system is the preprocessor application, running in Amazon Managed Service for Apache Flink. We will dive deeper into the details of the processor in the following sections.
  4. Processed metrics are directly written to the Prometheus backend, in Amazon Managed Service for Prometheus.
  5. Metrics are used to generate real-time dashboards on Amazon Managed Grafana.

The following screenshot shows a sample dashboard.

Grafana dashboard

Raw vehicle events

Each vehicle transmits three metrics almost every second:

  • Internal combustion (IC) engine RPM
  • Electric motor RPM
  • Number of reported warnings

The raw events are identified by the vehicle ID and the region where the vehicle is located.

Preprocessor application

The following diagram illustrates the logical flow of the preprocessing application running in Amazon Managed Service for Apache Flink.

Flink application logical data flow

The workflow consists of the following steps:

  1. Raw events are ingested from Amazon MSK using the Flink Kafka source.
  2. An enrichment operator adds the vehicle model, which is not contained in the raw events. This additional dimension is then used to aggregate the raw events. The resulting metrics have only two dimensions: vehicle model and region.
  3. Raw events are then aggregated over time windows (5 seconds) to reduce frequency. In this example, the aggregation logic also generates a derived metric: the number of vehicles in motion. A new metric can be derived from raw metrics with arbitrary logic. For the sake of the example, a vehicle is considered “in motion” if either the IC engine or the electric motor RPM metric is not zero.
  4. The processed metrics are mapped into the input data structure of the Flink Prometheus connector, which maps directly to the time series records expected by the Prometheus Remote-Write interface. Refer to the connector documentation for more details.
  5. Finally, the metrics are sent to Prometheus using the Flink Prometheus connector. Write authentication, required by Amazon Managed Service for Prometheus, is seamlessly enabled using the Amazon Managed Service for Prometheus request signer provided with the connector. Credentials are automatically derived from the AWS Identity and Access Management (IAM) role of the Amazon Managed Service for Apache Flink application. No additional secret or credential is required.
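The enrichment and the derived “in motion” metric from the steps above can be sketched in plain Python as follows. This is not the code from the GitHub repository and it does not use Flink. The event field names, the reference data, and the function name are hypothetical, and the input is assumed to be the events of a single time window.

```python
from collections import defaultdict

# Hypothetical reference data: vehicle ID -> vehicle model
VEHICLE_MODELS = {"VIN-001": "model-a", "VIN-002": "model-a", "VIN-003": "model-b"}


def vehicles_in_motion(window_events, models):
    """Count distinct vehicles in motion per (model, region) in one window."""
    moving = defaultdict(set)
    for event in window_events:
        # Enrichment: look up the model, which is not in the raw event
        model = models.get(event["vehicle_id"], "unknown")
        # Derived metric: in motion if either RPM reading is not zero
        if event["ic_rpm"] != 0 or event["electric_rpm"] != 0:
            moving[(model, event["region"])].add(event["vehicle_id"])
    return {key: len(vehicle_ids) for key, vehicle_ids in moving.items()}


events = [
    {"vehicle_id": "VIN-001", "region": "eu", "ic_rpm": 0, "electric_rpm": 1200},
    {"vehicle_id": "VIN-001", "region": "eu", "ic_rpm": 0, "electric_rpm": 1250},
    {"vehicle_id": "VIN-002", "region": "eu", "ic_rpm": 0, "electric_rpm": 0},
    {"vehicle_id": "VIN-003", "region": "eu", "ic_rpm": 900, "electric_rpm": 0},
]
result = vehicles_in_motion(events, VEHICLE_MODELS)
print(result)  # {('model-a', 'eu'): 1, ('model-b', 'eu'): 1}
```

The output has only the two dimensions mentioned in step 2, vehicle model and region, regardless of how many vehicles contributed raw events.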

In the GitHub repository, you can find the step-by-step instructions to set up the working example and create the Grafana dashboard.

Flink Prometheus connector key features

The Flink Prometheus connector allows Flink applications to write processed metrics to Prometheus, using the Remote-Write interface.

The connector is designed to scale write throughput by:

  • Parallelizing writes, using the Flink parallelism capability
  • Batching multiple samples in a single write request to the Prometheus endpoint

Error handling complies with Prometheus Remote-Write 1.0 specifications. The specifications are particularly strict about malformed or out-of-order data rejected by Prometheus.

When a malformed or out-of-order write is rejected, the connector discards the offending write request and continues, preferring data freshness over completeness. However, the connector makes data loss observable, emitting WARN log entries and exposing metrics that measure the volume of discarded data. In Amazon Managed Service for Apache Flink, these connector metrics can be automatically exported to Amazon CloudWatch.

Responsibilities of the user

The connector is optimized for efficiency, write throughput, and latency. Validation of incoming data would be particularly expensive in terms of CPU utilization. Additionally, different Prometheus backend implementations enforce constraints differently. For these reasons, the connector doesn’t validate incoming data before writing to Prometheus.

The user is responsible for making sure that the data sent to the Flink Prometheus connector follows the constraints enforced by the particular Prometheus implementation they are using.

Ordering

Ordering is particularly relevant. Prometheus expects that samples belonging to the same time series—samples with the same metric name and labels—are written in time order. The connector makes sure ordering is not lost when data is partitioned to parallelize writes.

However, the user is responsible for retaining the ordering upstream in the pipeline. To achieve this, the user must carefully design data partitioning within the Flink application and the stream storage. Only partitioning by key must be used, and partitioning keys must compound the metric name and all labels that will be used in Prometheus.
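As an illustration of the partitioning rule, the following sketch builds a key from the metric name and all labels, and maps it to a partition. The function names are hypothetical, and `zlib.crc32` is used only because it is stable across processes. It is not the hash function that Flink or Kafka use internally.

```python
import zlib


def series_key(metric_name, labels):
    """Stable key built from the metric name and all labels, in sorted order."""
    parts = [metric_name] + [f"{name}={value}" for name, value in sorted(labels.items())]
    return "|".join(parts)


def partition_for(metric_name, labels, num_partitions):
    """Map a time series to a partition based on its full key."""
    key = series_key(metric_name, labels)
    return zlib.crc32(key.encode("utf-8")) % num_partitions


a = partition_for("vehicles_in_motion", {"model": "model-a", "region": "eu"}, 8)
b = partition_for("vehicles_in_motion", {"region": "eu", "model": "model-a"}, 8)
print(a == b)  # True
```

Because every sample of a given time series produces the same key, all of its samples follow the same path, which is the precondition for keeping them in time order. Leaving a label out of the key would spread several distinct series over the same key, and using a key that is not derived from the series at all could split one series across partitions.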

Conclusion

Prometheus is a specialized time series database, designed for building real-time dashboards and alerting. Amazon Managed Service for Prometheus is a fully managed, serverless backend compatible with the Prometheus open source standard. Amazon Managed Grafana allows you to build real-time dashboards, seamlessly interfacing with Amazon Managed Service for Prometheus.

You can use Prometheus for observability use cases beyond compute resources, to observe IoT devices, connected cars, media streaming devices, and other highly distributed assets providing telemetry data.

Directly visualizing and analyzing high-cardinality and high-frequency data can be inefficient. Preprocessing raw observability events with Amazon Managed Service for Apache Flink shifts the work left, greatly simplifying the dashboards or alerting you can build on top of Amazon Managed Service for Prometheus.

For more information about running Flink, Prometheus, and Grafana on AWS, see the resources of these services:

For more information about the Flink Prometheus integration, see the Apache Flink documentation.


About the authors

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open-source technologies extensively and contributed to several projects, including Apache Flink, and is the maintainer of the Flink Prometheus connector.

Francisco Morillo is a Senior Streaming Solutions Architect at AWS. Francisco works with AWS customers, helping them design real-time analytics architectures using AWS services, supporting Amazon MSK and Amazon Managed Service for Apache Flink. He is also a main contributor to the Flink Prometheus connector.

Introducing the new Amazon Kinesis source connector for Apache Flink

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/introducing-the-new-amazon-kinesis-source-connector-for-apache-flink/

On November 11, 2024, the Apache Flink community released a new version of AWS services connectors, an AWS open source contribution. This new release, version 5.0.0, introduces a new source connector to read data from Amazon Kinesis Data Streams. In this post, we explain how the new features of this connector can improve performance and reliability of your Apache Flink application.

Apache Flink has both a source and sink connector, to read from and write to Kinesis Data Streams. In this post, we focus on the new source connector, because version 5.0.0 does not introduce new functionality for the sink.

Apache Flink is a framework and distributed stream processing engine designed to perform computation at in-memory speed and at any scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless experience to run your Flink applications, implemented in Java, Python or SQL, and using all the APIs available in Flink: SQL, Table, DataStream, and ProcessFunction API.

Apache Flink connectors

Flink supports reading and writing data to external systems, through connectors, which are components that allow your application to interact with stream-storage message brokers, databases, or object stores. Kinesis Data Streams is a popular source and destination for streaming applications. Flink provides both source and sink connectors for Kinesis Data Streams.

The following diagram illustrates a sample architecture.

Role of connectors in a Flink application

Before proceeding further, it’s important to clarify three terms often used interchangeably in data streaming and in the Apache Flink documentation:

  • Kinesis Data Streams refers to the Amazon service
  • Kinesis source and Kinesis consumer refer to the Apache Flink components, in particular the source connectors, that allow reading data from Kinesis Data Streams
  • In this post, we use the term stream referring to a single Kinesis data stream

Introducing the new Flink Kinesis source connector

Version 5.0.0 of the AWS connectors introduces a new connector for reading events from Kinesis Data Streams. The new connector is called Kinesis Streams Source and supersedes the Kinesis Consumer as the source connector for Kinesis Data Streams.

The new connector introduces several new features and adheres to the new Flink Source interface, and is compatible with Flink 2.x, the first major version release by the Flink community. Flink 2.x introduces a number of breaking changes, including removing the SourceFunction interface used by legacy connectors. The legacy Kinesis Consumer will no longer work with Flink 2.x.

Setting up the connector is slightly different than with the legacy Kinesis connector. Let’s start with the DataStream API.

How to use the new connector with the DataStream API

To add the new connector to your application, you need to update the connector dependency. For the DataStream API, the dependency has changed its name to flink-connector-aws-kinesis-streams.

At the time of writing, the latest connector version is 5.0.0 and it supports the most recent stable Flink versions, 1.19 and 1.20. The connector is also compatible with Flink 2.0, but no connector has been officially released for Flink 2.x yet. Assuming you are using Flink 1.20, the new dependency is the following:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

The connector uses the new Flink Source interface. This interface implements the new FLIP-27 standard, and replaces the legacy SourceFunction interface that has been deprecated. SourceFunction will be completely removed in Flink 2.x.

In your application, you can now use a fluent and expressive builder interface to instantiate and configure the source. The minimal setup only requires the stream Amazon Resource Name (ARN) and the deserialization schema:

KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
    .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
    .setDeserializationSchema(new SimpleStringSchema())
    .build();

The new source class is called KinesisStreamsSource, not to be confused with the legacy source, FlinkKinesisConsumer.

You can then add the source to the execution environment using the new fromSource() method. This method requires explicitly specifying the watermark strategy, along with a name for the source:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
    kdsSource,
    WatermarkStrategy.<String>forMonotonousTimestamps()
        .withIdleness(Duration.ofSeconds(1)),
    "Kinesis source");

These few lines of code introduce some of the main changes in the interface of the connector, which we discuss in the following sections.

Stream ARN

You can now define the Kinesis data stream ARN, as opposed to the stream name. This makes it simpler to consume from streams cross-Region and cross-account.

When running in Amazon Managed Service for Apache Flink, you only need to add permissions to access the stream to the application’s AWS Identity and Access Management (IAM) role. The ARN allows pointing to a stream located in a different AWS Region or account, without assuming roles or passing any external credentials.

Explicit watermark

One of the most important characteristics of the new Source interface is that you have to explicitly define a watermark strategy when you attach the source to the execution environment. If your application only implements processing-time semantics, you can specify WatermarkStrategy.noWatermarks().

This is an improvement in terms of code readability. Looking at the source, you know immediately which type of watermark you have, or if you don’t have any. Previously, many connectors were providing some type of default watermarks that the user could override. However, the default watermark of each connector was slightly different and confusing for the user.

With the new connector, you can achieve the same behavior as the legacy FlinkKinesisConsumer default watermarks, using WatermarkStrategy.forMonotonousTimestamps(), as shown in the previous example. This strategy generates watermarks based on the approximateArrivalTimestamp returned by Kinesis Data Streams. This timestamp corresponds to the time when the record was published to Kinesis Data Streams.

Idleness and watermark alignment

With the watermark strategy, you can additionally define an idleness, which allows the watermark to progress even when some shards of the stream are idle and receiving no records. Refer to Dealing With Idle Sources for more details about idleness and watermark generators.

A feature introduced by the new Source interface, and fully supported by the new Kinesis source, is watermark alignment. Watermark alignment works in the opposite direction of idleness. It slows down consuming from a shard that is progressing faster than others. This is particularly useful when replaying data from a stream, to reduce the volume of data buffered in the application state. Refer to Watermark alignment for more details.

Set up the connector with the Table API and SQL

Assuming you are using Flink 1.20, the dependency containing both Kinesis source and sink for the Table API and SQL is the following (both Flink 1.19 and 1.20 are supported, adjust the version accordingly):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

This dependency contains both the new source and the legacy source. Refer to Versioning in case you are planning to use both in the same application.

When defining the source in SQL or the Table API, you use the connector name kinesis, as it was with the legacy source. However, many parameters have changed with the new source:

CREATE TABLE KinesisTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `category_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
    'aws.region' = 'us-east-1',
    'source.init.position' = 'LATEST',
    'format' = 'csv'
);

A couple of notable connector options changed from the legacy source are:

  • stream.arn specifies the stream ARN, as opposed to the stream name used in the legacy source.
  • source.init.position defines the starting position. This option works similarly to the legacy source, but the option name is different. It was previously scan.stream.initpos.

For the full list of connector options refer to Connector Options.

New features and improvements

In this section, we discuss the most important features introduced by the new connector. These features are available in the DataStream API, and also the Table API and SQL.

Ordering guarantees

The most important improvement introduced by the new connector is about ordering guarantees.

With Kinesis Data Streams, the order of messages is retained per partitionId. This is achieved by putting all records with the same partitionId in the same shard. However, when the stream scales, splitting or merging shards, records with the same partitionId end up in a new shard. Kinesis keeps track of the parent-child lineage when resharding happens.

Stream resharding

One known limitation of the legacy Kinesis source is that it was unable to follow the parent-child shard lineage. As a consequence, ordering could not be guaranteed when resharding happens. The problem was particularly relevant when the application replayed old messages from a stream that had been resharded because ordering would be lost. This also made watermark generation and event-time processing non-deterministic.

With the new connector, ordering is retained also when resharding happens. This is achieved following the parent-child shard lineage, and consuming all records from a parent shard before proceeding with the child shard.
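The parent-before-child rule can be sketched as a small dependency-ordering routine. This is an illustrative model in plain Java, not the connector's code: the `Shard` class and the `consumptionOrder` method are hypothetical names, and the real connector reads independent shards in parallel rather than producing a single list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Illustrative model only, not the connector's implementation.
final class ShardLineageSketch {

    // Hypothetical shard descriptor: an ID plus the IDs of its parent shards.
    static final class Shard {
        final String id;
        final List<String> parentIds;

        Shard(String id, List<String> parentIds) {
            this.id = id;
            this.parentIds = parentIds;
        }
    }

    // Returns shard IDs so that every shard comes after all of its known parents.
    static List<String> consumptionOrder(List<Shard> shards) {
        Set<String> known = new HashSet<>();
        for (Shard shard : shards) {
            known.add(shard.id);
        }
        List<String> order = new ArrayList<>();
        Set<String> finished = new HashSet<>();
        List<Shard> pending = new ArrayList<>(shards);
        while (!pending.isEmpty()) {
            boolean progressed = false;
            for (Iterator<Shard> it = pending.iterator(); it.hasNext(); ) {
                Shard shard = it.next();
                boolean parentsDone = true;
                for (String parentId : shard.parentIds) {
                    // Parents that are no longer listed in the stream are ignored.
                    if (known.contains(parentId) && !finished.contains(parentId)) {
                        parentsDone = false;
                    }
                }
                if (parentsDone) {
                    order.add(shard.id);
                    finished.add(shard.id);
                    it.remove();
                    progressed = true;
                }
            }
            if (!progressed) {
                throw new IllegalStateException("Cyclic shard lineage");
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<Shard> shards = Arrays.asList(
                new Shard("child-1", Collections.singletonList("shard-0")),
                new Shard("shard-0", Collections.<String>emptyList()),
                new Shard("child-2", Collections.singletonList("shard-0")));
        // The parent shard-0 is always listed before both children.
        System.out.println(consumptionOrder(shards));
    }
}
```

Because records with the same partitionId move from a parent to one of its children after resharding, finishing the parent first is what preserves their relative order.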

How the connector follows shard lineage

A better default shard assigner

Each Kinesis data stream is comprised of many shards. Also, the Flink source operator runs in multiple parallel subtasks. The shard assigner is the component that decides how to assign the shards of the stream across the source subtasks. The shard assigner’s job is non-trivial, because shard split or merge operations (resharding) might happen when the stream scales up or down.

The new connector comes with a new default assigner, UniformShardAssigner. This assigner maintains uniform distribution of the stream partitionId across parallel subtasks, also when resharding happens. This is achieved by looking at the range of partition keys (HashKeyRange) of each shard.

This shard assigner was already available in the previous connector version, but for backward compatibility, it was not the default and you had to set it up explicitly. This is no longer the case with the new source. The old default shard assigner of the legacy FlinkKinesisConsumer evenly distributed shards (not partitionId) across subtasks. In that case, the actual data distribution might become uneven after resharding, because of the combination of open and closed shards in the stream. Refer to Shard Assignment Strategy for more details.
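To illustrate the idea of assigning by hash key range rather than by shard count, the following sketch maps the midpoint of a shard's HashKeyRange proportionally onto the subtask indexes. It is a simplified model under stated assumptions, not the code of UniformShardAssigner: the class and method names are hypothetical, and the actual assigner may compute the target differently.

```java
import java.math.BigInteger;

// Illustrative model only; the connector's UniformShardAssigner may differ in detail.
final class UniformAssignmentSketch {

    // Kinesis hash keys are 128-bit unsigned integers: 0 to 2^128 - 1.
    static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // Maps the midpoint of a shard's hash key range proportionally onto [0, parallelism).
    static int subtaskFor(BigInteger startingHashKey, BigInteger endingHashKey, int parallelism) {
        BigInteger midpoint = startingHashKey.add(endingHashKey).shiftRight(1);
        return midpoint.multiply(BigInteger.valueOf(parallelism))
                .divide(HASH_KEY_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        BigInteger quarter = BigInteger.ONE.shiftLeft(126);
        BigInteger eighth = BigInteger.ONE.shiftLeft(125);
        // A shard covering the first quarter of the hash key space...
        System.out.println(subtaskFor(BigInteger.ZERO, quarter.subtract(BigInteger.ONE), 4));
        // ...and the two child shards created by splitting it in half.
        System.out.println(subtaskFor(BigInteger.ZERO, eighth.subtract(BigInteger.ONE), 4));
        System.out.println(subtaskFor(eighth, quarter.subtract(BigInteger.ONE), 4));
    }
}
```

In this model, the children of a split shard land on the same subtask as their parent, so the share of the key space handled by each subtask does not change when the stream is resharded.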

Reduced JAR size

The size of the JAR file has been reduced by 99%, from about 60 MB down to 200 KB. This substantially reduces the size of the fat-JAR of your application using the connector. A smaller JAR can speed up many operations that require redeploying the application.

AWS SDK for Java 2.x

The new connector is based on the newer AWS SDK for Java 2.x, which adds several features and improves support for non-blocking I/O. This makes the connector future-proof because the AWS SDK v1 will reach end-of-support by end of 2025.

AWS SDK built-in retry strategy

The new connector relies on the AWS SDK built-in retry strategy, as opposed to a custom strategy implemented by the legacy connector. Relying on the AWS SDK improves the classification of some errors as retriable or non-retriable.

Removed dependency on the Kinesis Client Library and Kinesis Producer Library

The new connector package no longer includes the Kinesis Client Library (KCL) and Kinesis Producer Library (KPL), contributing to the substantial reduction of the JAR size that we have mentioned.

An implication of this change is that the new connector no longer supports de-aggregation out of the box. Unless you are publishing records to the stream using the KPL and you enabled aggregation, this will not make any difference for you. If your producers use KPL aggregation, you might consider implementing a custom DeserializationSchema to de-aggregate the records in the source.

Migrating from the legacy connector

Flink sources typically save the position in checkpoints and savepoints, called snapshots in Amazon Managed Service for Apache Flink. When you stop and restart the application, or when you update the application to deploy a change, the default behavior is saving the source position in the snapshot just before stopping the application, and restoring the position when the application restarts. This allows Flink to provide exactly-once guarantees on the source.

However, due to the major changes introduced by the new KinesisStreamsSource, the saved state is no longer compatible with the legacy FlinkKinesisConsumer. This means that when you upgrade the source of an existing application, you can’t directly restore the source position from the snapshot.

For this reason, migrating your application to the new source requires some attention. The exact migration process depends on your use case. There are two general scenarios:

  • Your application uses the DataStream API and you are following Flink best practices defining a UID on each operator
  • Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator

Let’s cover each of these scenarios.

Your application uses the DataStream API and you are defining a UID on each operator

In this case, you might consider selectively resetting the state of the source operator, retaining any other application state. The general approach is as follows:

  1. Update your application dependencies and code, replacing the FlinkKinesisConsumer with the new KinesisStreamsSource.
  2. Change the UID of the source operator (use a different string). Leave all other operators’ UIDs unchanged. This will selectively reset the state of the source while retaining the state of all other operators.
  3. Configure the source starting position using AT_TIMESTAMP and set the timestamp to just before the moment you will deploy the change. See Configuring Starting Position to learn how to set the starting position. We recommend passing the timestamp as a runtime property to make this more flexible. The configured source starting position is used only when the application can’t restore the state from a savepoint (or snapshot). In this case, we are deliberately forcing this, changing the UID of the source operator.
  4. Update the Amazon Managed Service for Apache Flink application, selecting the new JAR containing the modified application. Restart from the latest snapshot (default behavior) and select allowNonRestoredState = true. Without this flag, Flink would prevent restarting the application, not being able to restore the state of the old source that was saved in the snapshot. See Savepointing for more details about allowNonRestoredState.
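The effect of changing the source UID and enabling allowNonRestoredState can be modeled in a few lines. This is a conceptual sketch in plain Java, not Flink's restore logic: the class, the method, and the UID strings are hypothetical, and operator state is reduced to a string for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Conceptual model only, not Flink's restore logic.
final class StateRestoreSketch {

    // Maps snapshot state to the operators of the new job by UID.
    // State without a matching operator is dropped only if allowNonRestoredState is true.
    static Map<String, String> restore(
            Map<String, String> snapshotStateByUid,
            Set<String> jobOperatorUids,
            boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : snapshotStateByUid.entrySet()) {
            if (jobOperatorUids.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Snapshot state has no matching operator: " + entry.getKey());
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new HashMap<>();
        snapshot.put("legacy-kinesis-source", "shard positions");
        snapshot.put("aggregator", "window contents");

        // The new job keeps the aggregator UID and uses a new UID for the source.
        Set<String> newJobUids = new HashSet<>();
        newJobUids.add("kinesis-source-v2");
        newJobUids.add("aggregator");

        // Only the aggregator state is restored; the new source starts without state.
        System.out.println(restore(snapshot, newJobUids, true));
    }
}
```

With the flag set to false, the same call fails because the legacy source state has nowhere to go, which is the situation described in step 4.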

This approach will cause the reprocessing of some records from the source, and internal state exactly-once consistency can be broken. Carefully evaluate the impact of reprocessing on your application, and the impact of duplicates on the downstream systems.

Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator

In this case, you can’t selectively reset the state of the source operator.

Why does this happen? When using the Table API or SQL, or the DataStream API without defining the operator’s UID explicitly, Flink automatically generates identifiers for all operators based on the structure of the job graph of your application. These identifiers are used to identify the state of each operator when saved in the snapshots, and to restore it to the correct operator when you restart the application.

Changes to the application might cause changes in the underlying data flow. This changes the auto-generated identifiers. If you are using the DataStream API and you are specifying the UID, Flink uses your identifiers instead of the auto-generated identifiers, and is able to map back the state to the operator, even when you make changes to the application. This is an intrinsic limitation of Flink, explained in Set UUIDs For All Operators. Enabling allowNonRestoredState does not solve this problem, because Flink is not able to map the state saved in the snapshot to the actual operators after the changes.

In our migration scenario, the only option is resetting the state of your application. You can achieve this in Amazon Managed Service for Apache Flink by selecting Skip restore from snapshot (SKIP_RESTORE_FROM_SNAPSHOT) when you deploy the change that replaces the source connector.

After the application using the new source is up and running, you can switch back to the default behavior of restoring from the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT) when restarting the application. This way, no data loss happens when the application is restarted.

Choosing the right connector package and version

The dependency version you need to pick is normally <connector-version>-<flink-version>. For example, the latest Kinesis connector version is 5.0.0. If you are using a Flink runtime version 1.20.x, your dependency for the DataStream API is 5.0.0-1.20.
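As a minimal sketch of this naming rule, the helper below builds the dependency version from a connector version and a Flink runtime version. The class and method names are hypothetical, and the sketch assumes the usual <connector-version>-<flink-version> form, where only the major and minor parts of the Flink version are used.

```java
// Hypothetical helper illustrating the usual <connector-version>-<flink-version> form.
final class ConnectorVersionSketch {

    // Keeps only the major and minor parts of the Flink runtime version.
    static String dependencyVersion(String connectorVersion, String flinkRuntimeVersion) {
        String[] parts = flinkRuntimeVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Expected a Flink version such as 1.20 or 1.20.0: " + flinkRuntimeVersion);
        }
        return connectorVersion + "-" + parts[0] + "." + parts[1];
    }

    public static void main(String[] args) {
        System.out.println(dependencyVersion("5.0.0", "1.20.0")); // prints 5.0.0-1.20
    }
}
```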

For the most up-to-date connector versions, see Use Apache Flink connectors with Managed Service for Apache Flink.

Connector artifact

In previous versions of the connector (4.x and before), there were separate packages for the source and sink. This additional level of complexity has been removed with version 5.x.

For your Java application, or Python applications where you package JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository, the following dependency contains the new version of both source and sink connectors:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

Make sure you’re using the latest available version. At the time of writing, this is 5.0.0. You can verify the available artifact versions in Maven Central. Also, use the correct version depending on your Flink runtime version. The previous example is for Flink 1.20.0.

Connector artifacts for Python application

If you use Python, we recommend packaging JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository. However, if you’re passing a single JAR directly to your Amazon Managed Service for Apache Flink application, you need to use the artifact that includes all transitive dependencies. In the case of the new Kinesis source and sink, this is called flink-sql-connector-aws-kinesis-streams. This artifact includes only the new source. Refer to Amazon Kinesis Data Streams SQL Connector for the right package, in case you want to use both the new and the legacy source.

Conclusion

The new Flink Kinesis source connector introduces many new features that improve stability and performance, and prepares your application for Flink 2.x. Support for watermark idleness and alignment is a particularly important feature if your application uses event-time semantics. The ability to retain record ordering improves data consistency, in particular when stream resharding happens, and when you replay old data from a stream that has been resharded.

You should carefully plan the change if you’re migrating your application from the legacy Kinesis source connector, and make sure you follow Flink’s best practices like specifying a UID on all DataStream operators.

You can find a working example of a Java DataStream API application using the new connector in the Amazon Managed Service for Apache Flink samples GitHub repository.

To learn more about the new Flink Kinesis source connector, refer to Amazon Kinesis Data Streams Connector and Amazon Kinesis Data Streams SQL Connector.


About the Author

Lorenzo Nicora works as a Senior Streaming Solutions Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open source technologies extensively and contributed to several projects, including Apache Flink.

Amazon Managed Service for Apache Flink now supports Apache Flink version 1.18

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/amazon-managed-service-for-apache-flink-now-supports-apache-flink-version-1-18/

Apache Flink is an open source distributed processing engine, offering powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing and event time semantics. Apache Flink supports multiple programming languages (Java, Python, Scala, and SQL) and multiple APIs with different levels of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink, which offers a fully managed, serverless experience in running Apache Flink applications, now supports Apache Flink 1.18.1, the latest version of Apache Flink at the time of writing.

In this post, we discuss some of the interesting new features and capabilities of Apache Flink, introduced with the most recent major releases, 1.16, 1.17, and 1.18, and now supported in Managed Service for Apache Flink.

New connectors

Before we dive into the new functionalities of Apache Flink available with version 1.18.1, let’s explore the new capabilities that come from the availability of many new open source connectors.

OpenSearch

A dedicated OpenSearch connector is now available to be included in your projects, enabling an Apache Flink application to write data directly into OpenSearch, without relying on Elasticsearch compatibility mode. This connector is compatible with Amazon OpenSearch Service provisioned and OpenSearch Service Serverless.

This new connector supports SQL and Table APIs, working with both Java and Python, and the DataStream API, for Java only. Out of the box, it provides at-least-once guarantees, synchronizing the writes with Flink checkpointing. You can achieve exactly-once semantics using deterministic IDs and the upsert method.

By default, the connector uses OpenSearch version 1.x client libraries. You can switch to version 2.x by adding the correct dependencies.

Amazon DynamoDB

Apache Flink developers can now use a dedicated connector to write data into Amazon DynamoDB. This connector is based on the Apache Flink AsyncSink, developed by AWS and now an integral part of the Apache Flink project, to simplify the implementation of efficient sink connectors, using non-blocking write requests and adaptive batching.

This connector also supports both SQL and Table APIs, Java and Python, and DataStream API, for Java only. By default, the sink writes in batches to optimize throughput. A notable feature of the SQL version is support for the PARTITIONED BY clause. By specifying one or more keys, you can achieve some client-side deduplication, only sending the latest record per key with each batch write. An equivalent can be achieved with the DataStream API by specifying a list of partition keys for overwriting within each batch.
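The client-side deduplication described above amounts to keeping only the last record seen for each key within a batch. The following is a minimal sketch of that idea in plain Java, not the connector's code: the class and method names are hypothetical, and each record is reduced to a key and value pair of strings.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only, not the DynamoDB connector's implementation.
final class BatchDeduplicationSketch {

    // Each record is a {key, value} pair; later records overwrite earlier ones with the same key.
    static Map<String, String> latestPerKey(List<String[]> batch) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : batch) {
            latest.put(record[0], record[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> batch = Arrays.asList(
                new String[] {"user-1", "v1"},
                new String[] {"user-2", "v1"},
                new String[] {"user-1", "v2"});
        // Two writes are sent instead of three; user-1 carries its latest value, v2.
        System.out.println(latestPerKey(batch));
    }
}
```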

This connector only works as a sink. You cannot use it for reading from DynamoDB. To look up data in DynamoDB, you still need to implement a lookup using the Flink Async I/O API or, for SQL, a custom user-defined function (UDF).

MongoDB

Another interesting connector is for MongoDB. In this case, both source and sink are available, for both the SQL and Table APIs and DataStream API. The new connector is now officially part of the Apache Flink project and supported by the community. This new connector replaces the old one provided by MongoDB directly, which only supports older Flink Sink and Source APIs.

As for other data store connectors, the source can either be used as a bounded source, in batch mode, or for lookups. The sink works both in batch mode and streaming, supporting both upsert and append mode.

Among the many notable features of this connector, one that’s worth mentioning is the ability to enable caching when using the source for lookups. Out of the box, the sink supports at-least-once guarantees. When a primary key is defined, the sink can support exactly-once semantics via idempotent upserts.

New connector versioning

Not a new feature, but an important factor to consider when updating an older Apache Flink application, is the new connector versioning. Starting from Apache Flink version 1.17, most connectors have been externalized from the main Apache Flink distribution and follow independent versioning.

To include the right dependency, you need to specify the artifact version with the form: <connector-version>-<flink-version>

For example, the latest Kafka connector, also working with Amazon Managed Streaming for Apache Kafka (Amazon MSK), at the time of writing is version 3.1.0. If you are using Apache Flink 1.18, the dependency to use will be the following:

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

For Amazon Kinesis, the new connector version is 4.2.0. The dependency for Apache Flink 1.18 will be the following:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

In the following sections, we discuss more of the powerful new features now available in Apache Flink 1.18 and supported in Amazon Managed Service for Apache Flink.

SQL

In Apache Flink SQL, users can add hints to join queries to suggest to the optimizer how to build the query plan. In particular, in streaming applications, lookup joins are used to enrich a table, representing streaming data, with data that is queried from an external system, typically a database. Since version 1.16, several improvements have been introduced for lookup joins, allowing you to adjust the behavior of the join and improve performance:

  • Lookup cache is a powerful feature, allowing you to cache in-memory the most frequently used records, reducing the pressure on the database. Previously, lookup cache was specific to some connectors. Since Apache Flink 1.16, this option has become available to all connectors internally supporting lookup (FLIP-221). As of this writing, JDBC, Hive, and HBase connectors support lookup cache. Lookup cache has three available modes: FULL, for a small dataset that can be held entirely in memory, PARTIAL, for a large dataset, only caching the most recent records, or NONE, to completely disable cache. For PARTIAL cache, you can also configure the number of rows to buffer and the time-to-live.
  • Async lookup is another feature that can greatly improve performance. Async lookup provides in Apache Flink SQL a functionality similar to Async I/O available in the DataStream API. It allows Apache Flink to emit new requests to the database without blocking the processing thread until responses to previous lookups have been received. Similarly to Async I/O, you can configure async lookup to enforce ordering or allow unordered results, or adjust the buffer capacity and the timeout.
  • You can also configure a lookup retry strategy in combination with PARTIAL or NONE lookup cache, to configure the behavior in case of a failed lookup in the external database.
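To make the PARTIAL cache behavior more concrete, the sketch below models a cache bounded by a maximum number of rows and a time-to-live. It is an illustration in plain Java under stated assumptions, not Flink's implementation: the class name, the least-recently-used eviction policy, and the explicit `nowMillis` argument (used to keep the example deterministic) are all choices made for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative model of a PARTIAL-style lookup cache; not Flink's implementation.
final class PartialLookupCacheSketch<K, V> {

    private static final class CachedRow<T> {
        final T value;
        final long loadedAtMillis;

        CachedRow(T value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final long ttlMillis;
    private final Function<K, V> externalLookup;
    private final LinkedHashMap<K, CachedRow<V>> rows;
    int externalLookups = 0; // counts how often the external system is queried

    PartialLookupCacheSketch(final int maxRows, long ttlMillis, Function<K, V> externalLookup) {
        this.ttlMillis = ttlMillis;
        this.externalLookup = externalLookup;
        // Access-ordered map: the least recently used row is evicted first.
        this.rows = new LinkedHashMap<K, CachedRow<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedRow<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    // Returns the cached row if present and not expired; otherwise queries the external system.
    V lookup(K key, long nowMillis) {
        CachedRow<V> cached = rows.get(key);
        if (cached != null && nowMillis - cached.loadedAtMillis < ttlMillis) {
            return cached.value;
        }
        V value = externalLookup.apply(key);
        externalLookups++;
        rows.put(key, new CachedRow<>(value, nowMillis));
        return value;
    }

    public static void main(String[] args) {
        PartialLookupCacheSketch<String, String> cache =
                new PartialLookupCacheSketch<>(2, 1000L, key -> "row-" + key);
        cache.lookup("a", 0L);   // miss: queries the external system
        cache.lookup("a", 500L); // hit: served from the cache
        System.out.println(cache.externalLookups); // prints 1
    }
}
```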

All these behaviors can be controlled using a LOOKUP hint, like in the following example, where we show a lookup join using async lookup:

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address
FROM Orders AS O 
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = C.customer_id

PyFlink

In this section, we discuss new improvements and support in PyFlink.

Python 3.10 support

The newest Apache Flink versions introduced several improvements for PyFlink users. First and foremost, Python 3.10 is now supported, and Python 3.6 support has been completely removed (FLINK-29421). Managed Service for Apache Flink currently uses the Python 3.10 runtime to run PyFlink applications.

Getting closer to feature parity

From the perspective of the programming API, PyFlink is getting closer to Java with every version. The DataStream API now supports features like side outputs and broadcast state, and gaps in the windowing API have been closed. PyFlink also now supports new connectors like Amazon Kinesis Data Streams directly from the DataStream API.

Thread mode improvements

PyFlink is very efficient. The overhead of running Flink API operators in PyFlink is minimal compared to Java or Scala, because the runtime actually runs the operator implementation in the JVM directly, regardless of the language of your application. But when you have a user-defined function, things are slightly different. A line of Python code as simple as lambda x: x + 1, or as complex as a Pandas function, must run in a Python runtime.

By default, Apache Flink runs a Python runtime on each Task Manager, external to the JVM. Each record is serialized, handed to the Python runtime via inter-process communication, deserialized, and processed in the Python runtime. The result is then serialized and handed back to the JVM, where it’s deserialized. This is the PyFlink PROCESS mode. It’s very stable but it introduces an overhead, and in some cases, it may become a performance bottleneck.

Since version 1.15, Apache Flink also supports THREAD mode for PyFlink. In this mode, Python user-defined functions are run within the JVM itself, removing the serialization/deserialization and inter-process communication overhead. THREAD mode has some limitations; for example, THREAD mode cannot be used for Pandas or UDAFs (user-defined aggregate functions, consisting of many input records and one output record), but can substantially improve performance of a PyFlink application.

With version 1.16, the support of THREAD mode has been substantially extended, also covering the Python DataStream API.

THREAD mode is supported by Managed Service for Apache Flink, and can be enabled directly from your PyFlink application.

Apple Silicon support

If you use Apple Silicon-based machines to develop PyFlink applications and you have developed for PyFlink 1.15, you have probably encountered some of the known Python dependency issues on Apple Silicon. Before version 1.16, if you wanted to develop a PyFlink application on a machine using an M1, M2, or M3 chipset, you had to use some workarounds, because it was impossible to install PyFlink 1.15 or earlier directly on the machine. These issues have finally been resolved (FLINK-25188). These limitations did not affect PyFlink applications running on Managed Service for Apache Flink.

Unaligned checkpoint improvements

Apache Flink 1.15 already supported Incremental Checkpoints and Buffer Debloating. These features can be used, particularly in combination, to improve checkpoint performance, making checkpointing duration more predictable, especially in the presence of backpressure. For more information about these features, see Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints.

With versions 1.16 and 1.17, several changes have been introduced to improve stability and performance.

Handling data skew

Apache Flink uses watermarks to support event-time semantics. Watermarks are special records, normally injected in the flow from the source operator, that mark the progress of event time for operators like event time windowing aggregations. A common technique is delaying watermarks from the latest observed event time, to allow events to be out of order, at least to some degree.
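The technique of delaying watermarks behind the latest observed event time can be sketched as follows. This is a simplified model in plain Java, not Flink's watermark generator: the class and method names are hypothetical, and details such as periodic emission are left out.

```java
// Simplified model of a bounded-out-of-orderness watermark; not Flink's generator.
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Tracks the highest event time observed so far.
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    // The watermark trails the highest observed event time by the configured delay.
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    // An event at or behind the watermark arrived too late to be considered in order.
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch watermarks = new BoundedOutOfOrdernessSketch(20_000L);
        watermarks.onEvent(100_000L);
        watermarks.onEvent(95_000L);  // out of order, but within the 20-second bound
        watermarks.onEvent(130_000L);
        System.out.println(watermarks.currentWatermark()); // prints 110000
    }
}
```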

However, the use of watermarks comes with a challenge. When the application has multiple sources, for example it receives events from multiple partitions of a Kafka topic, watermarks are generated independently for each partition. Internally, each operator always waits for the same watermark on all input partitions, practically aligning it on the slowest partition. The drawback is that if one of the partitions is not receiving data, watermarks don’t progress, increasing the end-to-end latency. For this reason, an optional idleness timeout has been introduced in many streaming sources. After the configured timeout, watermark generation ignores any partition not receiving any record, and watermarks can progress.
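The effect of an idleness timeout on the combined watermark can be modeled as taking the minimum over the partitions that are still active. The sketch below is a conceptual model in plain Java, not Flink's implementation: the class and method names are hypothetical, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.OptionalLong;

// Conceptual model only, not Flink's implementation.
final class IdlenessSketch {

    // Returns the minimum watermark across partitions that received a record
    // within the idle timeout, or empty if every partition is idle.
    static OptionalLong combinedWatermark(
            long[] partitionWatermarks,
            long[] lastRecordAtMillis,
            long nowMillis,
            long idleTimeoutMillis) {
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            boolean idle = nowMillis - lastRecordAtMillis[i] >= idleTimeoutMillis;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, partitionWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] watermarks = {1_000L, 5_000L, 200L};
        long[] lastRecordAt = {9_900L, 9_950L, 1_000L}; // the third partition stopped receiving data
        // With a 1-second timeout the stalled partition is ignored.
        System.out.println(combinedWatermark(watermarks, lastRecordAt, 10_000L, 1_000L));
        // Without a practical timeout the stalled partition holds the watermark back.
        System.out.println(combinedWatermark(watermarks, lastRecordAt, 10_000L, Long.MAX_VALUE));
    }
}
```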

You can also face a similar but opposite challenge if one source is receiving events much faster than the others. Watermarks are aligned to the slowest partition, meaning that any windowing aggregation will wait for the watermark. Records from the fast source have to wait, being buffered. This may result in buffering an excessive volume of data, and an uncontrollable growth of operator state.

To address the issue of faster sources, starting with Apache Flink 1.17, you can enable watermark alignment of source splits (FLINK-28853). This mechanism, disabled by default, makes sure that no partitions progress their watermarks too fast, compared to other partitions. You can bind together multiple sources, like multiple input topics, assigning the same alignment group ID, and configuring the duration of the maximal drift from the current watermark. If one specific partition is receiving events too fast, the source operator pauses consuming that partition until the drift is reduced below the configured threshold.

You can enable it for each source separately. All you need is to specify an alignment group ID, which binds together all sources that have the same ID, and the duration of the maximal drift from the current minimal watermark. This pauses consumption from the source subtasks that are advancing too fast, until the drift is lower than the specified threshold.

The following code snippet shows how you can set up watermark alignment of source splits on a Kafka source emitting bounded-out-of-orderness watermarks:

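// Kafka source definition elided in the original
KafkaSource<Event> kafkaSource = ...
DataStream<Event> stream = env.fromSource(
    kafkaSource,
    // Watermarks lag the highest observed event time by 20 seconds
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        // Alignment group ID, maximal drift, and interval at which the alignment is re-evaluated
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source");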

This feature is only available with FLIP-217 compatible sources, supporting watermark alignment of source splits. As of writing, among major streaming source connectors, only Kafka source supports this feature.
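To make the pausing rule concrete, here is a minimal plain-Java sketch of the decision described in this section. It is not Flink's implementation: the class and method names are hypothetical, and it only assumes that a split is paused when its watermark is ahead of the slowest split in the group by more than the maximal drift.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, not Flink code: all names here are hypothetical.
class WatermarkAlignmentSketch {

    // Returns the splits whose watermark is ahead of the group minimum by more than maxDriftMillis.
    static List<String> splitsToPause(Map<String, Long> watermarkBySplit, long maxDriftMillis) {
        long minWatermark = Long.MAX_VALUE;
        for (long watermark : watermarkBySplit.values()) {
            minWatermark = Math.min(minWatermark, watermark);
        }
        List<String> paused = new ArrayList<>();
        for (Map.Entry<String, Long> entry : watermarkBySplit.entrySet()) {
            if (entry.getValue() - minWatermark > maxDriftMillis) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = new LinkedHashMap<>();
        watermarks.put("partition-0", 100_000L); // slowest split, defines the minimum
        watermarks.put("partition-1", 105_000L); // 5 seconds ahead: within the drift
        watermarks.put("partition-2", 130_000L); // 30 seconds ahead: too fast
        System.out.println(splitsToPause(watermarks, 20_000L)); // [partition-2]
    }
}
```

With a 20-second drift, only the split that is 30 seconds ahead is paused. It resumes once the slowest split has advanced enough to bring the difference back within the threshold.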

Direct support for Protobuf format

The SQL and Table APIs now directly support Protobuf format. To use this format, you need to generate the Protobuf Java classes from the .proto schema definition files and include them as dependencies in your application.

The Protobuf format only works with the SQL and Table APIs and only to read or write Protobuf-serialized data from a source or to a sink. Currently, Flink doesn’t directly support Protobuf for serializing state, and it doesn’t support Protobuf schema evolution, as it does for Avro, for example. To use Protobuf for state, you still need to register a custom serializer, with some overhead for your application.

Keeping Apache Flink open source

Apache Flink internally relies on Akka for sending data between subtasks. In 2022, Lightbend, the company behind Akka, announced a license change for future Akka versions, from Apache 2.0 to a more restrictive license, and that Akka 2.6, the version used by Apache Flink, would not receive any further security update or fix.

Although Akka has been historically very stable and doesn’t require frequent updates, this license change represented a risk for the Apache Flink project. The decision of the Apache Flink community was to replace Akka with a fork of the version 2.6, called Apache Pekko (FLINK-32468). This fork will retain the Apache 2.0 license and receive any required updates by the community. In the meantime, the Apache Flink community will consider whether to remove the dependency on Akka or Pekko completely.

State compression

Apache Flink offers optional compression (default: off) for all checkpoints and savepoints. The Apache Flink community identified a bug in Flink 1.18.1 where the operator state couldn’t be properly restored when snapshot compression is enabled. This could result in either data loss or inability to restore from checkpoint. To resolve this, Managed Service for Apache Flink has backported the fix that will be included in future versions of Apache Flink.

In-place version upgrades with Managed Service for Apache Flink

If you are currently running an application on Managed Service for Apache Flink using Apache Flink 1.15 or older, you can now upgrade it in-place to 1.18 without losing the state, using the AWS Command Line Interface (AWS CLI), AWS CloudFormation or AWS Cloud Development Kit (AWS CDK), or any tool that uses the AWS API.

The UpdateApplication API action now supports updating the Apache Flink runtime version of an existing Managed Service for Apache Flink application. You can use UpdateApplication directly on a running application.

Before proceeding with the in-place update, you need to verify and update the dependencies included in your application, making sure they are compatible with the new Apache Flink version. In particular, you need to update any Apache Flink library, connectors, and possibly Scala version.

We also recommend testing the updated application before proceeding with the update, both locally and in a non-production environment, using the target Apache Flink runtime version, to make sure no regressions were introduced.

And finally, if your application is stateful, we recommend taking a snapshot of the running application state. This will enable you to roll back to the previous application version.

When you’re ready, you can use the UpdateApplication API action or the update-application AWS CLI command to update the runtime version of the application and point it to the new application artifact (JAR or zip file) with the updated dependencies.

For more detailed information about the process and the API, refer to In-place version upgrade for Apache Flink. The documentation includes step-by-step instructions and a video to guide you through the upgrade process.

Conclusions

In this post, we examined some of the new features of Apache Flink, supported in Amazon Managed Service for Apache Flink. This list is not comprehensive. Apache Flink also introduced some very promising features, like operator-level TTL for the SQL and Table API [FLIP-292] and Time Travel [FLIP-308], but these are not yet supported by the API, and not really accessible to users yet. For this reason, we decided not to cover them in this post.

With the support of Apache Flink 1.18, Managed Service for Apache Flink now supports the latest released Apache Flink version. We have seen some of the interesting new features and new connectors available with Apache Flink 1.18 and how Managed Service for Apache Flink helps you upgrade an existing application in place.

You can find more details about recent releases in the Apache Flink blog and release notes.

If you are new to Apache Flink, we recommend our guide to choosing the right API and language and following the getting started guide to start using Managed Service for Apache Flink.


About the Authors

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers, helping them design real-time analytics architectures using AWS services, supporting Amazon MSK and Amazon Managed Service for Apache Flink.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 2

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints-part-2/

This post is a continuation of a two-part series. In the first part, we delved into Apache Flink‘s internal mechanisms for checkpointing, in-flight data buffering, and handling backpressure. We covered these concepts in order to understand how buffer debloating and unaligned checkpoints allow us to enhance performance for specific conditions in Apache Flink applications.

In Part 1, we introduced and examined how to use buffer debloating to improve in-flight data processing. In this post, we focus on unaligned checkpoints. This feature has been available since Apache Flink 1.11 and has received many improvements since then. Unaligned checkpoints help, under specific conditions, to reduce checkpointing time for applications suffering temporary backpressure, and can now be enabled in Amazon Managed Service for Apache Flink applications running Apache Flink 1.15.2 through a support ticket.

Even though this feature might improve performance for your checkpoints, if your application is constantly failing because of checkpoints timing out, or is suffering from having constant backpressure, you may require a deeper analysis and redesign of your application.

Aligned checkpoints

As discussed in Part 1, Apache Flink checkpointing allows applications to record state so that it can be recovered in case of failure. We’ve already discussed how checkpoints, when triggered by the job manager, signal all source operators to snapshot their state and then broadcast a special record called a checkpoint barrier. This process achieves exactly-once consistency for state in a distributed streaming application through the alignment of these barriers.

Let’s walk through the process of aligned checkpoints in a standard Apache Flink application. Remember that Apache Flink distributes the workload horizontally: each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks based on its parallelism.

Barrier alignment

The alignment of checkpoint barriers is crucial for achieving exactly-once consistency in Apache Flink applications during checkpoint runs. To recap, when a job manager triggers a checkpoint, all sub-tasks of source operators receive a signal to initiate the checkpoint process. Each sub-task independently snapshots its state to the state backend and broadcasts a special record known as a checkpoint barrier to all outgoing streams.

When an application operates with a parallelism higher than 1, multiple instances of each task—referred to as sub-tasks—enable parallel message consumption and processing. A sub-task can receive distinct partitions of the same stream from different upstream sub-tasks, such as after a stream repartitioning with keyBy or rebalance operations. To maintain exactly-once consistency, all sub-tasks must wait for the arrival of all checkpoint barriers before taking a snapshot of the state. The following diagram illustrates the checkpoint barriers flow.

Checkpoint Barriers flow in the Buffer Queues

This phase is called checkpoint alignment. During alignment, the sub-task stops processing records from the partitions from which it has already received barriers, as shown in the following figure.

The first Barrier reaches the sub-task: Checkpointing Alignment begins

However, it continues to process partitions that are behind the barrier.

Processing continues only for partitions behind the barrier

When barriers from all upstream partitions have arrived, the sub-task takes a snapshot of its state.

Barrier alignment complete: snapshot state

Then it broadcasts the barrier downstream.

Emit Barriers downstream, and continue processing

The time a sub-task spends waiting for all barriers to arrive is measured by the checkpoint Alignment Duration metric, which can be observed in the Apache Flink UI.

If the application experiences backpressure, an increase in this metric could lead to longer checkpoint durations and even checkpoint failures due to timeouts. This is where unaligned checkpoints become a viable option to potentially enhance checkpointing performance.
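The alignment steps described in this section can be sketched as a small simulation. This is a plain-Java model under simplifying assumptions, not Flink code: input channels are in-memory queues polled round-robin, the string "CB" stands for a checkpoint barrier, the operator state is just a count of processed records, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of barrier alignment in one sub-task; all names are hypothetical.
class AlignedSubTaskSketch {
    static final String BARRIER = "CB";

    // Consumes the input channels round-robin and returns everything the sub-task emits.
    // "SNAPSHOT(count=n)" marks the point where the state (a record count) is snapshotted.
    static List<String> run(List<List<String>> inputChannels) {
        int channels = inputChannels.size();
        List<Deque<String>> queues = new ArrayList<>();
        for (List<String> channel : inputChannels) {
            queues.add(new ArrayDeque<>(channel));
        }
        boolean[] blocked = new boolean[channels]; // channels that already delivered the barrier
        int blockedCount = 0;
        int processedCount = 0;                    // the operator state
        List<String> output = new ArrayList<>();

        boolean progress = true;
        while (progress) {
            progress = false;
            for (int i = 0; i < channels; i++) {
                if (blocked[i] || queues.get(i).isEmpty()) {
                    continue; // a blocked channel is not consumed during alignment
                }
                String event = queues.get(i).poll();
                progress = true;
                if (BARRIER.equals(event)) {
                    blocked[i] = true;
                    blockedCount++;
                    if (blockedCount == channels) {
                        // All barriers arrived: snapshot, forward the barrier, resume all channels.
                        output.add("SNAPSHOT(count=" + processedCount + ")");
                        output.add(BARRIER);
                        Arrays.fill(blocked, false);
                        blockedCount = 0;
                    }
                } else {
                    processedCount++;
                    output.add(event);
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<String>> channels = Arrays.asList(
                Arrays.asList("a1", BARRIER, "a2"),
                Arrays.asList("b1", "b2", "b3", BARRIER, "b4"));
        // prints [a1, b1, b2, b3, SNAPSHOT(count=4), CB, a2, b4]
        System.out.println(run(channels));
    }
}
```

Note that record a2 is available early but is not processed until the barrier from the second channel arrives. The time a2 spends waiting corresponds to the alignment duration, and it grows when the slower channel is held back by backpressure.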

Unaligned checkpoints

Unaligned checkpoints address situations where backpressure is not just a temporary spike, but results in timeouts for aligned checkpoints, due to barrier queuing within the stream. As discussed in Part 1, checkpoint barriers can’t overtake regular records. Therefore, significant backpressure can slow down the movement of barriers across the application, potentially causing checkpoint timeouts.

The objective of unaligned checkpoints is to enable barrier overtaking, allowing barriers to move swiftly from source to sink even when the data flow is slower than anticipated.

Building on what we saw in Part 1 concerning checkpoints and what aligned checkpoints are, let’s explore how unaligned checkpoints modify the checkpointing mechanism.

Upon emission, each source’s checkpoint barrier is injected into the stream flowing across sub-tasks. It travels from the source output network buffer queue into the input network buffer queue of the subsequent operator.

Upon the arrival of the first barrier in the input network buffer queue, the operator initially waits for barrier alignment. If the specified alignment timeout expires because not all barriers have reached the end of the input network buffer queue, the operator switches to unaligned checkpoint mode.

The alignment timeout can be set programmatically by env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30)), but modifying the default is not recommended in Apache Flink 1.15.

Checkpoint barriers flow in the buffer queues

The operator waits until all checkpoint barriers are present in the input network buffer queue before triggering the checkpoint. Unlike aligned checkpoints, the operator doesn’t need to wait for all barriers to reach the queue’s end, allowing the operator to have in-flight data from the buffer that hasn’t been processed before checkpoint initiation.

All barriers are in the input queues

After all barriers have arrived in the input network buffer queue, the operator advances the barrier to the end of the output network buffer queue. This enhances checkpointing speed because the barrier can smoothly traverse the application from source to sink, independent of the application’s end-to-end latency.

Barriers can overtake in-flight messages

After forwarding the barrier to the output network buffer queue, the operator initiates the snapshot of in-flight data between the barriers in the input and output network buffer queues, along with the snapshot of the state.

Although processing is momentarily paused during this process, the actual writing to the remote persistent state storage occurs asynchronously, preventing potential bottlenecks.

Snapshot state and in-flight messages

The local snapshot, encompassing in-flight messages and state, is saved asynchronously in the remote persistent state store, while the barrier continues its journey through the application.

Processing continues

When to use unaligned checkpoints

Remember, barrier alignment only occurs between partitions coming from different sub-tasks of the same operator. Therefore, if an operator is experiencing temporary backpressure, enabling unaligned checkpoints may be beneficial. This way, the application doesn’t have to wait for all barriers to reach the operator before performing the snapshot of state or moving the barrier forward.

Temporary backpressure could arise from the following:

  • A surge in data ingestion
  • Backfilling or catching up with historical data
  • Increased message processing time due to delayed external systems

Another scenario where unaligned checkpoints prove advantageous is when working with exactly-once sinks. For sinks that use the two-phase commit sink function to provide exactly-once guarantees, unaligned checkpoints can expedite checkpoint runs, thereby reducing end-to-end latency.

When not to use unaligned checkpoints

Unaligned checkpoints won’t reduce the time required for savepoints (called snapshots in the Amazon Managed Service for Apache Flink implementation) because savepoints exclusively utilize aligned checkpoints. Furthermore, because Apache Flink doesn’t permit concurrent unaligned checkpoints, savepoints won’t occur simultaneously with unaligned checkpoints, potentially elongating savepoint durations.

Unaligned checkpoints won’t fix any underlying issue in your application design. If your application is suffering from persistent backpressure or constant checkpointing timeouts, this might indicate data skewness or underprovisioning, which may require improving and tuning the application.

Using unaligned checkpoints with buffer debloating

One alternative for reducing the risks associated with an increased state size is to combine unaligned checkpoints with buffer debloating. This approach results in having less in-flight data to snapshot and store in the state, along with less data to be used for recovery in case of failure. This synergy facilitates enhanced performance and efficient checkpoint runs, leading to smaller checkpointing sizes and faster recovery times. When testing the use of unaligned checkpoints, we recommend doing so with buffer debloating to prevent the state size from increasing.

Limitations

Unaligned checkpoints are subject to the following limitations:

  • They provide no benefit for operators with a parallelism of 1.
  • They only improve performance for operators where barrier alignment would have occurred. This alignment happens only if records are coming from different sub-tasks of the same operator, for example, through repartitioning or keyBy operations.
  • Operators receiving input from multiple sources or participating in joins might not experience improvements, because the operator would be receiving data from different operators in those cases.
  • Although checkpoint barriers can surpass records in the network’s buffer queue, this won’t occur if the sub-task is currently processing a message. If processing a message takes too much time (for example, a flat-map operation emitting numerous records for each input record), barrier handling will be delayed.
  • As we have seen, savepoints always use aligned checkpoints. If the savepoints of your applications are slow due to barrier alignment, unaligned checkpoints will not help.
  • Additional limitations affect watermarks, message ordering, and broadcast state in recovery. For more details, refer to Limitations.

Considerations

Considerations for implementing unaligned checkpoints:

  • Unaligned checkpoints introduce additional I/O to checkpoint storage
  • Checkpoints encompass not only operator state but also in-flight data within network buffer queues, leading to increased state size

Recommendations

We offer the following recommendations:

  • Consider enabling unaligned checkpoints only if both of the following conditions are true:
      • Checkpoints are timing out.
      • The average checkpoint Async Duration of any operator is more than 50% of the total checkpoint duration for the operator (sum of Sync Duration + Async Duration).
  • Consider enabling buffer debloating first, and evaluate whether it solves the problem of checkpoints timing out.
  • If buffer debloating doesn’t help, consider enabling unaligned checkpoints along with buffer debloating. Buffer debloating mitigates the drawbacks of unaligned checkpoints, reducing the amount of in-flight data.
  • If unaligned checkpoints and buffer debloating together don’t improve checkpoint alignment duration, consider testing unaligned checkpoints alone.

Decision flow
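One possible reading of the recommendations above, expressed as code. This is only a sketch: the class, enum, and method names are hypothetical, the inputs are values you would read from the Apache Flink UI, and treating "checkpoints are still timing out" as the trigger for every escalation step is a simplification of the list.

```java
// Hypothetical helper encoding the escalation order recommended above; not part of any API.
class CheckpointTuningSketch {

    enum Config { DEFAULTS, BUFFER_DEBLOATING, UNALIGNED_WITH_DEBLOATING, UNALIGNED_ONLY, REVIEW_APPLICATION_DESIGN }

    // True when Async Duration is more than 50% of Sync Duration + Async Duration.
    static boolean asyncDominates(long syncMillis, long asyncMillis) {
        long total = syncMillis + asyncMillis;
        return total > 0 && 2 * asyncMillis > total;
    }

    // Suggests the next configuration to test, given the one currently in use.
    static Config nextToTest(Config current, boolean checkpointsTimingOut, long syncMillis, long asyncMillis) {
        if (!checkpointsTimingOut) {
            return current; // nothing to fix: keep the current configuration
        }
        switch (current) {
            case DEFAULTS:
                return Config.BUFFER_DEBLOATING; // always try buffer debloating first
            case BUFFER_DEBLOATING:
                return asyncDominates(syncMillis, asyncMillis)
                        ? Config.UNALIGNED_WITH_DEBLOATING
                        : Config.REVIEW_APPLICATION_DESIGN;
            case UNALIGNED_WITH_DEBLOATING:
                return Config.UNALIGNED_ONLY;
            default:
                return Config.REVIEW_APPLICATION_DESIGN;
        }
    }

    public static void main(String[] args) {
        // Checkpoints time out with defaults: prints BUFFER_DEBLOATING
        System.out.println(nextToTest(Config.DEFAULTS, true, 100, 900));
        // Still timing out, async part is 90% of the duration: prints UNALIGNED_WITH_DEBLOATING
        System.out.println(nextToTest(Config.BUFFER_DEBLOATING, true, 100, 900));
    }
}
```

The helper only proposes what to test next; each step still needs to be validated with a realistic workload before changing a production application.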

Finally, but most importantly, always test unaligned checkpoints in a non-production environment first, running some comparative performance testing with a realistic workload, and verify that unaligned checkpoints actually reduce checkpoint duration.

Conclusion

This two-part series explored advanced strategies for optimizing checkpointing within your Amazon Managed Service for Apache Flink applications. By harnessing the potential of buffer debloating and unaligned checkpoints, you can unlock significant performance improvements and streamline checkpoint processes. However, it’s important to understand when these techniques will provide improvements and when they will not. If you believe your application may benefit from checkpoint performance improvement, you can enable these features in your Amazon Managed Service for Apache Flink version 1.15 applications. We recommend first enabling buffer debloating and testing the application. If you are still not seeing the expected outcome, enable buffer debloating with unaligned checkpoints. This way, you can immediately reduce the state size and the additional I/O to state backends. Lastly, you may try using unaligned checkpoints on their own, bearing in mind the considerations we’ve mentioned.

With a deeper understanding of these techniques and their applicability, you are better equipped to maximize the efficiency of checkpoints and mitigate the effect of backpressure in your Apache Flink application.


About the Authors

Lorenzo Nicora works as Senior Streaming Solution Architect helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 1

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/part-1-optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints/

This post is the first of a two-part series regarding checkpointing mechanisms and in-flight data buffering. In this first part, we explain some of the fundamental Apache Flink internals and cover the buffer debloating feature. In the second part, we focus on unaligned checkpoints.

Apache Flink is an open-source distributed engine for stateful processing over unbounded datasets (streams) and bounded datasets (batches). Amazon Managed Service for Apache Flink, formerly known as Amazon Kinesis Data Analytics, is the AWS service offering fully managed Apache Flink.

Apache Flink is designed for stateful processing at scale, for high throughput and low latency. It scales horizontally, distributing processing and state across multiple nodes, and is designed to withstand failures without compromising the exactly-once consistency it provides.

Internally, Apache Flink uses clever mechanisms to maintain exactly-once state consistency, while also optimizing for throughput and reduced latency. The default behavior works well for most use cases. Recent versions introduced two functionalities that can be optionally enabled to improve application performance under particular conditions: buffer debloating and unaligned checkpoints.

Buffer debloating and unaligned checkpoints can be enabled on Amazon Managed Service for Apache Flink version 1.15.

To understand how these functionalities can help and when to use them, we need to dive deep into some of the fundamental internal mechanisms of Apache Flink: checkpointing, in-flight data buffering, and backpressure.

Maintaining state consistency through failures with checkpointing

Apache Flink checkpointing periodically saves the internal application state for recovering in case of failure. Each of the distributed components of an application asynchronously snapshots its state to an external persistent datastore. The challenge is taking snapshots guaranteeing exactly-once consistency. A naïve “stop-the-world, take a snapshot” implementation would never meet the high throughput and low latency goals Apache Flink has been designed for.

Let’s walk through the process of checkpointing in a simple streaming application.

As shown in the following figure, Apache Flink distributes the work horizontally. Each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks, based on its parallelism. The application is coordinated by a job manager. Checkpoints are periodically initiated by the job manager, sending a signal to all source operators’ sub-tasks.

Checkpoint initiated by the Job Manager

On receiving the signal, each source sub-task independently snapshots its state (for example, the offsets of the Kafka topic it is consuming) to persistent storage, and then broadcasts a special record called a checkpoint barrier (“CB” in the following diagrams) to all outgoing streams. Checkpoint barriers work similarly to watermarks in Apache Flink, flowing in-band, along with normal records. A barrier does not overtake normal records and is not overtaken.

Source operators emit checkpoint barriers

When a downstream operator’s sub-task receives all checkpoint barriers from all input channels, it starts snapshotting its state.

A sub-task does not pause processing while saving its state to the remote, persistent state backend. This is a two-phase operation. First, the sub-task takes a snapshot of the state, on the local file system or in memory, depending on application configuration. This operation is blocking but very fast. When the snapshot is complete, it restarts processing records, while the state is asynchronously saved to the external, persistent state store. When the state is successfully saved to the state store, the sub-task acknowledges to the job manager that its checkpointing is complete.

The time a sub-task spends on the synchronous and asynchronous parts of the checkpoint is measured by the Sync Duration and Async Duration metrics, shown by the Apache Flink UI.
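The two-phase operation can be sketched in a few lines. This is a plain-Java illustration, not Flink's state backend: the class is hypothetical, an in-memory map stands in for the external state store, and the completed future stands in for the acknowledgment sent to the job manager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of a sub-task checkpointing its state in two phases; all names are hypothetical.
class TwoPhaseSnapshotSketch {
    private final Map<String, Long> state = new HashMap<>();                     // live operator state
    final Map<Long, Map<String, Long>> remoteStore = new ConcurrentHashMap<>();  // stand-in for the state store

    // Normal record processing: count records per key.
    void process(String key) {
        state.merge(key, 1L, Long::sum);
    }

    long count(String key) {
        return state.getOrDefault(key, 0L);
    }

    // Phase 1 (synchronous): take a local copy of the state. Blocking, but fast.
    // Phase 2 (asynchronous): upload the copy; the future completes as the acknowledgment.
    CompletableFuture<Long> checkpoint(long checkpointId) {
        Map<String, Long> localCopy = new HashMap<>(state);
        return CompletableFuture.supplyAsync(() -> {
            remoteStore.put(checkpointId, localCopy); // the slow part in a real system
            return checkpointId;
        });
    }

    public static void main(String[] args) {
        TwoPhaseSnapshotSketch subTask = new TwoPhaseSnapshotSketch();
        subTask.process("a");
        subTask.process("a");
        CompletableFuture<Long> ack = subTask.checkpoint(1L);
        subTask.process("a"); // processing resumes while the upload is still in flight
        ack.join();           // wait for the acknowledgment
        System.out.println(subTask.remoteStore.get(1L).get("a")); // 2: state as of the checkpoint
        System.out.println(subTask.count("a"));                   // 3: live state kept moving
    }
}
```

In this model, the synchronous copy corresponds to what Sync Duration measures, and the time until the future completes corresponds to Async Duration.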

Sub-tasks acknowledge checkpoint completion

Checkpoint barriers propagate through all operators, down to the sinks. When all sink sub-tasks have acknowledged the checkpoint to the job manager, the checkpoint is declared complete and can be used to recover the application, for example in case of failure.

Sink operators acknowledge checkpoint is complete

Checkpoint barrier alignment

A sub-task may receive different partitions of the same stream from different upstream sub-tasks, for example when a stream is repartitioned with a keyBy or a rebalance. Each upstream sub-task will emit a checkpoint barrier independently. To maintain exactly-once consistency, the sub-task must wait for the barriers to arrive on all input partitions before taking a snapshot of its state.

This phase is called checkpoint alignment. During the alignment, the sub-task stops processing records from the partitions it already received the barrier from and continues processing the partitions that are behind the barrier.

After the barriers from all upstream partitions have arrived, the sub-task takes the snapshot of its state and then broadcasts the barrier downstream.

The time spent by a sub-task while aligning barriers is measured by the Checkpoint Alignment Duration metric, shown by the Apache Flink UI.

Checkpoint barrier alignment

In-flight data buffering

To optimize for throughput, Apache Flink tries to keep each sub-task always busy. This is achieved by transmitting records over the network in blocks and by buffering in-flight data. Note that this is a data transmission optimization; Flink operators always process records one at a time.

Data is handed over between sub-tasks in units called network buffers. A network buffer has a fixed size, in bytes.

Sub-tasks also buffer in-flight input and output data. These buffers are called network buffer queues. Each queue is composed of multiple network buffers. Each sub-task has an input network buffer queue for each upstream sub-task and an output network buffer queue for each downstream sub-task.

Each record emitted by the sub-task is serialized, put into network buffers, and published to the output network buffer queue. To use all the available space, multiple messages can be packed into a single network buffer or split across subsequent network buffers.

A separate thread sends full network buffers over the network, where they are stored in the destination sub-task’s input network buffer queue.

When the destination sub-task thread is free, it deserializes the network buffers, rebuilds the records, and processes them one at a time.

Network Buffer Queue
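The packing of records into fixed-size network buffers can be illustrated as follows. This is a simplified plain-Java sketch, not Flink's serialization stack: records are strings, each one is written as a 4-byte length prefix followed by its bytes, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of packing records into fixed-size buffers; all names are hypothetical.
class NetworkBufferSketch {

    // Sender side: serialize the records into one byte stream, then cut it into buffers.
    // A record can share a buffer with others or span several consecutive buffers.
    static List<byte[]> pack(List<String> records, int bufferSize) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            stream.write(ByteBuffer.allocate(4).putInt(payload.length).array(), 0, 4);
            stream.write(payload, 0, payload.length);
        }
        byte[] all = stream.toByteArray();
        List<byte[]> buffers = new ArrayList<>();
        for (int offset = 0; offset < all.length; offset += bufferSize) {
            int length = Math.min(bufferSize, all.length - offset);
            buffers.add(Arrays.copyOfRange(all, offset, offset + length)); // last buffer may be partial
        }
        return buffers;
    }

    // Receiver side: concatenate the buffers and rebuild the records one at a time.
    static List<String> unpack(List<byte[]> buffers) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (byte[] buffer : buffers) {
            stream.write(buffer, 0, buffer.length);
        }
        ByteBuffer in = ByteBuffer.wrap(stream.toByteArray());
        List<String> records = new ArrayList<>();
        while (in.remaining() >= 4) {
            byte[] payload = new byte[in.getInt()];
            in.get(payload);
            records.add(new String(payload, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "hello world", "xyz");
        List<byte[]> buffers = pack(records, 8); // 27 serialized bytes in 8-byte buffers
        System.out.println(buffers.size());      // 4
        System.out.println(unpack(buffers));     // [a, hello world, xyz]
    }
}
```

With an 8-byte buffer, the second record spans three buffers while the first one shares its buffer with the beginning of the second, which is the behavior described above.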

Backpressure

If a sub-task can’t keep up with processing records at the same pace they are received, the input queue fills up. When the input queue is full, the upstream sub-task stops sending data.

Data accumulates in the sender’s output queue. When this is also full, the sender sub-task stops processing records, accumulating received data in its own input queue, and the effect propagates upstream.

This is the backpressure that Apache Flink uses to control the internal flow, preventing slow operators from being overwhelmed by slowing down the upstream flow. Backpressure is a safety mechanism to maximize the application throughput. It can be temporary, in case of an unexpected peak of ingested data, for example. If not temporary, it is usually the symptom—not the cause—that the application is not designed correctly or it has insufficient resources to process the workload.

Full Network Buffer Queue generates backpressure
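The way backpressure throttles the source can be reproduced with a toy, tick-based simulation. This is a plain-Java model, not Flink code: a source, one operator, and a sink are connected by two bounded queues, rates are expressed in records per tick, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model of backpressure through bounded queues; all names are hypothetical.
class BackpressureSketch {

    // Returns how many records the source managed to emit in the given number of ticks.
    static int simulate(int ticks, int sourceRate, int sinkRate, int queueCapacity) {
        Deque<Integer> operatorInput = new ArrayDeque<>(); // source -> operator
        Deque<Integer> sinkInput = new ArrayDeque<>();     // operator -> sink
        int emitted = 0;
        for (int tick = 0; tick < ticks; tick++) {
            // The sink drains its input queue at its own limited rate.
            for (int i = 0; i < sinkRate && !sinkInput.isEmpty(); i++) {
                sinkInput.poll();
            }
            // The operator forwards records only while the downstream queue has room.
            while (!operatorInput.isEmpty() && sinkInput.size() < queueCapacity) {
                sinkInput.add(operatorInput.poll());
            }
            // The source emits only while its output queue has room.
            for (int i = 0; i < sourceRate && operatorInput.size() < queueCapacity; i++) {
                operatorInput.add(emitted++);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Sink keeps up with the source: nothing is throttled.
        System.out.println(simulate(100, 5, 5, 5)); // 500
        // Sink handles only 2 records per tick: once both queues are full,
        // the source is slowed down to the sink's rate.
        System.out.println(simulate(100, 5, 2, 5)); // 206
    }
}
```

In the second run, the source emits at full speed only until the two queues fill up (10 records), and from then on it can emit just the 2 records per tick that the sink frees up.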

In-flight buffering and checkpoint barriers

As checkpoint barriers flow with normal records, they also flow in the network buffers, through the input and output queues. In normal conditions, barriers don’t overtake records, and they are never overtaken. If records are queueing up due to backpressure, checkpoint barriers are also stuck in the queue, taking longer time to propagate from the sources to the sinks, delaying the completion of the checkpoint.

In the second part of this series, we will see how unaligned checkpoints can let barriers overtake records under specific conditions. For now, let’s see how we can optimize the size of input and output queues with buffer debloating.

Buffer debloating to optimize in-flight data

The default network buffer queue size is a good compromise for most applications. You can modify this size, but it applies to all sub-tasks, and it may be difficult to optimize this one-size-fits-all across different operators.

Longer queues support higher throughput, but they may slow down checkpoint barriers that have to go through longer queues, causing longer End to End Checkpoint Duration. Ideally, the amount of in-flight data should be adjusted based on the actual throughput.

In version 1.14, Apache Flink introduced buffer debloating, which can be enabled to adjust in-flight data of each sub-task, based on the current throughput the sub-task is processing, and periodically reassess and readjust it.

How buffer debloating helps your application

Consider a streaming application, ingesting records from a streaming source and publishing the results to a streaming destination after some transformations. Under normal conditions, the application is sized to process the incoming throughput smoothly. Our destination has limited capacity, for example a Kafka topic throttled via quotas, sufficient to handle the normal throughput, with some margin.

In-flight data buffering under normal throughput

Imagine that the ingestion throughput has occasional peaks. These peaks exceed the limits of the streaming destination (throughput quota of the Kafka topic), which starts throttling.

A full in-flight data buffer to the sink backpressures the preceding operator

Because the sink can’t process the full throughput, in-flight data accumulates upstream of the sink, causing backpressure on the upstream operator. The effect eventually propagates up to the source, and the source starts lagging behind the most recent record in the source stream.

Backpressure propagates upstream, up to the source operator

As long as this is a temporary condition, backpressure and lagging are not a problem per se, provided the application is able to catch up when the peak has finished.

Unfortunately, accumulating in-flight data also slows down the propagation of the checkpoint barriers. Checkpoint End to End Duration goes up, and checkpoints may eventually time out.

Full in-flight data buffers slow down checkpoint barrier propagation, under backpressure

The situation is even worse if the sink uses two-phase commit for exactly-once guarantees. For example, KafkaSink uses Kafka transactions committed on checkpoints. If checkpoints become too slow, transactions are committed later, significantly increasing the latency of any downstream consumer using a read-committed isolation level.

Slow checkpoints under backpressure may also cause a vicious cycle. A slowed-down application eventually crashes and recovers from the last checkpoint, which by then is quite old. This causes a long reprocessing that, in turn, induces more backpressure and even slower checkpoints.

In this case, buffer debloating can help by adjusting the amount of in-flight data based on the throughput each sub-task is actually processing. When a sub-task is throttled by backpressure, the amount of in-flight data is reduced, also reducing the time checkpoint barriers take to go through all operators. Checkpoint End to End Duration goes down, and checkpoints do not time out.
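To make the effect concrete, here is a back-of-the-envelope sketch in Python. It assumes a single queue drained at a constant rate and ignores barrier alignment and everything else that happens in a real job. The function names and all the numbers are hypothetical, chosen only for illustration.

```python
MIB = 1024 * 1024


def barrier_delay_seconds(queued_bytes, throughput_bytes_per_s):
    """Time for a barrier at the tail of a queue to reach its head,
    assuming the queue drains at a constant rate."""
    if throughput_bytes_per_s <= 0:
        raise ValueError("throughput must be positive")
    return queued_bytes / throughput_bytes_per_s


def debloated_queue_bytes(throughput_bytes_per_s, target_seconds=1.0):
    """In-flight data allowed when the queue is capped at `target_seconds` of work."""
    return throughput_bytes_per_s * target_seconds


full_queue = 64 * MIB      # hypothetical: queue filled up during the peak
normal_rate = 32 * MIB     # hypothetical bytes/s under normal conditions
throttled_rate = 2 * MIB   # hypothetical bytes/s while the sink is throttled

# Same full queue, normal vs. throttled drain rate.
print(barrier_delay_seconds(full_queue, normal_rate))      # 2.0
print(barrier_delay_seconds(full_queue, throttled_rate))   # 32.0

# With the queue capped to one second of work at the throttled rate.
capped_queue = debloated_queue_bytes(throttled_rate)
print(barrier_delay_seconds(capped_queue, throttled_rate))  # 1.0
```

In this toy model, the barrier delay through a full queue grows as the drain rate drops, while capping the queue to one second of work keeps the delay at about one second regardless of the rate.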

Buffer debloating internals

Buffer debloating estimates the throughput a sub-task is capable of processing, assuming no idling, and limits the upstream in-flight data buffers to contain just enough data to be processed in 1 second (by default).

For efficiency, the network buffers in the queues are allocated with a fixed size. Buffer debloating caps the usable size of each network buffer, making it smaller when the sub-task is processing slowly.
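The calculation can be sketched as follows. This is a simplified Python model, not Flink's actual implementation: the function name is hypothetical, and the 256-byte minimum and 32 KiB maximum are assumptions standing in for the configured minimum and maximum network buffer size.

```python
def debloated_buffer_size(throughput_bytes_per_s, num_buffers,
                          target_seconds=1.0,
                          min_buffer_bytes=256,         # assumed lower bound
                          max_buffer_bytes=32 * 1024):  # assumed allocated buffer size
    """Usable size per buffer so that all buffers together hold roughly
    `target_seconds` worth of data at the estimated throughput."""
    if num_buffers <= 0:
        raise ValueError("num_buffers must be positive")
    desired_in_flight = throughput_bytes_per_s * target_seconds
    per_buffer = desired_in_flight / num_buffers
    # The usable size can never exceed the allocated size or drop below the minimum.
    return int(max(min_buffer_bytes, min(max_buffer_bytes, per_buffer)))


MIB = 1024 * 1024
print(debloated_buffer_size(10 * MIB, num_buffers=100))  # fast sub-task: full 32 KiB buffers
print(debloated_buffer_size(1 * MIB, num_buffers=100))   # slower sub-task: smaller usable size
print(debloated_buffer_size(1000, num_buffers=100))      # nearly stalled: clamped to the minimum
```

The number of buffers stays the same in all three cases. Only the usable portion of each buffer shrinks as the estimated throughput drops.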

Buffer debloating speeds up barrier propagation by reducing the volume of in-flight data

The benefits of less in-flight data depend on whether Apache Flink is using standard checkpoint alignment, the default behavior described so far, or unaligned checkpoints. We will examine unaligned checkpoints in the second part of this series, but let’s briefly see the effect of buffer debloating in both cases.

  • With aligned checkpoints (default behavior) – Less in-flight data makes checkpoint barrier propagation faster, ultimately reducing the end-to-end checkpoint duration and also making it more predictable
  • With unaligned checkpoints (optional) – Less in-flight data reduces the amount of in-flight records stored with the checkpoint, ultimately reducing the checkpoint size

What buffer debloating does not do

Note that the problem we are trying to solve is slow checkpointing (or excessive checkpoint size, with unaligned checkpoints). Buffer debloating helps make checkpointing faster.

Buffer debloating does not remove backpressure. Backpressure is the internal protective mechanism that Apache Flink uses when some part of the application is not able to cope with the incoming throughput. To reduce backpressure, you have to work on other aspects of the application. When backpressure is only temporary, for example under peak conditions, the only way of removing it would be sizing the end-to-end system for the peak, rather than normal workload. But this could be impossible or too expensive.

Buffer debloating helps reduce checkpoint duration and keep it stable under exceptional and temporary conditions. If an application experiences backpressure under its normal workload, or checkpoints are too slow under normal conditions, you should investigate the implementation of your application to understand the root cause.

When the automatic throughput prediction fails

Buffer debloating doesn’t have any particular drawback, but in corner cases, the mechanism may incorrectly estimate the throughput, and the resulting amount of in-flight data may not be optimal.

Estimating the throughput is complex when an operator receives data from multiple upstream operators, such as connected streams or unions, with very different throughput. It may also take time to adjust to a sudden spike, causing temporarily suboptimal buffering.

  • Too little in-flight data may reduce the throughput the sub-task can process (it will be idling), causing more backpressure upstream
  • Too large buffers may slow down checkpointing and increase the checkpoint size (with unaligned checkpoints)
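The adjustment lag after a sudden spike can be illustrated with a generic smoothed estimator. The Python sketch below uses a plain exponential moving average. The smoothing factor and the sample values are hypothetical, and the smoothing Apache Flink actually applies may differ.

```python
def ema_estimates(samples, alpha):
    """Exponential moving average of a series of throughput samples."""
    estimate = samples[0]
    estimates = [estimate]
    for sample in samples[1:]:
        estimate = alpha * sample + (1 - alpha) * estimate
        estimates.append(estimate)
    return estimates


# Hypothetical throughput samples: steady at 100, then a sudden jump to 1000.
steady, spike = 100.0, 1000.0
observed = [steady] * 3 + [spike] * 10
estimates = ema_estimates(observed, alpha=0.2)

# Number of spike samples needed before the estimate reaches 90% of the new level.
steps_to_90pct = next(
    i + 1 for i, e in enumerate(estimates[3:]) if e >= 0.9 * spike
)
print(steps_to_90pct)
```

Until the estimate catches up, the buffers are sized for the old throughput, which is the temporarily suboptimal buffering described above.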

Conclusion

The checkpointing mechanism makes Apache Flink fault tolerant, providing exactly-once state consistency. In-flight data buffering and backpressure control the data flow within the distributed streaming application to maximize the throughput. Apache Flink’s default behaviors and configurations are good for most workloads.

The effectiveness of buffer debloating depends on the characteristics of the workload and the application. The general recommendation is to test the functionality in a non-production environment with a realistic workload to verify it actually helps with your use case.

You can request to enable buffer debloating on your Amazon Managed Service for Apache Flink application.

Under particular conditions, the combined effect of backpressure and in-flight data buffering may slow down checkpointing, increase checkpointing size (with unaligned checkpoints), and even cause checkpoints to fail. In these cases, enabling unaligned checkpointing may help reduce checkpoint duration or size.

In the second part of this series, we will take a closer look at unaligned checkpoints and how they can help your application checkpoint efficiently in the presence of backpressure, especially in combination with buffer debloating.


About the Authors

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.