I am using Dask to extend dask bag items with information from an external, previously computed object arg. Dask seems to allocate memory for arg once per partition at the beginning of the computation.
Is there a workaround that prevents Dask from duplicating arg many times (and allocating a lot of memory)?
Here is a simplified example:
from pathlib import Path
import numpy as np
import pandas as pd
from dask import bag
in_dir = Path.home() / 'in_dir'
out_dir = Path.home() / 'out_dir'
in_dir.mkdir(parents=True, exist_ok=True)
out_dir.mkdir(parents=True, exist_ok=True)
n_files = 100
n_lines_per_file = int(1e6)
df = pd.DataFrame({
    'a': np.arange(n_lines_per_file).astype(str)
})

# write n_files input files, one column of strings each
for i in range(n_files):
    df.to_csv(in_dir / f'{i}.txt', index=False, header=False)

def mapper(x, arg):
    y = x  # map x to y using arg
    return y

arg = np.zeros(int(1e7))

(
    bag
    .read_text(str(in_dir / '*.txt'))
    .map(mapper, arg)
    .to_textfiles(str(out_dir / '*.txt'))
)
One strategy for dealing with this is to scatter your data to the workers first:
import dask.bag, dask.distributed
client = dask.distributed.Client()
arg = np.zeros(int(1e7))
arg_f = client.scatter(arg, broadcast=True)
(
    dask.bag
    .read_text(str(in_dir / '*.txt'))
    .map(mapper, arg_f)
    .to_textfiles(str(out_dir / '*.txt'))
)
This sends a copy of the data to each worker, but does not create a copy for each task.
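If you want to confirm that the scattered array is resident on the workers (one replica per worker rather than one per task), you can ask the scheduler where its key lives. A minimal sketch, assuming the client and arg_f from the snippet above:

# who_has maps each key to the workers holding it; with broadcast=True
# every worker address should appear for the scattered array's key.
print(client.who_has([arg_f]))

# has_what gives the inverse view: the keys resident on each worker.
print(client.has_what())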