python, google-cloud-dataflow, apache-beam, apache-beam-internals

Speed and memory tradeoffs splitting Apache Beam PCollection in two


I've got a PCollection where each element is a (key, values) tuple like this: (key, (value1, ..., value_n))

I need to split this PCollection into two processing branches.

As always, I need the whole pipeline to be as fast as possible and to use as little RAM as possible.

Two ideas come to my mind:

Option 1: Split the PColl with a DoFn with multiple outputs

import apache_beam as beam

class SplitInTwo(beam.DoFn):

   def process(self, kvpair):
       key, values = kvpair

       # Emit the first two values on 'left' and the rest on 'right'.
       yield beam.TaggedOutput('left', (key, values[0:2]))
       yield beam.TaggedOutput('right', (key, values[2:]))

class ProcessLeft(beam.DoFn):
   def process(self, kvpair):
       key, values = kvpair
       ...
       yield (key, results)

# class ProcessRight is similar to ProcessLeft

And then build the pipeline like this

   splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
   left = splitme.left | beam.ParDo(ProcessLeft())
   right = splitme.right | beam.ParDo(ProcessRight())

Option 2: Use two different DoFns on the original PCollection

Another option is to have two DoFns read and process the same PCollection, one for the 'left' side of the data and one for the 'right' side:

class ProcessLeft(beam.DoFn):

   def process(self, kvpair):
       key = kvpair[0]
       values = kvpair[1][0:2]  # the values are the second item of the pair
       ...
       yield (key,result)

# class ProcessRight is similar to ProcessLeft

Building the pipeline is simpler (plus you don't need to track which tagged outputs you have):

   left = pcoll | beam.ParDo(ProcessLeft())
   right = pcoll | beam.ParDo(ProcessRight())

But is it faster? Will it need less memory than the first one?

(I'm thinking that the first option might be fused by the runner, and not just by the Dataflow runner.)


Solution

  • In this case, the runner would fuse both options, so their performance should be roughly the same. If you want to redistribute the data across separate workers, Option 1 is the better choice, because the serialized collections read by ProcessLeft and ProcessRight are smaller: each branch receives only its own slice of the values.

       splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
       left = splitme.left | beam.Reshuffle() | beam.ParDo(ProcessLeft())
       right = splitme.right | beam.Reshuffle() | beam.ParDo(ProcessRight())
    

    The Reshuffle transform ensures that your data is written to an intermediate shuffle and then consumed downstream, which breaks the fusion.
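
To get a rough feel for the size argument, here is a minimal sketch that compares how many bytes would cross a shuffle boundary per element under each option. It makes several assumptions that are not in the original answer: `pickle` stands in for whatever coder the runner actually uses, the sample element and the helpers `serialized_size` and `split_in_two` are hypothetical, and Option 2 is assumed to place a separate Reshuffle in front of each branch. Treat the numbers as illustrative only, not as a measurement of Beam itself.

```python
import pickle


def serialized_size(element):
    """Bytes needed to pickle one element (a stand-in for a real Beam coder)."""
    return len(pickle.dumps(element))


def split_in_two(kvpair):
    """Same slicing as SplitInTwo: first two values go left, the rest go right."""
    key, values = kvpair
    return (key, values[0:2]), (key, values[2:])


# Hypothetical element: one key with 100 small integer values.
element = ("user-1", tuple(range(100)))
left, right = split_in_two(element)

# Option 1 + Reshuffle: each branch shuffles only its own slice.
option1_bytes = serialized_size(left) + serialized_size(right)

# Option 2 + a Reshuffle per branch (assumed): the full element is shuffled twice.
option2_bytes = 2 * serialized_size(element)

print("Option 1 bytes per element:", option1_bytes)
print("Option 2 bytes per element:", option2_bytes)
```

With many values per key, Option 1 shuffles roughly half as much data, since only the key is duplicated. Without a Reshuffle, both options are fused and this difference does not arise.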