I am trying to read a Kafka topic as a DataStream in Flink, using FlinkKafkaConsumer to read the topic.
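The consumer is set up roughly like this (a simplified sketch; the deserialization schema is a placeholder and the security/SASL settings are omitted):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AdhocTopicReader {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        // Endpoint, group.id and topic match the logs further down
        props.setProperty("bootstrap.servers", "idsp-cdp-qa-ehns-2.servicebus.windows.net:9093");
        props.setProperty("group.id", "FlinkChangeConsumerNewAgain");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "adhoc-testing", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("adhoc-testing-reader");
    }
}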
The problem I am facing is that, after some testing, I want to start reading again from the beginning of the topic to do a bit of extra testing. Ideally, changing the group.id and restarting the job should be enough to accomplish this.
But after the restart, the consumer still picks up the old checkpoints/committed Kafka offsets. I also tried deleting all the checkpoints, all ConfigMaps, and all deployments, then restarting everything, but the same thing happened again. I can see the offsets being set in the taskmanager logs.
How can I read from the start of the topic again? Here are the relevant taskmanager logs:
2021-02-17 10:08:41,287 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Discovered group coordinator idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 2147483647 rack: null)
2021-02-17 10:08:41,324 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-0 to the committed offset FetchPosition{offset=40204, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,326 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-1 to the committed offset FetchPosition{offset=39962, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-4 to the committed offset FetchPosition{offset=40444, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-2 to the committed offset FetchPosition{offset=40423, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-3 to the committed offset FetchPosition{offset=40368, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
I don't think the problem is that the consumer is able to find old commits or old checkpoints, as long as you are starting the job from scratch and not from a savepoint. The issue seems to be that you don't set auto.offset.reset on the Kafka consumer, so the default value, latest, is used. A brand-new group.id has no committed offsets to resume from, so the consumer falls back to auto.offset.reset and starts from the latest offsets in the topic. You can change that by setting the auto.offset.reset property to earliest in the properties passed to the FlinkKafkaConsumer.
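For example, something along these lines should do it (reusing the topic and endpoint from your logs; the new group id and SimpleStringSchema are just placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "idsp-cdp-qa-ehns-2.servicebus.windows.net:9093");
props.setProperty("group.id", "FlinkChangeConsumerFromScratch");   // a fresh, unused group id
// A fresh group.id has no committed offsets, so this fallback decides the start
// position; "earliest" means the beginning of every partition
props.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "adhoc-testing",               // topic from your logs
        new SimpleStringSchema(),      // placeholder deserialization schema
        props);

DataStream<String> stream = env.addSource(consumer);

Alternatively, FlinkKafkaConsumer also exposes setStartFromEarliest(), which ignores any committed offsets and always starts reading from the beginning of the topic regardless of group.id.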