I want to implement retry logic with Python's concurrent.futures.ThreadPoolExecutor. The property I care about most: a failed job should be retried, and its result processed, as soon as possible, rather than only after other unrelated futures have finished.
A lot of the existing code I found online basically operates in "rounds": call as_completed on an initial list of futures, resubmit failed futures, gather those retries in a new list, and go back to calling as_completed on the new list if it's not empty. Basically something like this:
with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        # retried futures are collected here and only watched in the next round
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures
However, I think that doesn't satisfy that property, since it's possible that a retried future completes before the initial futures do, but we won't process it until all the initial futures complete.
Here's a hypothetical pathological case. Say we have two jobs: the first runs for 1 second but has a 90% chance of failure, while the second runs for 100 seconds. If our executor has 2 workers and the first job fails after 1 second, we'll retry it immediately. But if that retry fails as well, we won't be able to observe the failure and retry again until the second job completes.
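To make that concrete, here is a small self-contained simulation of the round-based loop above (the timings are scaled down and the failures made deterministic; short_job, long_job, and the counter are my own illustration):

import concurrent.futures
import time

START = time.monotonic()

def log(msg):
    print(f"{time.monotonic() - START:4.1f}s  {msg}")

failures_left = {"short": 2}  # the short job fails twice, then succeeds

def short_job():
    time.sleep(0.5)
    if failures_left["short"] > 0:
        failures_left["short"] -= 1
        raise RuntimeError("flaky failure")
    return "short done"

def long_job():
    time.sleep(5)
    return "long done"

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(fn): fn for fn in (short_job, long_job)}
    while futures:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            fn = futures[fut]
            if fut.exception():
                log(f"{fn.__name__} failed, resubmitting")
                new_futures[executor.submit(fn)] = fn
            else:
                log(f"{fn.__name__} succeeded: {fut.result()}")
        futures = new_futures

# Attempt 2 of short_job fails at ~1.0s, but its failure is only observed
# (and attempt 3 only submitted) once long_job finishes at ~5s, even though
# a worker sat idle the whole time.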
So my question is: is it possible to implement retry logic with this desired behavior, without using external libraries or rewriting low-level executor logic? One thing I tried was putting the retry logic in the code sent to the worker:
def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)
But it seems like submitting new jobs from within a job doesn't work.
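For reference, here is a minimal reproduction sketch (flaky_call, the sleep, and the worker count are my own illustrative choices) that surfaces the error map() otherwise hides: the retry's executor.submit raises inside the worker, and the exception only shows up if the iterator returned by map is consumed.

import concurrent.futures
import time

def flaky_call():
    time.sleep(0.5)  # give the main thread time to reach ThreadPoolExecutor.__exit__
    raise ValueError("simulated failure")

def worker_job(fn):
    try:
        return fn()
    except Exception:
        # By now the executor has been marked as shut down, so this submit()
        # raises RuntimeError inside the worker thread.
        return executor.submit(worker_job, fn)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(worker_job, [flaky_call])

try:
    list(results)  # consuming the iterator re-raises what worker_job raised
except RuntimeError as exc:
    print(exc)     # typically: cannot schedule new futures after shutdown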
Option 1: loop with wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).
Trade-offs:

- Extra bookkeeping for the still-pending futures on every iteration (re-adding the waiter, building new_futures).
- An overall timeout is harder to express: a timeout passed to wait() applies per call, not to the whole loop.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        # carry the still-pending futures over to the next iteration
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures
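A more concrete sketch of this approach (flaky, MAX_RETRIES, and the (job, attempts) bookkeeping are my own additions for illustration), which also caps the number of retries per job:

import concurrent.futures
import random

MAX_RETRIES = 3  # assumption: give up after this many attempts per job

def flaky(arg):
    if random.random() < 0.5:
        raise RuntimeError(f"job {arg} failed")
    return f"job {arg} ok"

jobs = list(range(5))

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # future -> (job, attempts made so far)
    futures = {executor.submit(flaky, job): (job, 1) for job in jobs}
    while futures:
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        new_futures = {fut: futures[fut] for fut in pending}
        for fut in done:
            job, attempts = futures[fut]
            if fut.exception():
                if attempts < MAX_RETRIES:
                    # resubmit immediately; the retry is picked up on the next wait()
                    new_futures[executor.submit(flaky, job)] = (job, attempts + 1)
                else:
                    print(f"job {job} gave up after {attempts} attempts")
            else:
                print(fut.result())
        futures = new_futures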
Option 2: tweak as_completed(...) so that newly submitted futures can be added to its fs and pending sets, and reuse its waiter.
Trade-off: maintenance, since this copies (and depends on) private internals of concurrent.futures.
Advantage: the ability to specify an overall timeout if wanted.
import time
# private helpers copied from CPython's concurrent.futures implementation
# (Python 3.7+; these names are version-dependent)
from concurrent.futures._base import (
    CANCELLED_AND_NOTIFIED, FINISHED, _AS_COMPLETED,
    _AcquireFutures, _create_and_install_waiters, _yield_finished_futures)

class AsCompletedWaiterWrapper:
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None

    def listen(self, fut):
        with self.waiter.lock:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)

    def as_completed(self, fs, timeout=None):
        """concurrent.futures.as_completed plus the 3 lines marked with +."""
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
            self.fs = fs            # +
            self.pending = pending  # +
            self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))

            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))

                waiter.event.wait(wait_timeout)

                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()

                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))

        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)
Usage:
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job
Option 3: put the retry logic in the worker and wait for per-job events inside the with ... executor: block, since ThreadPoolExecutor.__exit__ shuts down the executor so it can no longer schedule new futures.
Trade-offs: does not work with ProcessPoolExecutor, since worker_job relies on the executor reference, which only exists in the main process.
def worker_job(fn, event):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        executor.submit(worker_job, fn, event)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()  # wait inside the with block so retries can still be scheduled
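One refinement worth considering (my own addition; flaky, MAX_RETRIES, and the attempt parameter are illustrative assumptions): cap the retries and set the event on the final failure too, otherwise a job that never succeeds leaves the corresponding e.wait() blocked forever.

import concurrent.futures
import functools
import random
import threading

MAX_RETRIES = 3  # assumption: per-job attempt limit

def flaky(arg):
    if random.random() < 0.5:
        raise RuntimeError(f"job {arg} failed")
    return f"job {arg} ok"

def worker_job(fn, event, attempt=1):
    try:
        rv = fn()
        event.set()        # success: unblock the waiter
        print(rv)
        return rv
    except Exception:
        if attempt < MAX_RETRIES:
            executor.submit(worker_job, fn, event, attempt + 1)
        else:
            print("giving up")
            event.set()    # give up, but still unblock the waiter

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(flaky, arg) for arg in range(5)]
    events = [threading.Event() for _ in jobs]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()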