Tags: python, multithreading, callback, concurrent.futures

Python: Getting a concurrent.futures Executor to wait for done_callbacks to complete


Is it possible to get a ThreadPoolExecutor to wait for all its futures and their add_done_callback() functions to complete without having to call .shutdown(wait=True)? The following code snippet illustrates the essence of what I'm trying to accomplish, which is to reuse the thread pool between iterations of the outer loop.

from concurrent.futures import ThreadPoolExecutor, wait
import time

def proc_func(n):
    return n + 1


def create_callback_func(fid, sleep_time):
    def callback(future):
        time.sleep(sleep_time)
        fid.write(str(future.result()))
        return

    return callback


num_workers = 4
num_files_write = 3
num_tasks = 8
sleep_time = 1

pool = ThreadPoolExecutor(max_workers=num_workers)

for n in range(num_files_write):
    fid = open(f'test{n}.txt', 'w')
    futs = []

    callback_func = create_callback_func(fid, sleep_time)

    for t in range(num_tasks):
        fut = pool.submit(proc_func, n)
        fut.add_done_callback(callback_func)
        futs.append(fut)

    wait(futs)
    fid.close()

pool.shutdown(wait=True)

Running this code produces a series of ValueError: I/O operation on closed file errors, and the three files that get written have these contents:
test0.txt -> 1111
test1.txt -> 2222
test2.txt -> 3333

Clearly this is wrong and there should be eight of each numeral. If I create and shut down a separate ThreadPoolExecutor for each file, then the correct result is achieved. So I know that the Executor is able to wait properly for all the callbacks to finish, but can I tell it to do so without shutting it down?


Solution

  • I'm afraid that cannot be done and you are "misusing" the callback.

    The primary purpose of the callback is to notify that the scheduled work has been done.

    The internal future states are PENDING -> RUNNING -> FINISHED (disregarding cancellations for brevity). When the FINISHED state is reached, the callbacks are invoked, but there is no next state when they finish. That's why it is not possible to synchronize with that event.

    The core of the execution of a submitted function in one of the available threads is (simplified):

    try:
        result = self.fn(*self.args, **self.kwargs)
    except BaseException as exc:
        self.future.set_exception(exc)
    else:
        self.future.set_result(result)
    

    where both set_exception and set_result look like this (very simplified):

    ... save the result/exception
    self._state = FINISHED
    ... wakeup all waiters
    self._invoke_callbacks() # this is the last statement
    

    The future is already in the FINISHED, i.e. "done", state when the "done" callback is called. It would not make sense to notify that the work is done before marking it done.
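
    A minimal sketch of that ordering, for illustration only: the callback records what it sees when it is invoked. The names task, callback, observed and registered are hypothetical helpers, not part of concurrent.futures. The registered event only holds the task back until the callback has been attached, so that the worker thread (not the submitting thread) is the one that invokes it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

observed = {}                    # filled in by the callback
registered = threading.Event()   # set once the callback has been attached


def task():
    # Hold the task until the callback is registered, so the callback is
    # invoked by the worker thread rather than immediately by the caller.
    registered.wait(timeout=5)
    return 42


def callback(future):
    # The future is already marked as done when the callback starts.
    observed["done"] = future.done()
    observed["result"] = future.result()
    observed["thread"] = threading.current_thread().name


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    fut = pool.submit(task)
    fut.add_done_callback(callback)
    registered.set()

# Leaving the with block calls shutdown(wait=True), so observed is complete.
print(observed)
```

    In this sketch the callback sees future.done() return True, and the recorded thread name carries the pool's prefix, which shows that the callback ran in the pool's worker thread.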

    As you noticed already, in your code:

    wait(futs)
    fid.close()
    

    the wait returns and the file gets closed, but the callback has not finished yet and fails when it attempts to write to the closed file.
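
    The gap between wait returning and the callback finishing can be made visible with a small sketch. The three events and the functions task and slow_callback are hypothetical helpers introduced here to force a deterministic ordering; they stand in for the time.sleep in the question.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

task_may_finish = threading.Event()      # releases the task
callback_may_finish = threading.Event()  # releases the callback
callback_finished = threading.Event()    # set at the end of the callback


def task():
    # Block until the callback has been attached to the future.
    task_may_finish.wait(timeout=5)
    return 42


def slow_callback(future):
    # Stand-in for slow work: block until the main thread allows us to go on.
    callback_may_finish.wait(timeout=5)
    callback_finished.set()


with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(task)
    fut.add_done_callback(slow_callback)
    task_may_finish.set()

    wait([fut])  # returns as soon as the future is done
    done_after_wait = fut.done()
    callback_finished_after_wait = callback_finished.is_set()

    callback_may_finish.set()  # only now can the callback complete

# The with block has called shutdown(wait=True), joining the worker thread.
callback_finished_after_shutdown = callback_finished.is_set()
```

    Here done_after_wait is True while callback_finished_after_wait is still False; the callback is only known to have finished once the pool has been shut down.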


    The second question is why shutdown(wait=True) works. It works simply because it waits for all worker threads to finish:

    if wait:
        for t in self._threads:
            t.join()
    

    Those threads also execute the callbacks (see the code snippets above). That's why the callbacks must have finished executing by the time the threads have finished.
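
    If the goal is to keep one pool across all files, one possible restructuring (a sketch, not the only option) is to drop the callback and write each result from the submitting thread as the futures complete, using as_completed. The function write_files and its out_dir parameter are hypothetical names introduced for this example; a temporary directory is used so that the sketch does not write into the current directory.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path


def proc_func(n):
    return n + 1


def write_files(out_dir, num_files_write=3, num_tasks=8, num_workers=4):
    """Reuse a single pool for every file and write results in this thread."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for n in range(num_files_write):
            futs = [pool.submit(proc_func, n) for _ in range(num_tasks)]
            with open(Path(out_dir) / f"test{n}.txt", "w") as fid:
                # as_completed yields each future once it is done, so every
                # write happens here, before the file is closed.
                for fut in as_completed(futs):
                    fid.write(str(fut.result()))


with tempfile.TemporaryDirectory() as tmp:
    write_files(tmp)
    contents = [(Path(tmp) / f"test{n}.txt").read_text() for n in range(3)]

print(contents)
```

    Because only one thread touches each file handle, no synchronization with callbacks is needed, and the pool is shut down once, after the last file. If the post-processing itself is slow and should stay in the pool, an alternative is to do it inside the submitted function, so that wait(futs) covers it.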