I'm building an Apache Beam pipeline with the Python SDK and I'm having trouble skipping the remaining steps for an element. Here is a simplified example that shows the problem:
import apache_beam as beam
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = API_KEY

def foo(message):
    pass

options = {
    'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)

with beam.Pipeline(runner, options=opts) as p:
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=my_sub))
    result = (sub_message | 'foo' >> beam.Map(foo))
    result | 'print' >> beam.Map(print)
    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()
According to this question, "Apache Beam - skip pipeline step" (which is about the Java SDK), if my function doesn't return anything then Beam should skip the rest of the steps for that element. Correct me if I'm wrong, but in Python that is the same as returning None, so my pass could be replaced with return None and behave exactly the same. But when I run this code with either pass or return None, the result still goes to the next step. That is, it keeps printing None when it should not print anything, since all of the following steps should be skipped. Any help appreciated.
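Funnily enough, as soon as I posted this I found the answer in the docs. In the link I provided, the equivalent is a ParDo, NOT a Map as I used. Map always emits exactly one output per input, so a None return value is passed downstream as an element; a DoFn's process method emits only what it returns or yields, so returning nothing emits nothing. So it should really look like this: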
import apache_beam as beam
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials

class TestFn(beam.DoFn):
    def process(self, element):
        print('hi')
        pass

options = {
    'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)

with beam.Pipeline(runner, options=opts) as p:
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=mysub))
    result = (sub_message | 'foo' >> beam.ParDo(TestFn()))
    result | 'print' >> beam.Map(print)
    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()