python, streaming, google-cloud-dataflow, apache-beam, google-cloud-pubsub

Combiner functions seemingly not emitting correct results


So I'm working on a test streaming case: reading from Pub/Sub and, for now, sending to stdout to get some visibility into the pipeline and its transforms.

I'm getting some unusual output and suspect I'm missing something, so I'm hoping someone can help.

Take my code (stripped back to debug):

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub

# opts and topic_name are defined elsewhere.
with beam.Pipeline(options=opts) as p:
    (
        p
        | ReadFromPubSub(topic=topic_name,
                         timestamp_attribute='timestamp')
        | beam.WindowInto(beam.window.FixedWindows(beam.window.Duration(5)),
                          trigger=beam.trigger.AfterWatermark(),
                          accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
        | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults()
        | beam.Map(print)
    )
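(For context: reading from an unbounded Pub/Sub source requires streaming mode, so opts is something along these lines; a minimal sketch assuming the local/DirectRunner:)

from apache_beam.options.pipeline_options import PipelineOptions

# Streaming must be enabled for the unbounded Pub/Sub read.
opts = PipelineOptions(streaming=True)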

I am generating an arbitrary number of events and pushing them to my topic - currently 40. I can confirm from the event generation that they all reach the topic, and if I simply print the contents of the topic (using Beam), I see what I would expect.
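For context, publishing with an explicit timestamp attribute looks roughly like this; a sketch rather than my exact generator, with project_id and the payload as placeholders:

import time
from datetime import datetime, timezone
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('project_id', 'topic_name')

for i in range(40):
    # Attributes must be strings; an RFC 3339 timestamp works with
    # timestamp_attribute on the Beam side.
    ts = datetime.now(timezone.utc).isoformat()
    publisher.publish(topic_path, data=f'event {i}'.encode('utf-8'), timestamp=ts)
    time.sleep(0.5)  # half a second between events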

However, what I wanted to try was some basic window aggregation, and using both beam.CombineGlobally(beam.combiners.CountCombineFn()) and beam.combiners.Count.Globally() I noticed two things happening (not strictly at the same time).

The first issue:

  • When I print additional window start/end timestamps, I get more than one instance of the same window returned. My expectation on a local runner would be a single fixed window collecting the events and emitting one result.
  • This is the DoFn I've used to get a picture of the windowing data (how it slots into the pipeline is sketched after this list).
class ShowWindowing(beam.DoFn):
    def process(self, elem, window=beam.DoFn.WindowParam):
        # Emit the element alongside its window's start/end as UTC datetimes.
        yield (f'I am an element: {elem}\n'
               f'start window time: {window.start.to_utc_datetime()} '
               f'and the end window time: {window.end.to_utc_datetime()}')
  • And to reiterate: the issue is not that I am getting 'duplicate' results; rather, I am getting multiple semi-grouped results for what should be a single window.
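For reference, the DoFn slots into the pipeline after the combiner, something like this (a sketch of the same pipeline as above):

(
    p
    | ReadFromPubSub(topic=topic_name, timestamp_attribute='timestamp')
    | beam.WindowInto(beam.window.FixedWindows(beam.window.Duration(5)),
                      trigger=beam.trigger.AfterWatermark(),
                      accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
    | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults()
    # Label each emitted count with its window's start/end before printing.
    | beam.ParDo(ShowWindowing())
    | beam.Map(print)
)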

The second issue I have (which I feel is related to the above, though I've seen it occur without the semi-grouping of elements):

  • When I execute my pipeline through the CLI (I use notebooks a lot) and generate events to my topic, I get considerably less output back, and what does come back appears to be only partial results.
  • Example: I produce 40 events, each half a second apart. My window is 5 seconds, so I expect (give or take) a count of about 10 per window over 20 seconds. What I actually get is completely partial: a count of 1 in one window, or a count of 8 in another.

I've read and re-read the docs (admittedly skimming some of it just to find an answer), and I've referenced the katas and the Google Dataflow quest for examples and alternatives, but I cannot identify where I'm going wrong.

Thanks


Solution

  • I think this boils down to a TODO in the Python local runner's handling of watermarks for Pub/Sub subscriptions. Essentially, the runner assumes it has received all the data up until now, but there is still data in Pub/Sub with a timestamp less than now(); that data becomes late data once it is actually read, and with the default allowed lateness of zero it is dropped.

    A real runner such as Dataflow won't have this issue.
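As a workaround while developing locally, you can drive the pipeline with TestStream instead of Pub/Sub, which lets you advance the watermark explicitly so nothing is treated as late. A minimal sketch reproducing the 40-events / 5-second-window setup:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import FixedWindows, TimestampedValue

# 40 events, half a second apart, with explicit event timestamps.
stream = TestStream()
for i in range(40):
    stream = stream.add_elements([TimestampedValue(f'event-{i}', i * 0.5)])
stream = stream.advance_watermark_to_infinity()

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (
        p
        | stream
        | beam.WindowInto(FixedWindows(5))
        | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults()
        | beam.Map(print)  # should print 10 for each of the four windows
    )

Each 5-second window should then emit a count of 10, which confirms the combiner itself is behaving and points at the Pub/Sub watermark handling in the local runner.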