How do I stop a running task in Dask?


When using Dask's distributed scheduler I have a task that is running on a remote worker that I want to stop.

How do I stop it? I know about the cancel() method, but this doesn't seem to work if the task has already started executing.


Solution

    If it's not yet running

    If the task has not yet started running, you can cancel it by cancelling the associated future:

    future = client.submit(func, *args)  # start task
    future.cancel()                      # cancel task
    

    If you are using Dask collections, you can pass the collection to the client.cancel() method:

    x = x.persist()   # start many tasks 
    client.cancel(x)  # cancel all tasks
    

    If it is running

    However, if your task has already started running on a thread within a worker, there is nothing you can do to interrupt that thread. Unfortunately, this is a limitation of Python: its threading module offers no supported way to stop a running thread from the outside.
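    The same behaviour can be seen without a cluster. The sketch below is not Dask code: it uses the standard library's concurrent.futures as a stand-in, and the names started, stop and long_running_task are made up for illustration. It shows that cancelling has no effect once the function is executing in a thread, and that the thread only ends when the function itself decides to return. Dask's own cancel() differs in detail; only the underlying thread limitation is the same.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()  # set by the task once it is actually running
stop = threading.Event()     # set by the caller to request a stop

def long_running_task():
    started.set()
    steps = 0
    while not stop.is_set():  # cooperative stopping condition
        steps += 1
        stop.wait(0.01)       # stand-in for one unit of real work
    return steps

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(long_running_task)
    started.wait()               # make sure the task has begun executing
    cancelled = future.cancel()  # False: a running call cannot be cancelled
    stop.set()                   # ask the task to stop itself
    steps = future.result(timeout=5)

print("cancel() succeeded:", cancelled)
```

    Here cancel() reports failure because the call is already running, and the task only finishes after the flag is set.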

    Build in an explicit stopping condition

    The best you can do is build a stopping criterion into your function with your own custom logic. You might consider checking a shared variable within a loop. Look for "Variable" in these docs: http://dask.pydata.org/en/latest/futures.html

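    from dask.distributed import Client, Variable

    client = Client()
    stop = Variable()   # flag shared between the client and the workers
    stop.set(False)

    def long_running_task():
        while not stop.get():  # check the shared flag on every iteration
            ...  # do stuff

    future = client.submit(long_running_task)

    # ... wait a while

    stop.set(True)      # ask the running task to stop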
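    One design point to consider: the Variable's value lives on the scheduler, so each stop.get() most likely costs a network round trip, which can add up in a tight loop. A possible refinement is to poll the flag only every few iterations. The sketch below shows that idea in plain Python so it can be run without a cluster. The helper run_until_stopped, its parameters check_every and max_steps, and the dictionary flag are all hypothetical names introduced here; with Dask, should_stop would be a callable that wraps stop.get().

```python
def run_until_stopped(should_stop, do_step, check_every=100, max_steps=1_000_000):
    """Call do_step() repeatedly, polling should_stop() every check_every steps."""
    steps = 0
    while steps < max_steps:
        # Only pay the cost of the (possibly remote) check occasionally.
        if steps % check_every == 0 and should_stop():
            break
        do_step()
        steps += 1
    return steps

# Local stand-in for the shared flag (a dask Variable in the real setting).
flag = {"stop": False}
results = []

def do_step():
    results.append(len(results))
    if len(results) == 250:
        flag["stop"] = True  # simulate the flag being set mid-run

completed = run_until_stopped(lambda: flag["stop"], do_step, check_every=100)
print(completed)
```

    In this run the flag is set during step 250 but is only noticed at the next poll, so the loop completes 300 steps. That delay is the trade-off: a larger check_every means fewer checks but a slower reaction to the stop request.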