apache-kafka apache-flink flink-streaming

Is FlinkKafkaConsumer.setStartFromLatest() needed when the Kafka property auto.offset.reset=latest is already set?


In my Flink application I have a Kafka data source, and I set the Kafka property auto.offset.reset=latest. Do I also need to call FlinkKafkaConsumer.setStartFromLatest()? Do the two do the same thing, so that either one can be used? Below is the Javadoc from the Flink source, but it does not make clear how this method relates to the Kafka property.

/**
 * Specifies the consumer to start reading from the latest offset for all partitions. This lets
 * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 *
 * <p>This method does not affect where partitions are read from when the consumer is restored
 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
 * only the offsets in the restored state will be used.
 *
 * @return The consumer object, to allow function chaining.
 */
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
    this.startupMode = StartupMode.LATEST;
    this.startupOffsetsTimestamp = null;
    this.specificStartupOffsets = null;
    return this;
}

Solution

  • There is no need to put auto.offset.reset=latest in the properties if setStartFromLatest() is called.

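    Internally, Flink uses the Kafka consumer client's assign method to manage partition assignment to Flink's tasks. It uses the value of startupMode to initialise the Kafka consumer. startupMode is set through the setStartFrom... methods, and its default value is GROUP_OFFSETS. In GROUP_OFFSETS mode the consumer starts from the committed offsets of the consumer group, and auto.offset.reset only serves as the fallback for partitions that have no committed offset. So the two settings are not equivalent.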

    You do have to put the consumer group id into the properties when using FlinkKafkaConsumer. Another option is KafkaSource.builder(), which provides methods to set these things up.
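
To make the precedence concrete, here is a rough sketch of how the start position gets picked, as I understand it from the Javadoc above and the Flink documentation. This is a simplified model written for illustration, not Flink's actual code: the class StartPositionSketch, the enum Mode and the method resolve are hypothetical names, and only three of the startup modes are modelled.

```java
import java.util.Optional;
import java.util.Properties;

/** Simplified, hypothetical model of start-position resolution. Not Flink's actual code. */
public class StartPositionSketch {

    /** Stand-in for three of the modes selected by the setStartFrom... methods. */
    public enum Mode { GROUP_OFFSETS, EARLIEST, LATEST }

    /**
     * Returns a description of where one partition would start reading.
     *
     * @param restoredOffset  offset found in a checkpoint/savepoint, if any
     * @param mode            startup mode (GROUP_OFFSETS models the default)
     * @param committedOffset offset committed for the consumer group, if any
     * @param kafkaProps      Kafka consumer properties
     */
    public static String resolve(Optional<Long> restoredOffset, Mode mode,
                                 Optional<Long> committedOffset, Properties kafkaProps) {
        // 1. Offsets restored from a checkpoint or savepoint always win.
        if (restoredOffset.isPresent()) {
            return "offset " + restoredOffset.get();
        }
        // 2. An explicit EARLIEST/LATEST mode ignores committed group offsets.
        if (mode == Mode.EARLIEST) {
            return "earliest";
        }
        if (mode == Mode.LATEST) {
            return "latest";
        }
        // 3. GROUP_OFFSETS: prefer the offset committed for the group.
        if (committedOffset.isPresent()) {
            return "offset " + committedOffset.get();
        }
        // 4. No committed offset: fall back to auto.offset.reset
        //    (the Kafka client's default for this property is "latest").
        return kafkaProps.getProperty("auto.offset.reset", "latest");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "earliest");

        // setStartFromLatest()-style mode: the committed offset 42 and the property are not used.
        System.out.println(resolve(Optional.empty(), Mode.LATEST, Optional.of(42L), props));
        // Default mode with a committed offset: the property is not used.
        System.out.println(resolve(Optional.empty(), Mode.GROUP_OFFSETS, Optional.of(42L), props));
        // Default mode without a committed offset: the property decides.
        System.out.println(resolve(Optional.empty(), Mode.GROUP_OFFSETS, Optional.empty(), props));
        // Restored from a checkpoint: the state decides, whatever the mode is.
        System.out.println(resolve(Optional.of(7L), Mode.LATEST, Optional.empty(), props));
    }
}
```

In this model, auto.offset.reset only matters in the third case. That is why setting auto.offset.reset=latest alone can still resume from an older committed offset, whereas setStartFromLatest() skips the committed offsets on a fresh start.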