I've got a PCollection where each element is a (key, values) tuple like this: (key, (value1, ..., value_n)).
I need to split this PCollection into two processing branches.
As always, I need the whole pipeline to be as fast as possible and to use as little RAM as possible.
Two ideas come to mind. The first is to split the elements with tagged outputs:

    import apache_beam as beam

    class SplitInTwo(beam.DoFn):
        def process(self, kvpair):
            key, values = kvpair
            # First two values go to the 'left' branch, the rest to 'right'.
            yield beam.pvalue.TaggedOutput('left', (key, values[0:2]))
            yield beam.pvalue.TaggedOutput('right', (key, values[2:]))

    class ProcessLeft(beam.DoFn):
        def process(self, kvpair):
            key, values = kvpair
            ...
            yield (key, results)

    # class ProcessRight is similar to ProcessLeft

And then build the pipeline like this:

    splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left', 'right')
    left = splitme.left | beam.ParDo(ProcessLeft())
    right = splitme.right | beam.ParDo(ProcessRight())

Another option is to use two DoFns that read and process the same PCollection, one for the 'left' side of the data and one for the 'right' side:

    class ProcessLeft(beam.DoFn):
        def process(self, kvpair):
            key = kvpair[0]
            # The values live in the second tuple slot, not in the key.
            values = kvpair[1][0:2]
            ...
            yield (key, result)

    # class ProcessRight is similar to ProcessLeft

Building the pipeline is simpler (plus you don't need to track which tagged outputs you have):

    left = pcoll | beam.ParDo(ProcessLeft())
    right = pcoll | beam.ParDo(ProcessRight())

But is it faster? Will it need less memory than the first one?
(I suspect the first option might be fused by the runner, and not only by the Dataflow runner.)
In this case, both options would be fused by the runner, so they would perform similarly. If you would like to reshuffle the data onto separate workers, then Option 1 is your best choice, because the serialized collections read by ProcessLeft and ProcessRight would be smaller.

    splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left', 'right')
    left = splitme.left | beam.Reshuffle() | beam.ParDo(ProcessLeft())
    right = splitme.right | beam.Reshuffle() | beam.ParDo(ProcessRight())

The Reshuffle transform would ensure that your data is written to an intermediate shuffle and then consumed downstream. This would break the fusion.
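
To make the size argument concrete, here is a rough sketch in plain Python that does not use Beam at all. It uses `pickle` as a stand-in for whatever coder the runner would apply, and the element (`"user-42"` with 1000 string values) is made up for illustration, so the absolute byte counts are only indicative. It compares what each branch would have to write to the shuffle if every branch had a Reshuffle in front of it.

```python
import pickle

# Hypothetical element shaped like the question's (key, (value1, ..., value_n)).
element = ("user-42", tuple(f"value-{i:04d}" for i in range(1000)))

key, values = element
left = (key, values[0:2])   # what SplitInTwo emits on the 'left' tag
right = (key, values[2:])   # what SplitInTwo emits on the 'right' tag

# pickle is only a stand-in here; Beam's real coders produce different sizes.
full_size = len(pickle.dumps(element))
left_size = len(pickle.dumps(left))
right_size = len(pickle.dumps(right))

# Option 1 + Reshuffle: each branch shuffles only its own slice.
option1_bytes = left_size + right_size
# Option 2 + Reshuffle: each branch shuffles the whole element.
option2_bytes = 2 * full_size

print(f"full element:        {full_size} bytes")
print(f"left / right slices: {left_size} / {right_size} bytes")
print(f"option 1 shuffled:   {option1_bytes} bytes")
print(f"option 2 shuffled:   {option2_bytes} bytes")
```

Under these assumptions, Option 1 shuffles roughly one copy of the values in total, while Option 2 shuffles two. Without a Reshuffle (that is, with the steps fused), nothing is serialized between the steps, which is why the two options then behave similarly.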