
PTransform on Side Input results in warning and bad performance


I would like to apply a Transform to a side input PCollection with Apache Beam. The transform of the side input should be performed for every element of the base PCollection, and the details for the transforms are read from the respective element. It somewhat works, but it triggers the following warning for each element of the base_data PCollection:

WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['test.py']

(test.py is the name of my Python script)

More importantly, applying a transform on a side input PCollection inside a transform causes a massive performance drop.

Here is a minimal example which triggers this behavior:

# Using Python 3.10.9 and Apache Beam 2.44.0

import apache_beam as beam


class Test(beam.DoFn):
    def process(self, element, side_input):
        # This is the PTransform
        side_input | beam.Filter(lambda _: True)
        yield element


with beam.Pipeline() as pipeline:
    base_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    side_input = pipeline | 'Create test data' >> beam.Create([1, 2])

    output = (
        base_data
        | beam.ParDo(Test(), side_input=beam.pvalue.AsIter(side_input))
        | beam.Map(print)
    )

In this example the transform in the Test class does nothing, but it still triggers the behavior in question. This example already takes more than a second to run, while without this (very simple) side-input transform it finishes instantly. The actual pipeline that I'm working on is of course more complex, and it takes a very long time (over a minute) to complete even though the applied transforms are very simple and operate on a PCollection of only 39 elements.

I would like to know if applying a transform on a side input inside a transform is simply not something that you're supposed to do, or if I'm just doing it incorrectly.

Thanks!


Solution

  • You are confusing a DoFn with a PTransform and mixing the two concepts in your code.

    A DoFn executes its process() method for each incoming element; this is where you provide your processing logic at the single-element level, and process() may be called many times. In your case, every call evaluates `side_input | beam.Filter(...)`. Because the side input arrives as a plain Python iterable (not a PCollection), the Beam Python SDK most likely wraps it in a brand-new ad-hoc pipeline and runs it eagerly, which would explain both symptoms: each ad-hoc pipeline parses fresh PipelineOptions from sys.argv (hence the `Discarding unparseable args: ['test.py']` warning once per element), and you are constructing and running an entire pipeline for every element of base_data (hence the performance drop). If you only need to read the side input inside a DoFn, iterate over it with ordinary Python; do not apply Beam transforms to it.

    Such a piece of code belongs in an actual PTransform, specifically in its expand() method. You can think of a PTransform as a 'pipeline function/method' that lets you reuse pipeline snippets and structure your pipeline code.

    Here is an example that implements the same simple filter both ways:

    import apache_beam as beam
    
    def my_filter_using_side_input(element, side_input):
      return element in side_input
    
    # This is a PTransform with a side input
    class SideInputPTransform(beam.PTransform):
      def __init__(self, side_input):
        self.side_input = side_input
        
      def expand(self, pcoll):
        # note the return
        return (
          pcoll
          | beam.Filter(my_filter_using_side_input, self.side_input)
        )
    
    
    # This is a DoFn with a side input
    class SideInputDoFn(beam.DoFn):
      # note the yield
      def process(self, element, side_input):
        if my_filter_using_side_input(element, side_input):
          yield element
    
    
    with beam.Pipeline() as pipeline:
      base_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
      side_input = pipeline | 'Create test data' >> beam.Create([1, 2])
    
      # use side_input with DoFn
      (
        base_data
        | "DoFn" >> beam.ParDo(SideInputDoFn(), side_input=beam.pvalue.AsIter(side_input))
        | "Print DoFn" >> beam.Map(print)
      )
      
      # use side_input with PTransform
      (
        base_data
        | "PTransform" >> SideInputPTransform(side_input=beam.pvalue.AsIter(side_input))
        | "Print PTransform" >> beam.Map(print)
      )