I'm trying to save my Dask DataFrame to Parquet on the same machine where the Dask scheduler/workers are located, but I'm running into trouble.
My Dask setup: My Python script runs on my local machine (a laptop with 16 GB RAM), but it creates a Dask client connected to a Dask scheduler running on a remote machine (a server with 400 GB RAM used for parallel computations). The Dask scheduler and workers are all located on that same server, so they all share the same file system, which is locally available to them. Because this remote Dask cluster is used by everyone on my team, the files we work on are also stored on that server, giving all team members common access to all files through the same Dask cluster.
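For reference, the setup looks roughly like this (the scheduler address and dataset path are placeholders, not my real ones):

from dask.distributed import Client
import dask.dataframe as dd

# Placeholder address of the remote scheduler
client = Client('tcp://our-server:8786')

# The dataset lives on the server's file system, readable by all workers
ddf = dd.read_parquet('/scratch/some_dataset')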
I have tried:
# This saves the parquet files in a folder on my local machine.
ddf.to_parquet(
    '/scratch/dataset_no_dalayed', compression='brotli').compute()
# This delayed call of `ddf.to_parquet` saves the Dask Dataframe chunks
# into individual parquet files (i.e. parts) in the given folder.
# However, I want to persist the Dask dataframe in my workflow, but this
# fails as seen below.
dask.delayed(ddf.to_parquet)(
    '/scratch/dataset_dalayed', compression='brotli').compute()
# If the Dask dataframe is persisted, the `to_parquet` fails with
# a "KilledWorker" error!
ddf = client.persist(ddf)
dask.delayed(ddf.to_parquet)(
    '/scratch/dataset_persist/', compression='brotli').compute()
# In the example below, I can NOT save the Dask dataframe,
# because the delayed function turns the Dask dataframe into a
# Pandas dataframe at runtime, and this fails since the path is a
# folder and not a file, as Pandas requires!
@dask.delayed
def save(new_ddf):
    new_ddf.to_parquet('/scratch/dataset_function/', compression='brotli')

save(ddf).compute()
How do I do this correctly?
Usually, to save a Dask dataframe as a Parquet dataset, people do the following:
df.to_parquet(...)
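A minimal sketch of that usual approach, assuming the workers can all see the output path (the paths and scheduler address below are placeholders); note that to_parquet computes and writes by default, so there is no need to wrap it in dask.delayed or call .compute() on its result:

import dask.dataframe as dd
from dask.distributed import Client

client = Client('tcp://scheduler-address:8786')  # placeholder address

df = dd.read_parquet('/scratch/some_dataset')    # placeholder input path

# to_parquet triggers the computation and writes the part files itself
df.to_parquet('/scratch/dataset_output', compression='brotli')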
From your question it sounds like your workers may not all have access to a shared file system like NFS, or a shared store like S3. If that is the case and you write to local drives, then your data will be scattered across various machines without an obvious way to collect it back together. In principle, I encourage you to avoid this and invest in a shared file system; they are very helpful when doing distributed computing.
If you can't do that, then I personally would probably write in parallel to local drives and then copy the files back to one machine with scp afterwards.
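A rough sketch of that workaround (the local path, worker hostnames, and destination are all placeholders):

# Each worker writes its own partitions to a directory on its local disk.
df.to_parquet('/local/scratch/dataset', compression='brotli')

# Afterwards, collect the part files onto a single machine from a shell, e.g.:
#   scp 'worker1:/local/scratch/dataset/part.*.parquet' ./dataset/
#   scp 'worker2:/local/scratch/dataset/part.*.parquet' ./dataset/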
If your dataset is small enough, then you could also call .compute() to get back a local Pandas dataframe and then write that out using Pandas:
df.compute().to_parquet(...)
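Spelled out a bit more (the output path is a placeholder), keeping in mind that the whole dataframe has to fit in the client's memory and ends up in a single file:

# Pull the full result back to the client as a Pandas dataframe ...
pdf = df.compute()

# ... and write it as one Parquet file with Pandas (needs pyarrow or fastparquet).
pdf.to_parquet('/path/to/dataset.parquet', compression='brotli')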