Is it possible to have Dask return a default value if a delayed object uses too much memory?
I want to evaluate a list of machine learning pipelines on large datasets in parallel. I am doing this in a loop where I generate models/pipelines then evaluate them.
This is how I am executing the parallelization:
import dask

for i in range(10):
    pipeline_list = generate_next_pipelines()
    scores = dask.compute(*[dask.delayed(fit_and_score)(pipeline, X, y)
                            for pipeline in pipeline_list])
    # save/print scores
I also get warnings about high unmanaged memory usage. Is there a step I am missing that would reduce memory usage or free the unreleased memory more often?
I increased the memory limit by setting up a LocalCluster with the memory limit set to the maximum memory of the system. This allows the code to run, but if a task requests more memory than is available, the entire script crashes. I would like Dask to instead return a default value such as "Not enough RAM" if a given worker goes over the memory limit.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=n_jobs,
                       threads_per_worker=1,
                       memory_limit='64GB')
client = Client(cluster)
Thanks for the help
If your function raises a MemoryError, then you can use standard Python exception handling to catch it and return something different:
@dask.delayed
def func_or(pipeline, X, y):
    try:
        return fit_and_score(pipeline, X, y)
    except MemoryError:
        return alternate_thing
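As a minimal sketch, plugging that into your loop would look something like this (reusing the generate_next_pipelines, X and y from your question):

for i in range(10):
    pipeline_list = generate_next_pipelines()
    # func_or is already wrapped with dask.delayed, so call it directly
    scores = dask.compute(*[func_or(pipeline, X, y) for pipeline in pipeline_list])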
However, if your worker is being killed outright due to memory use, your function never gets the chance to return anything: the computation will raise an exception on the client side (typically KilledWorker), and you need to handle that there instead of in the return value. You may wish to use the client.submit API instead of delayed/dask.compute to get real-time information on the state of each task.
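For example, here is a rough sketch of that approach, reusing the client, fit_and_score, pipeline_list, X and y from your question and catching the failure on the client side (note that, depending on configuration, the scheduler may retry a failed task on other workers before finally raising KilledWorker):

from dask.distributed import KilledWorker

# submit returns Future objects immediately, one per task
futures = [client.submit(fit_and_score, pipeline, X, y)
           for pipeline in pipeline_list]

scores = []
for future in futures:
    try:
        scores.append(future.result())
    except (MemoryError, KilledWorker):
        # the task raised MemoryError itself, or its worker was killed
        scores.append("Not enough RAM")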