python · apache-beam

Apache Beam - Counting messages in an Unbounded PCollection (Per Window)


I need a simple pipeline that counts the number of messages per fixed window from an unbounded data source.

The steps are:

  1. Read data from Pub/Sub
  2. Define a fixed-time window
  3. Create a (key, value) pair where the key is the window timestamp
  4. Count messages per key

As the data source I am using the public Pub/Sub topic projects/pubsub-public-data/topics/taxirides-realtime

import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.transforms.combiners import Count

topic_name = "projects/pubsub-public-data/topics/taxirides-realtime"

options = pipeline_options.PipelineOptions()
options.view_as(pipeline_options.StandardOptions).streaming = True
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

ib.options.recording_duration = '18s'

class dataAsKey(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # Use the window's start timestamp (UTC) as the key
        yield (format(window.start.to_utc_datetime()), 1)

p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)

data = (p
    | "Read" >> beam.io.ReadFromPubSub(topic=topic_name)
    | 'Window' >> beam.WindowInto(beam.window.FixedWindows(6))
)

ib.show(data)

[Screenshot: ib.show(data) output showing 110 messages]

As you can see in the image above, Interactive Beam captured 110 messages. But when I transform the PCollection using the window timestamp as a key and then count the messages per key, the total after aggregation doesn't match. In the sample below, the counts per key add up to only 50.

count = (data
    | 'Data as key' >> beam.ParDo(dataAsKey())
    | 'Count per Window' >> Count.PerKey()
)

ib.show(count)

[Screenshot: ib.show(count) output with counts per key summing to 50]

The curious thing is that when I use this same code with a bounded data source, the values match.


Solution

  • Use a GroupByKey transform instead of Count.PerKey. GroupByKey groups all the elements in a window by their keys and returns an iterable of values for each key; counting the elements in each iterable then gives the message count per key.