Tags: apache-kafka, google-cloud-dataflow, apache-beam, apache-beam-kafkaio

How to manually commit Kafka offsets after FileIO in Apache Beam?


I have a FileIO that writes a PCollection<GenericRecord> to files and returns a WriteFilesResult<DestinationT>.

I would like to add a DoFn after the file write that commits the offsets of the written records to Kafka. However, the offsets are stored in my GenericRecords, so I can no longer access them in the output of FileIO.

What is the best way to solve this?


Solution

  • For anyone interested, here is how I did it:

    • Manually group the records by DestinationT with a GroupByKey.
    • For each group, collect the list of offsets, build a new key, EnrichedDestinationT, that carries the destination together with those offsets, and flatten the iterable back into individual records.
    • The PCollection entering FileIO is then a PCollection<KV<EnrichedDestinationT, GenericRecord>>.
    • In FileIO, .by() becomes .by(KV::getKey) and .via() becomes .via(Contextful.fn(KV::getValue), Contextful.fn(this::getSink)). Because the destination key now carries the offsets, they are still available in the output of FileIO.
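
The key-enrichment step can be sketched in plain Java, without any Beam classes, to show the data flow only. This is a minimal model under stated assumptions, not the actual pipeline code: `Rec`, `EnrichedDestination`, `Keyed`, and `enrich` are hypothetical names, the destination is reduced to a `String`, and Kafka partitions are left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the grouping and key-enrichment steps (no Beam involved).
final class OffsetKeyEnrichment {

    // Hypothetical stand-in for a GenericRecord that carries its Kafka offset.
    static final class Rec {
        final String destination; // plays the role of DestinationT
        final long offset;
        final String payload;

        Rec(String destination, long offset, String payload) {
            this.destination = destination;
            this.offset = offset;
            this.payload = payload;
        }
    }

    // Hypothetical EnrichedDestinationT: the destination plus the offsets of its group.
    static final class EnrichedDestination {
        final String destination;
        final List<Long> offsets;

        EnrichedDestination(String destination, List<Long> offsets) {
            this.destination = destination;
            this.offsets = Collections.unmodifiableList(offsets);
        }
    }

    // Stand-in for KV<EnrichedDestinationT, GenericRecord>.
    static final class Keyed {
        final EnrichedDestination key;
        final Rec value;

        Keyed(EnrichedDestination key, Rec value) {
            this.key = key;
            this.value = value;
        }
    }

    static List<Keyed> enrich(List<Rec> records) {
        // Step 1: group records by destination (the GroupByKey in the pipeline).
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            groups.computeIfAbsent(r.destination, d -> new ArrayList<>()).add(r);
        }

        List<Keyed> out = new ArrayList<>();
        for (Map.Entry<String, List<Rec>> group : groups.entrySet()) {
            // Step 2: collect the offsets of the group and build the enriched key.
            List<Long> offsets = new ArrayList<>();
            for (Rec r : group.getValue()) {
                offsets.add(r.offset);
            }
            EnrichedDestination key = new EnrichedDestination(group.getKey(), offsets);

            // Step 3: flatten the group back into one keyed element per record.
            for (Rec r : group.getValue()) {
                out.add(new Keyed(key, r));
            }
        }
        return out;
    }
}
```

In the real pipeline, the `Keyed` elements correspond to the `KV<EnrichedDestinationT, GenericRecord>` values handed to FileIO, with `.by(KV::getKey)` selecting the enriched key as the destination. As far as I know, `WriteFilesResult.getPerDestinationOutputFilenames()` then yields a `PCollection<KV<DestinationT, String>>`, so a downstream DoFn should be able to read the offsets back from the key of each written file.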