python · google-cloud-dataflow · apache-beam · google-cloud-pubsub

How to handle lack of messages from Pub/Sub in Google Dataflow


The following is a simplified example of the pipeline:

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode

class FormatDoFn(beam.DoFn):
    def process(self, viewing, window=beam.DoFn.WindowParam):
        print(viewing)
        yield viewing

def main(argv=None):

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    options = PipelineOptions(pipeline_args)
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:

        # Read messages from the Pub/Sub topic.
        messages = p | beam.io.ReadFromPubSub(topic="projects/example-project/topics/example")

        transformed = (
            messages
            | beam.WindowInto(
                window.FixedWindows(5),
                trigger=AfterWatermark(),
                accumulation_mode=AccumulationMode.DISCARDING)
            | 'Format' >> beam.ParDo(FormatDoFn())
        )

if __name__ == '__main__':
    main()

I have a working Dataflow job that processes Pub/Sub messages per fixed window interval and inserts the aggregated results into a table. However, it only performs inserts when messages are received.

How can I make the Dataflow job insert a zero-value row once a window has passed without any messages being received?


Solution

  • The solution turned out to be very simple for my use case: I generated dummy KVs and combined them with the KVs produced from the regular Pub/Sub messages.

    PCollection<Long> input = pipeline
                                .apply("unbounded longs",
                                        GenerateSequence
                                                .from(0)
                                                .withRate(2, Duration.standardSeconds(1))
                                );