Why does this ThreadPoolExecutor execute futures way before they are called?
import concurrent.futures
import time

def sleep_test(order_number):
    num_seconds = 0.5
    print(f"Order {order_number} - Sleeping {num_seconds} seconds")
    time.sleep(num_seconds)
    print(f"Order {order_number} - Slept {num_seconds} seconds")
    if order_number == 4:
        raise Exception("Reached order #4")

def main():
    order_numbers = [i for i in range(10_000)]
    max_number_of_threads = 2
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_number_of_threads) as executor:
        futures = []
        for order in order_numbers:
            futures.append(executor.submit(sleep_test, order_number=order))
        for future in futures:
            if future.cancelled():
                continue
            try:
                _ = future.result()
            except Exception:
                print("Caught Exception, stopping all future orders")
                executor.shutdown(wait=False, cancel_futures=True)

if __name__ == "__main__":
    main()
Here is a sample execution:
$ python3 thread_pool_test.py
Order 0 - Sleeping 0.5 seconds
Order 1 - Sleeping 0.5 seconds
Order 0 - Slept 0.5 seconds
Order 1 - Slept 0.5 seconds
Order 2 - Sleeping 0.5 seconds
Order 3 - Sleeping 0.5 seconds
Order 2 - Slept 0.5 seconds
Order 4 - Sleeping 0.5 seconds
Order 3 - Slept 0.5 seconds
Order 5 - Sleeping 0.5 seconds
Order 4 - Slept 0.5 seconds
Order 6 - Sleeping 0.5 seconds
Caught Exception, stopping all future orders
Order 5 - Slept 0.5 seconds
Order 4706 - Sleeping 0.5 seconds
Order 6 - Slept 0.5 seconds
Order 4706 - Slept 0.5 seconds
All of a sudden, Order 4706 is called seemingly out of nowhere, which doesn't make sense to me. I expect the threads to stop at around Order 5 or 6, which is when the exception is raised. Sometimes the script works as expected, but other times it calls a function that is thousands of "futures" into the future.
Why is this happening? Can I stop it from happening?
It appears that ThreadPoolExecutor.shutdown has no mechanism to prevent this, based on the CPython implementation.
It is difficult to avoid completely, but if you have the list of futures, you can at least keep them from being executed out of order by canceling them manually in reverse order, as below.
for f in futures[::-1]:  # The key is to cancel in reverse order.
    f.cancel()
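For example, the result-collecting loop from the question could be adapted along these lines (a minimal sketch reusing the futures and executor names from the code above; the break is only added so the loop stops waiting on results once everything has been canceled):

for future in futures:
    if future.cancelled():
        continue
    try:
        _ = future.result()
    except Exception:
        print("Caught Exception, stopping all future orders")
        # Cancel pending futures ourselves, newest first, so a worker pulling
        # items off the front of the queue cannot overtake the cancellation.
        for f in futures[::-1]:
            f.cancel()
        executor.shutdown(wait=False, cancel_futures=True)
        break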
First, let's look at the ThreadPoolExecutor.submit implementation. The following is a simplified version (see the above link for the actual code). When you submit fn, it is wrapped in a _WorkItem and put into the work queue.
def submit(self, fn, /, *args, **kwargs):
    f = _base.Future()
    w = _WorkItem(f, fn, args, kwargs)
    self._work_queue.put(w)
    return f
The worker thread takes this out and runs it.
def _worker(executor_reference, work_queue, initializer, initargs):
    while True:
        work_item = work_queue.get(block=True)
        if work_item is not None:
            work_item.run()
Note that the worker thread does not take any lock during this operation. Instead, the _WorkItem checks the status of the future before executing fn. If the future has been canceled, execution is aborted here.
def run(self):
    if not self.future.set_running_or_notify_cancel():
        return
    result = self.fn(*self.args, **self.kwargs)
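To see this check in isolation, here is a small standalone demo using the public concurrent.futures.Future API rather than the executor internals: a future that was canceled while still pending makes set_running_or_notify_cancel() return False, which is exactly the condition that makes run() return without calling fn.

import concurrent.futures

f = concurrent.futures.Future()          # a bare future starts out pending
print(f.set_running_or_notify_cancel())  # True: not canceled, run() would proceed

g = concurrent.futures.Future()
g.cancel()                               # canceled while still pending
print(g.set_running_or_notify_cancel())  # False: run() would return immediately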
Finally, here is the shutdown implementation.
def shutdown(self, wait=True, *, cancel_futures=False):
    if cancel_futures:
        while True:
            try:
                work_item = self._work_queue.get_nowait()
            except queue.Empty:
                break
            if work_item is not None:
                work_item.future.cancel()
It takes all items from the queue and cancels them one by one. Note that it does hold a lock called self._shutdown_lock, but this lock does not affect the worker, because the worker side takes no lock at all.
Taken together: if the thread that called shutdown (in this case, the main thread) releases the GIL for some reason in the middle of emptying the queue, a worker thread can grab an item out of the middle of it. And since a future is only canceled when shutdown pulls its work item out of the queue, it will still be executed if a worker thread gets to it first.