I started learning Apache Beam, and while trying to build my very first pipeline (in Python), I ran into some strange behavior.
Here are the elements of my code:
A file, count_words.txt, containing:
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
A version of the code that works: wordcount_exercise.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
input_file = "data/input/count_words.txt"
# 1 - Create pipelineOptions object to run the pipeline
beam_options = PipelineOptions(runner="DirectRunner")
# 2 - Apply transforms
# 2.1 - Read text from file
with beam.Pipeline(options=beam_options) as pipeline:
    lines = pipeline | beam.io.ReadFromText(input_file) | beam.Map(print)
Output:
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
A version of the code that does not work: wordcount_exercise.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
input_file = "data/input/count_words.txt"
# 1 - Create pipelineOptions object to run the pipeline
beam_options = PipelineOptions(runner="DirectRunner")
# 2 - Create the pipeline using the pipelineOptions
pipeline = beam.Pipeline(options=beam_options)
# 3 - Apply transforms
# 3.1 - Read text from file
lines = pipeline | beam.io.ReadFromText(input_file) | beam.Map(print)
Nothing is output.
Why does the pipeline need to be declared within a with statement? And why doesn't it work with a simple declaration?
The pipeline doesn't need a with statement, but using one removes the need to run the pipeline manually with pipeline.run() (or p.run(), if your pipeline variable is named p). Applying transforms only builds the pipeline; nothing executes until run() is called.
Just as opening a file with a with statement closes it automatically (f.close()), the with statement for Pipeline() calls run() automatically when the block exits and waits for the pipeline to finish.
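To make the file analogy concrete, here is a minimal sketch of the context-manager mechanism. ToyPipeline is a hypothetical stand-in written for illustration, not Beam's actual Pipeline class; it only shows how a with block can trigger run() on exit.

```python
class ToyPipeline:
    """Hypothetical stand-in for a pipeline; not part of Apache Beam."""

    def __init__(self):
        self.steps = []        # deferred work, recorded but not executed
        self.executed = False  # flips to True once run() has been called

    def add(self, step):
        # Adding a step only records it; nothing runs yet.
        self.steps.append(step)
        return self

    def run(self):
        # Execute every recorded step, in order.
        self.executed = True
        return [step() for step in self.steps]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Run automatically when the with block exits without an error.
        if exc_type is None:
            self.run()
        return False  # never swallow exceptions
```

Inside a with block, the steps run when the block ends. Without one, the steps stay recorded but idle until run() is called explicitly.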
For the code you posted, simply add pipeline.run() at the end.