apache-kafka apache-flink consumer

Flink Kafka GroupId seems to be ignored when using KafkaSource


I'm new to Apache Flink. I'm trying to consume events from Apache Kafka using Flink's KafkaSource. So far, so good: it seems to work fine. However, after restarting the Flink job, I receive the same messages again, even though I've set the group ID.

Here is my code snippet for reading from Kafka:

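import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

KafkaSource<BestellungEvent> bestellungEventSource = KafkaSource.<BestellungEvent>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("bestellungen")
        .setGroupId("bla2")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new BestellungEventKeyValueDeserializationSchema()))
        .build();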

Can anybody help me with this problem?

Best Regards

Thomas


Solution

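  • You are using .setStartingOffsets(OffsetsInitializer.earliest()), which starts from the earliest offset on every start, regardless of the group ID. Use .setStartingOffsets(OffsetsInitializer.committedOffsets()) instead to start consuming from the group's last committed offset. Note that committedOffsets() without an argument throws an exception at runtime if the group has no committed offset yet; committedOffsets(OffsetResetStrategy.EARLIEST) falls back to the earliest offset in that case. You can read more about the various options in the documentation (I'm not sure I'm linking to the correct version of the docs, since you haven't mentioned which version of the Flink Kafka connector you are using.)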

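    If this doesn't help, make sure that your consumer actually commits offsets after consuming messages from Kafka. KafkaSource commits the current offsets back to Kafka when a checkpoint completes; if checkpointing is not enabled, it relies on the Kafka consumer's periodic auto-commit, configured via enable.auto.commit and auto.commit.interval.ms.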
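Putting both points together, a minimal sketch of a job that resumes from the group's committed offsets might look like the following. It assumes the DataStream API with flink-connector-kafka (and its kafka-clients dependency) on the classpath. SimpleStringSchema stands in for the poster's BestellungEvent deserializer, and the class name ResumeFromCommittedOffsets, the job name, and the 10-second checkpoint interval are illustrative choices, not values from the question.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Hypothetical example class; not taken from the question.
public class ResumeFromCommittedOffsets {

    // Builds a source that starts from the consumer group's committed offsets
    // and falls back to the earliest offset when the group has none yet.
    static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("bestellungen")
                .setGroupId("bla2")
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Assumption: plain strings instead of the poster's BestellungEvent deserializer
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // KafkaSource commits offsets to Kafka when a checkpoint completes, so with
        // neither checkpointing nor enable.auto.commit the group's offsets never advance.
        env.enableCheckpointing(10_000); // interval in milliseconds, chosen for illustration
        env.fromSource(buildSource(), WatermarkStrategy.noWatermarks(), "bestellungen-source")
                .print();
        env.execute("resume-from-committed-offsets");
    }
}
```

If you'd rather not enable checkpointing, adding .setProperty("enable.auto.commit", "true") to the builder lets the Kafka consumer commit periodically instead, though those commits are not tied to Flink's checkpoints. Also keep in mind that when a job is restored from a checkpoint or savepoint, partitions restored from that state resume from the offsets stored in Flink's state, not from the starting-offsets setting.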