I am using Apache Beam on GCP Dataflow. I want to use a PCollection multiple times, but I'm worried that an expensive PCollection might be recomputed for each use. I can't find a "materialize" or "cache" transform in the Apache Beam documentation.
```python
import apache_beam as beam

# Set up a pipeline and read in a PCollection
p = beam.Pipeline()
input_data = p | beam.io.ReadFromText('input.txt')
reused_data = input_data | beam.Map(some_expensive_function)

# Write the outputs to different files
# (each transform needs a unique label within the pipeline)
reused_data | 'WriteOutput1' >> beam.io.WriteToText('output1.txt')
reused_data | 'WriteOutput2' >> beam.io.WriteToText('output2.txt')

# Run the pipeline
p.run()
```
What will happen here? Will it recompute my data or will it cache my data? What if I don't have enough memory on my machines?
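In the pipeline as written, nothing will be cached or recomputed (modulo failure recovery). Though the details are left up to the runner, most runners do what is called fusion. In particular, what happens in this case is roughly:

1. The first element is read from input.txt.
2. That element is passed to some_expensive_function, producing some element X.
3. X is passed to the first WriteToText.
4. X is passed to the second WriteToText.
5. Go back to step 1 for the next element.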
If there were other DoFns between steps 2 and 3/4, they would be applied element by element, and their outputs fully consumed, before going back to step 1 to start on the next element. At no point is the full reused_data PCollection materialized; it exists only one element at a time (possibly in parallel across many workers, of course).
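To make the element-at-a-time behavior concrete, here is a plain-Python sketch of what a fused stage does conceptually. It is not Beam code and not how any runner is actually implemented; `run_fused` and the in-memory lists standing in for the two output files are hypothetical names chosen for illustration.

```python
from typing import Callable, Iterable, List


def run_fused(source: Iterable[str],
              expensive_fn: Callable[[str], str],
              sinks: List[List[str]]) -> int:
    """Push each element through every fused step before reading the next.

    Returns the number of times expensive_fn was called.
    """
    calls = 0
    for element in source:         # read one element
        x = expensive_fn(element)  # compute the expensive result once
        calls += 1
        for sink in sinks:         # hand the same value to every consumer
            sink.append(x)
    return calls


# Two lists stand in for output1.txt and output2.txt.
output1: List[str] = []
output2: List[str] = []
calls = run_fused(["a", "b", "c"], str.upper, [output1, output2])
print(calls, output1, output2)
```

The expensive function runs once per input element, not once per consumer, and only one intermediate value is held at a time, which is why memory for the whole PCollection is not needed.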
If for some reason fusion is not possible (this sometimes happens with conflicting resource constraints or side inputs), the intermediate data is implicitly materialized to disk rather than recomputed.