The code below is just a simplified example:
import argparse

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode


class FormatDoFn(beam.DoFn):
    def process(self, viewing, window=beam.DoFn.WindowParam):
        print(viewing)


def main(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p:
        # Read from PubSub messages.
        input = p | beam.io.ReadFromPubSub("projects/example-project/topics/example")

        transformed = (
            input
            | beam.WindowInto(
                window.FixedWindows(5),
                trigger=AfterWatermark(),
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Format' >> beam.ParDo(FormatDoFn())
        )


if __name__ == '__main__':
    main()
I have a working Dataflow job that processes Pub/Sub messages in fixed windows and inserts the aggregated results into a table. However, it only inserts rows for windows in which messages were received.
How can I make the Dataflow job insert a zero-value row when a window has passed without any messages being received?
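The solution turned out to be very simple for my use case. I just generated dummy KVs and combined them with the KVs produced from the regular Pub/Sub messages, so each window contains at least one element and emits a result even when no messages arrive.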
PCollection<Long> input = pipeline
    .apply("unbounded longs",
        GenerateSequence
            .from(0)
            .withRate(2, Duration.standardSeconds(1))
    );
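To show why merging dummy KVs with the real ones produces a row for every window, here is a minimal plain-Python simulation of the idea. It does not use the Beam API; the key name "views", the sample timestamps, and the helper names (window_start, sum_per_window, dummy_events) are all made up for illustration, and the dummy value is assumed to be 0 so that it does not change the sums.

```python
from collections import defaultdict

WINDOW_SIZE = 5  # seconds, matching FixedWindows(5) in the question


def window_start(timestamp, size=WINDOW_SIZE):
    """Return the start of the fixed window that contains timestamp."""
    return timestamp - (timestamp % size)


def sum_per_window(events):
    """Sum (timestamp, key, value) events per (window start, key)."""
    totals = defaultdict(int)
    for timestamp, key, value in events:
        totals[(window_start(timestamp), key)] += value
    return dict(totals)


def dummy_events(key, start, end, step=1):
    """Zero-valued events standing in for the generated dummy KVs (hypothetical helper)."""
    return [(t, key, 0) for t in range(start, end, step)]


# Hypothetical real messages: nothing arrives between t=5 and t=10.
real = [(1, "views", 3), (4, "views", 2), (12, "views", 7)]

# Combining the two lists plays the role of merging the two collections.
combined = real + dummy_events("views", 0, 15)

without_dummies = sum_per_window(real)    # the empty window [5, 10) is missing
with_dummies = sum_per_window(combined)   # the empty window appears with a sum of 0
```

In the actual pipeline the same effect would come from mapping the generated sequence to zero-valued KVs for each key that should always be reported, merging them with the Pub/Sub KVs, and applying the same windowing and per-key combine to the merged collection.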