Search code examples
apache-flink

What's the behavior if I have set the auto commit to true and


I am reading at https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-offset-committing-behaviour-configuration

It says:

Checkpointing enabled: if checkpointing is enabled, the Flink Kafka Consumer will commit the offsets stored in the checkpointed states when the checkpoints are completed. This ensures that the committed offsets in Kafka brokers is consistent with the offsets in the checkpointed states. Users can choose to disable or enable offset committing by calling the setCommitOffsetsOnCheckpoints(boolean) method on the consumer (by default, the behaviour is true). Note that in this scenario, the automatic periodic offset committing settings in Properties is completely ignored.

If I enable the checkpoint with 10 seconds interval, also I have setCommitOffsetsOnCheckpoints to true, and also set the enable.auto.commit=true and auto.commit.interval.ms=5000 in the Kafka consumer properties.

Then what would the offset committing behave? will the offset be committed for 3 times every 10 seconds? One time from the flink when doing checkpoint and two times from the Kafka consumer's auto commits ?


Solution

  • enable.auto.commit:Automatic offset submission, the configuration of this value is not the final offset submission mode, you need to consider whether the user has enabled checkpoint, Will be interpreted in the following source code analysis

    consumer.setCommitOffsetsOnCheckpoints(true) ​ Explanation: After setting the checkpoint, submit the offset, that is, the oncheckpoint mode. The value is true by default. This parameter will affect the submission method of the offset. The following source code will analyze it

    So according to your stated quote and looking further in the internet, Flink consumer will ignore auto.commit configuration when the checkpoint is enabled, the offset committing behavior will be defined by consumer.setCommitOffsetsOnCheckpoints defined (it is true by default)

    https://www.programmersought.com/article/84744879064/

    From Flink document

    Checkpointing disabled: if checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing capability of the internally used Kafka clients. Therefore, to disable or enable offset committing, simply set the enable.auto.commit / auto.commit.interval.ms keys to appropriate values in the provided Properties configuration.

    Checkpointing enabled: if checkpointing is enabled, the Flink Kafka Consumer will commit the offsets stored in the checkpointed states when the checkpoints are completed. This ensures that the committed offsets in Kafka brokers is consistent with the offsets in the checkpointed states. Users can choose to disable or enable offset committing by calling the setCommitOffsetsOnCheckpoints(boolean) method on the consumer (by default, the behaviour is true). Note that in this scenario, the automatic periodic offset committing settings in Properties is completely ignored.

    Note that in this scenario, the automatic periodic offset committing settings in Properties is completely ignored