Tags: python, python-3.x, apache-beam, with-statement

Do apache_beam pipelines need to be declared using a with statement?


I started learning Apache Beam, and while building my very first pipeline (in Python) I ran into some strange behavior.

Here are the elements of my code:

A file, count_words.txt, containing:

```text
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
```

A version of the code that works (wordcount_exercise.py):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

input_file = "data/input/count_words.txt"

# 1 - Create pipelineOptions object to run the pipeline
beam_options = PipelineOptions(runner="DirectRunner")

# 2 - Apply transforms
# 2.1 - Read text from file
with beam.Pipeline(options=beam_options) as pipeline:
    lines = pipeline | beam.io.ReadFromText(input_file) | beam.Map(print)
```

Output:

```text
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
```

A version of the code that does not work (wordcount_exercise.py):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

input_file = "data/input/count_words.txt"

# 1 - Create pipelineOptions object to run the pipeline
beam_options = PipelineOptions(runner="DirectRunner")

# 2 - Create the pipeline using the pipelineOptions
pipeline = beam.Pipeline(options=beam_options)
# 3 - Apply transforms
# 3.1 - Read text from file
lines = pipeline | beam.io.ReadFromText(input_file) | beam.Map(print)
```

Nothing is output.

Why does the pipeline need to be declared within a with statement? And why doesn't it work with a plain declaration?


Solution

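  • The pipeline doesn't need a with statement. Applying transforms with | only builds the pipeline graph; nothing executes until the pipeline is run. A with statement runs it for you when the block exits, which removes the need to call pipeline.run() manually (or p.run(), if your pipeline variable is named p).

    Just as opening a file in a with statement closes it automatically (calling f.close() for you), using Pipeline() in a with statement calls p.run() for you on exit and then waits for the run to finish.

    For the code you posted, simply add pipeline.run() at the end. With runners that execute asynchronously, such as Dataflow, use pipeline.run().wait_until_finish() if the script should block until the job completes.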
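To make the "build now, run later" behaviour concrete, here is a minimal stdlib-only sketch of deferred execution. ToyPipeline, read and collect are hypothetical names invented for illustration; they are not Apache Beam's API, and the model is simplified (in Beam, pipeline | transform returns a PCollection rather than the pipeline itself). It only mimics two points from the answer: | records steps without executing them, and leaving a with block calls run().

```python
class ToyPipeline:
    """Hypothetical stand-in for a deferred pipeline (NOT Apache Beam's API).

    Applying a step with `|` only records it; the recorded steps execute
    when run() is called, either explicitly or on leaving a `with` block.
    """

    def __init__(self):
        self.steps = []  # recorded steps, each a function: list -> list

    def __or__(self, step):
        self.steps.append(step)  # record only, do not execute
        return self              # allows chaining: p | a | b

    def run(self):
        # Execute the recorded steps in order, feeding each one's output on.
        elements = []
        for step in self.steps:
            elements = step(elements)
        return elements

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:  # in this toy, run only if the block raised nothing
            self.run()
        return False          # never suppress exceptions


def read(lines):
    """Hypothetical source step: ignores its input and emits `lines`."""
    return lambda _elements: list(lines)


def collect(sink):
    """Hypothetical sink step: appends every element to `sink`."""
    def step(elements):
        sink.extend(elements)
        return elements
    return step


data = ["Strawberry", "Carrot", "Eggplant"]

# 1) Plain declaration, no run(): the steps are recorded but never executed.
out_plain = []
p = ToyPipeline()
p | read(data) | collect(out_plain)
before_run = list(out_plain)  # snapshot: still empty at this point

# 2) Same pipeline, then an explicit run(): now the steps execute.
p.run()

# 3) `with` block: leaving the block calls run() automatically.
out_with = []
with ToyPipeline() as p2:
    p2 | read(data) | collect(out_with)

print(before_run)  # []
print(out_plain)   # ['Strawberry', 'Carrot', 'Eggplant']
print(out_with)    # ['Strawberry', 'Carrot', 'Eggplant']
```

Case 1 corresponds to the script that prints nothing; cases 2 and 3 correspond to the two fixes. In the real Beam script, the equivalent of case 2 is calling pipeline.run() (or pipeline.run().wait_until_finish()) after the last transform.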