Tags: python, dask, dask-distributed

close dask client and cluster when catching any errors or exceptions


I am writing a Python function to do data processing with dask. I want to automatically close the dask cluster and client if any errors or exceptions are raised, so I use the with ... as: statement. The function structure is:

import dask.array as da
from dask.distributed import Client, LocalCluster

def func(input: str,   # path to input
         output: str,  # path to output
         ):
    with LocalCluster() as cluster, Client(cluster) as client:
        # load the input with dask
        # set up the computing graph
        da.compute([...])

The cluster and client are successfully closed if I trigger a KeyboardInterrupt while the data are being processed, i.e., during da.compute(). However, if I interrupt while the program is still setting up the cluster, i.e., during the LocalCluster() call, the cluster is not closed. I get something like:

......
KeyboardInterrupt: 
2023-10-20 18:55:38,520 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,523 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,529 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,532 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,535 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,548 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,549 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,556 - distributed.nanny - WARNING - Restarting worker

The workers are not stopped.

So the next time I run this function, I get:

/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37963 instead
  warnings.warn(

since the previous cluster was not closed.

Is there any way to automatically close the cluster when the interrupt happens while the LocalCluster is being created?


Solution

  • I am pretty sure this would require a deep code change in distributed to achieve. The way that with works, the context is only set up once LocalCluster() completes, so if you interrupt before then, there is no context yet to safely exit out of.

    One thing you might experiment with is

    cluster = LocalCluster(n_workers=0, ...)
    with cluster:
        cluster.scale(...)
        # cluster.wait_for_workers(...) # if you need it
        with Client(cluster) as client:
            compute(...)
    

    where the slow step (actually starting the workers) now happens inside the context.
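
    For illustration, here is a rough sketch of how that pattern could be applied to the original func. The worker count and the wait_for_workers call are illustrative assumptions, not requirements:

    import dask.array as da
    from dask.distributed import Client, LocalCluster

    def func(input: str,   # path to input
             output: str,  # path to output
             ):
        # With n_workers=0, LocalCluster() returns almost immediately,
        # so the window in which an interrupt can arrive before the
        # context exists is tiny.
        cluster = LocalCluster(n_workers=0)
        with cluster:
            # The slow part (spawning the workers) now happens inside
            # the context, so a KeyboardInterrupt here still runs the
            # cluster's __exit__ and shuts the workers down.
            cluster.scale(8)              # illustrative worker count
            cluster.wait_for_workers(8)   # optional, if you need all workers up
            with Client(cluster) as client:
                # load the input, build the graph, then compute
                da.compute([...])         # placeholder from the question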