I need a simple task to count the number of messages in a fixed window from an unbounded data source.
The steps are:
As a data source I am using the public Pub/Sub topic projects/pubsub-public-data/topics/taxirides-realtime:
import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_runner

topic_name = "projects/pubsub-public-data/topics/taxirides-realtime"

options = pipeline_options.PipelineOptions()
options.view_as(pipeline_options.StandardOptions).streaming = True
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
ib.options.recording_duration = '18s'
class dataAsKey(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # Key each message by the UTC start time of the window it falls into
        yield (format(window.start.to_utc_datetime()), 1)
p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)
data = (p
        | "Read" >> beam.io.ReadFromPubSub(topic=topic_name)
        | 'Window' >> beam.WindowInto(beam.window.FixedWindows(6))
)
ib.show(data)
As you can see in the image above, 110 messages were captured by Interactive Beam. But when I transform the PCollection using the window timestamp as a key and then count the messages per key, the number of messages doesn't match the total after aggregation. In the sample below, the counts per key totaled only 50.
from apache_beam.transforms.combiners import Count

count = (data
         | 'Data as key' >> beam.ParDo(dataAsKey())
         | 'Count per Window' >> Count.PerKey()
)
ib.show(count)
The curious thing is that, when I use this same code with a bounded data source, the values match.
Use a GroupByKey transform instead of Count.PerKey. It groups all the elements in a window by their keys and returns an iterable of values for each key; you can then count the number of elements in each iterable to get the count of messages per key.