Search code examples
pythongoogle-cloud-dataflowpipelinebatch-processingapache-beam

Batching with BatchElements works differently in DirectRunner and DataflowRunner (GCP/Dataflow)


I'm building a pipeline with apache beam (GCP Dataflow) and python, my pipeline looks like this:

...
with beam.Pipeline(options=self.pipeline_options) as pipeline:
            somepipeline = (
                pipeline
                | "ReadPubSubMessage" >> ReadFromPubSub(
                    subscription=self.custom_options.some_subscription)
                | "Windowing" >> beam.WindowInto(beam.window.FixedWindows(30))
                | "DecodePubSubMessage" >> beam.ParDo(DecodePubSubMessage()).with_outputs(ERROR_OUTPUT_NAME, main=MAIN_OUTPUT_NAME)
                | "Geting and sorting listings" >> beam.ParDo(SortByCompletion())
                | "Batching listings" >> beam.BatchElements(min_batch_size=3,max_batch_size=3) 
                | "Print logs" >> beam.Map(logging.info)
            )
...

And everything works as expected when I run pipeline via DirectRunner (you can see 1 batch with 3 elements inside): directrunner_result

But when I run the same code with a DataflowRunner I'm getting this result (you can see 3 batches with 1 element inside of each batch): dataflow_result

This happens even when I'm running this pipeline in parallel (in two terminal windows). Both were run with a streaming flag. Messages were sent to pubsub via python script one by one immediately.

Question: What can cause this problem with DataflowRunner (my assumption was the number of workers in dataflow but when I checked it there was only 1 worker in this job) and how I can get the same result as via DirrectRunner.

Thank you!


Solution

  • BatchElements is non-deterministic and doesn't batch things across bundles. The direct runner is really simple and puts the entire PCollection into a single bundle, but Dataflow is written as a distributed runner and even if there is only one worker, there may be multiple bundles running concurrently (e.g. on different threads) and bundles tend to be fairly small.

    You could look into using Beam's GroupIntoBatches which works better in streaming mode (though that requires choosing a key within which things are batched).