I would like to apply a transform to a side input PCollection with Apache Beam. The transform on the side input should be performed for every element of the base PCollection, with the details of the transform read from the respective element. This somewhat works, but it triggers the following warning for each element of the base_data PCollection:
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['test.py']
(test.py is the name of my Python script)
More importantly, applying a transform on a side input PCollection inside a transform causes a massive performance drop.
Here is a minimal example which triggers this behavior:
# Using Python 3.10.9 and Apache Beam 2.44.0
import apache_beam as beam

class Test(beam.DoFn):
    def process(self, element, side_input):
        # This is the PTransform
        side_input | beam.Filter(lambda _: True)
        yield element

with beam.Pipeline() as pipeline:
    base_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    side_input = pipeline | 'Create test data' >> beam.Create([1, 2])

    output = (
        base_data
        | beam.ParDo(Test(), side_input=beam.pvalue.AsIter(side_input))
        | beam.Map(print)
    )
In this example the transform in the Test class does nothing, but it still triggers the behavior in question. This example already takes more than a second to run, while without this (very simple) side-input transform it finishes instantly. The actual pipeline I'm working on is of course more complex, and it takes a very long time (>1 min) to complete even though the applied transforms are very simple and operate on a PCollection of only 39 elements.
I would like to know if applying a transform on a side input inside a transform is simply not something that you're supposed to do, or if I'm just doing it incorrectly.
Thanks!
You are confusing a DoFn with a PTransform and mixing the two concepts in your code.

A DoFn executes its process() method once for every incoming element; this is where you put your processing logic on the single-element level, and process() may be called many times. In your case, every call applies | to the side input. Inside process() the side input is a plain iterable, not a PCollection, so Beam eagerly constructs and runs a brand-new throwaway pipeline for it, once per element and most likely unnecessarily often. That is also where the warning comes from: each of these ad-hoc pipelines parses its options from sys.argv, where it finds your script name and nothing it can use.
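You can reproduce this eager behavior in isolation. A minimal sketch (outside of any pipeline) of what effectively happens once per element inside your process():

import apache_beam as beam

# Piping a plain iterable (not a PCollection) into a transform makes Beam
# build and eagerly run a throwaway pipeline on the DirectRunner, parsing
# its options from sys.argv -- hence the 'Discarding unparseable args'
# warning. Doing this once per element is what makes your pipeline slow.
result = [1, 2, 3] | beam.Filter(lambda x: x > 1)
print(result)  # [2, 3] (order not guaranteed)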
Such a piece of code belongs in an actual PTransform, specifically in its expand() method. You can think of a PTransform as a 'pipeline function' which lets you reuse pipeline snippets and structure your pipeline code.
Here is an example using both approaches to achieve a simple filter:
import apache_beam as beam

def my_filter_using_side_input(element, side_input):
    return element in side_input

# This is a PTransform with a side input
class SideInputPTransform(beam.PTransform):
    def __init__(self, side_input):
        super().__init__()
        self.side_input = side_input

    def expand(self, pcoll):
        # note the return
        return (
            pcoll
            | beam.Filter(my_filter_using_side_input, self.side_input)
        )

# This is a DoFn with a side input
class SideInputDoFn(beam.DoFn):
    # note the yield
    def process(self, element, side_input):
        if my_filter_using_side_input(element, side_input):
            yield element

with beam.Pipeline() as pipeline:
    base_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    side_input = pipeline | 'Create test data' >> beam.Create([1, 2])

    # use side_input with DoFn
    (
        base_data
        | "DoFn" >> beam.ParDo(SideInputDoFn(), side_input=beam.pvalue.AsIter(side_input))
        | "Print DoFn" >> beam.Map(print)
    )

    # use side_input with PTransform
    (
        base_data
        | "PTransform" >> SideInputPTransform(side_input=beam.pvalue.AsIter(side_input))
        | "Print PTransform" >> beam.Map(print)
    )
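If, as in your actual pipeline, the transform details really do depend on each element, a common pattern is to materialize the side input (e.g. with AsList) and apply plain Python logic inside process(), without the | operator, so no extra pipeline is built per element. A minimal sketch, where the per-element condition is just a hypothetical stand-in for the details you read from the element:

import apache_beam as beam

class PerElementTransform(beam.DoFn):
    def process(self, element, side_input):
        # Plain Python over the materialized side input -- no '|' here,
        # so no throwaway pipeline is created per element.
        # The condition below is an illustrative placeholder.
        filtered = [s for s in side_input if s < element]
        yield element, filtered

with beam.Pipeline() as pipeline:
    base_data = pipeline | 'Create data' >> beam.Create([1, 2, 3])
    side = pipeline | 'Create side data' >> beam.Create([1, 2])

    (
        base_data
        | beam.ParDo(PerElementTransform(), side_input=beam.pvalue.AsList(side))
        | beam.Map(print)
    )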