Tags: python, apache-beam, google-cloud-pubsub, dataflow

Skipping a step in an Apache Beam pipeline (Python)


I'm building an Apache Beam pipeline with the Python SDK and I'm having trouble skipping the remaining steps for an element. Here is a simplified example of the problem:

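import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# API_KEY and my_sub are placeholders that are not defined in this snippet.
# GOOGLE_APPLICATION_CREDENTIALS expects the path to a credentials file.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = API_KEY


def foo(message):
    # Returns None implicitly.
    pass


options = {
    'streaming': True
}

runner = 'DirectRunner'
opts = PipelineOptions(flags=[], **options)
# Leaving the `with` block runs the pipeline and waits for it to finish,
# so no explicit p.run() is needed.
with beam.Pipeline(runner, options=opts) as p:
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=my_sub))
    result = (sub_message | 'foo' >> beam.Map(foo))
    result | 'print' >> beam.Map(print)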

According to "Apache Beam - skip pipeline step" (a question about the Java SDK), if my function doesn't return anything, Beam should skip the remaining steps for that element. Correct me if I'm wrong, but in Python that is the same as returning None, so replacing pass with return None should behave identically. However, when I run this code with either pass or return None, the result still reaches the next step: it keeps printing None, when I expected nothing to be printed because the later steps should be skipped. Any help is appreciated.


Solution

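  • Funnily enough, as soon as I posted this I found the answer in the docs. The Python equivalent of the approach in the question I linked is a ParDo, not a Map as I had used. beam.Map emits exactly one output per input, namely whatever the function returns, so a function that returns None sends None downstream. With beam.ParDo, a DoFn whose process method returns None (or yields nothing) emits no elements, so the later steps have nothing to process. It should look like this: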

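    import os

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # `credentials` and `mysub` are placeholders that are not defined here.
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials


    class TestFn(beam.DoFn):
        def process(self, element):
            # Side effect only. Nothing is returned or yielded, so this
            # DoFn emits no elements and 'print' below receives nothing.
            print('hi')


    options = {
        'streaming': True
    }

    runner = 'DirectRunner'
    opts = PipelineOptions(flags=[], **options)
    # Leaving the `with` block runs the pipeline and waits for it to finish,
    # so no explicit p.run() is needed.
    with beam.Pipeline(runner, options=opts) as p:
        sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=mysub))
        result = (sub_message | 'foo' >> beam.ParDo(TestFn()))
        result | 'print' >> beam.Map(print)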