We have a Dataflow pipeline that collects thousands of parquet files from a bucket, applies some transformations, and concatenates them all together. However, when we increase the number of files we want to process (for instance, doubling it), workers often crash because they run out of disk space. We confirmed this by running the same pipeline with more disk space (~500 GB instead of the standard 20 GB), which works.
However, we do not understand where all this disk utilization is coming from. The final output is usually around 1 GB and the raw base files are around 2-4 MB each. We suspect that the base files are somehow opened on workers and accumulate because they are not properly closed, but we don't know how to confirm this.
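One idea we had for confirming this, sketched below and not something we actually run, is a pass-through DoFn that logs free disk space on the worker so the logs show whether usage grows as elements are processed (the LogDiskUsage name and the "/tmp" path are placeholders):

import logging
import shutil

import apache_beam as beam


class LogDiskUsage(beam.DoFn):
    """Pass-through DoFn that logs free disk space on the worker."""

    def process(self, element):
        # "/tmp" is an assumption about where worker-local temp data lands.
        usage = shutil.disk_usage("/tmp")
        logging.info(
            "worker disk: %.1f GB free of %.1f GB",
            usage.free / 1e9,
            usage.total / 1e9,
        )
        yield element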
Here's our pipeline:
from collections.abc import Iterator

import apache_beam as beam
import pandas as pd
from apache_beam.io.gcp.gcsio import GcsIO


class GetFileList(beam.DoFn):
    def process(self, element) -> Iterator[str]:
        blob_names = GcsIO().list_files("gs://bucket_name")
        for blob in blob_names:
            yield blob


class DownloadFile(beam.DoFn):
    def process(self, file_name: str) -> Iterator[tuple[str, pd.DataFrame]]:
        with GcsIO().open(file_name, "rb") as file:
            # get_grouping_key and do_transformations are helpers defined elsewhere.
            yield get_grouping_key(file_name), do_transformations(pd.read_parquet(file))


def concat_and_export(element: tuple[str, list[pd.DataFrame]]) -> None:
    key, frames = element
    df = pd.concat(frames)
    # file_name: output destination for this group (construction omitted here)
    with GcsIO().open(file_name, "wb") as file:
        df.to_parquet(file)


with beam.pipeline.Pipeline() as pipeline:
    (
        pipeline
        | "Initialize" >> beam.Create([None])
        | "Get list of files" >> beam.ParDo(GetFileList())
        | "Reshuffle to prevent fusing and distribute between workers" >> beam.Reshuffle()
        | "Download files" >> beam.ParDo(DownloadFile())
        | "Group by" >> beam.GroupByKey()
        | "Concatenate and export results to bucket" >> beam.Map(concat_and_export)
    )
I figured out that the issue was with the concatenation step. Regrouping a large number of dataframes in a single step and concatenating them all at once wasn't working out: with GroupByKey, every dataframe for a key has to be materialized together on one worker before concat_and_export runs. Maybe it's a limitation of pandas, or maybe the workers were running out of memory or spilling to disk; I couldn't pin it down exactly.
I solved it by using a custom CombineFn that aggregates the frames incrementally as they are grouped:
class CombineFeatures(beam.CombineFn):
    """An accumulator that combines multiple dataframes into a single dataframe."""

    def create_accumulator(self) -> pd.DataFrame:
        return pd.DataFrame()

    def add_input(self, accumulator: pd.DataFrame, input: pd.DataFrame) -> pd.DataFrame:
        return pd.concat([accumulator, input], axis=0)

    def merge_accumulators(self, accumulators: list[pd.DataFrame]) -> pd.DataFrame:
        return pd.concat(accumulators, axis=0)

    def extract_output(self, accumulator: pd.DataFrame) -> pd.DataFrame:
        return accumulator.sort_index()
Then in the pipeline:
with beam.pipeline.Pipeline() as pipeline:
    (
        pipeline
        | "Initialize" >> beam.Create([None])
        | "Get list of files" >> beam.ParDo(GetFileList())
        | "Reshuffle to prevent fusing and distribute between workers" >> beam.Reshuffle()
        | "Download files" >> beam.ParDo(DownloadFile())
        | "Combine frames" >> beam.CombinePerKey(CombineFeatures())
    )
This runs without any issues now.
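For completeness, the export step from the original pipeline can be re-attached after the combine. This is only a sketch assuming the same output convention as before; export_frame and the output path are placeholders, not the exact code we run:

def export_frame(element: tuple[str, pd.DataFrame]) -> None:
    """Write one combined dataframe back to the bucket (path is a placeholder)."""
    key, df = element
    with GcsIO().open(f"gs://bucket_name/output/{key}.parquet", "wb") as file:
        df.to_parquet(file)

# Appended as the last step of the pipeline above:
#     | "Export results to bucket" >> beam.Map(export_frame)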