apache-kafka apache-beam kafka-consumer-api

How to manually commit a Kafka offset in Apache Beam at the end of a specific DoFn execution


I created a simple Apache Beam streaming pipeline that reads data from Kafka, does some processing, and persists the result by calling an external service's API. I want to make sure no data is lost when the pipeline restarts or fails, so I want to manually commit the record offset to Kafka after the API call succeeds at the end of a specific DoFn execution.

From my previous Kafka experience, I know that the Kafka consumer API call below lets me commit record offsets manually.

consumer.commitSync(currentOffsets); 

KafkaIO has a setting to turn off auto commit, but I haven't found a working way to commit offsets manually in Apache Beam, since there seems to be no way to access the consumer from within a DoFn. I would appreciate it if someone could share some hints with sample code.


Solution

  • By default, pipeline.apply(KafkaIO.read()...) returns a PCollection<KafkaRecord<K, V>>. Downstream in your pipeline, you can therefore read the offset from the KafkaRecord metadata and commit it manually in whatever way you need (just don't forget to disable auto commit, i.e. enable.auto.commit, in the consumer configuration passed to KafkaIO.read()).

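    Note, though, that the call to the external API and the offset commit need to be effectively atomic if data loss is critical: committing before the call has succeeded can lose records, while a failure between the call and the commit means the record will be processed again after a restart.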
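
The ordering described above can be sketched roughly as follows. This is a minimal illustration in plain Java rather than a complete Beam pipeline, and the names RecordMeta, OffsetCommitter and CommitAfterSuccess are hypothetical. It assumes that, in a real pipeline, the process method would be the body of a DoFn's @ProcessElement, the record fields would come from KafkaRecord's getTopic(), getPartition() and getOffset(), and the committer would wrap a Kafka consumer created by the DoFn itself (for example in @Setup) and call consumer.commitSync(...).

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for the metadata and payload carried by a Beam KafkaRecord. */
final class RecordMeta {
    final String topic;
    final int partition;
    final long offset;
    final String value;

    RecordMeta(String topic, int partition, long offset, String value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

/**
 * Hypothetical abstraction over the commit call. A real implementation is
 * assumed to delegate to consumer.commitSync(...) on a consumer that uses
 * the same group.id as the KafkaIO source.
 */
interface OffsetCommitter {
    void commit(String topic, int partition, long nextOffset);
}

/** Calls the external API first and commits the offset only if the call did not throw. */
final class CommitAfterSuccess {
    private final Consumer<String> externalApi; // assumed to throw on failure
    private final OffsetCommitter committer;

    CommitAfterSuccess(Consumer<String> externalApi, OffsetCommitter committer) {
        this.externalApi = externalApi;
        this.committer = committer;
    }

    void process(RecordMeta record) {
        // If this throws, the commit below is skipped, so the record is
        // read again after a restart instead of being lost.
        externalApi.accept(record.value);

        // Kafka expects the offset of the NEXT record to read, hence + 1.
        committer.commit(record.topic, record.partition, record.offset + 1);
    }
}
```

With this ordering the pipeline is at-least-once rather than atomic: if the worker fails after the API call but before the commit, the same record is delivered again, so the external API should ideally be idempotent. Note also that records of one partition may not reach the DoFn strictly in offset order after a shuffle, so a real implementation would have to avoid committing an offset ahead of records that are still in flight. Depending on requirements, KafkaIO's own commitOffsetsInFinalize() option may be a simpler alternative to committing by hand.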