I want to run threads in a ThreadPoolExecutor until one of them gives me a specific result. For now my code looks like this:
# Start four tasks, then try to keep feeding the pool until one task
# returns a truthy result.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = [pool.submit(g, i) for i in range(4)]
j = 4
for f in concurrent.futures.as_completed(futures):
    if f.result():
        break  # OK the result is found
    else:
        # NOTE(review): j is bumped before the first extra submit, so the
        # argument 4 is never passed to g.
        j += 1
        # The new task is handed to the pool, but the as_completed()
        # iterator that is already running does not pick up futures
        # appended to the list after it was created.
        futures.append(pool.submit(g, j))
but the append in the last line seems to have no effect on the as_completed generator. Is there a way to achieve that?
You could just keep checking each future in the sequence till you find one that is done.
import concurrent.futures
from collections import deque

# Keep four tasks in flight and poll them round-robin until one of them
# returns a truthy result.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = deque(pool.submit(g, i) for i in range(4))
j = 4  # next argument to hand to g
result = False
try:
    while not result:
        # Cycle through the deque until the future at the front is done.
        # NOTE: this is a busy-wait; it keeps the main thread spinning
        # while the workers run.
        while not futures[0].done():
            futures.rotate()
        future = futures.popleft()
        result = future.result()
        if not result:
            # Submit first, then advance, so that no argument is skipped.
            futures.append(pool.submit(g, j))
            j += 1
finally:
    # Release the executor without blocking on tasks still in flight.
    pool.shutdown(wait=False)
Here is a similar solution using concurrent.futures.wait.
First a callable for testing purposes:
import random
import time


class F:
    """Test callable whose instances are truthy with probability *threshold*.

    Calling an instance records the argument it was given, sleeps for a
    random interval to simulate work, and returns the instance itself, so
    the truthiness of a future's result tells whether that task "succeeded".

    Args:
        threshold: probability that the instance evaluates as true.
        max_sleep: upper bound, in seconds, of the simulated work time.
    """

    def __init__(self, threshold=.03, max_sleep=1.0):
        # The outcome is decided once, at construction time.
        self._bool = random.random() < threshold
        self._max_sleep = max_sleep
        # Set up front so str()/repr() work before the first call.
        self.n = None

    def __call__(self, n):
        """Record *n*, sleep for up to ``max_sleep`` seconds, return self."""
        self.n = n
        time.sleep(random.random() * self._max_sleep)
        return self

    def __bool__(self):
        return self._bool

    def __str__(self):
        return f'I am number {self.n}'

    def __repr__(self):
        return f'I am number {self.n}'
Solution
import concurrent.futures

# Keep four tasks in flight; each time some finish without a truthy result,
# replace them with new tasks until one task succeeds.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
j = 4  # next argument to hand to a new task
futures = [pool.submit(F(), i) for i in range(j)]
result = False
try:
    while not result:
        # Block until at least one outstanding future has finished.
        maybe_futures = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        futures = maybe_futures.not_done
        # More than one future may be done by the time wait() returns.
        for future in maybe_futures.done:
            temp = future.result()
            if temp:
                result = temp
                break
            # Submit first, then advance, so that no argument is skipped.
            futures.add(pool.submit(F(), j))
            j += 1
finally:
    # Release the executor without blocking on tasks still in flight.
    pool.shutdown(wait=False)
Another solution using a callback (uses the callable above). If a future completes before the callback gets added, add_done_callback calls the callback immediately, so no completion is missed.
import concurrent.futures
import queue

# Finished futures are handed from the done-callbacks to the main thread
# through a thread-safe queue, so the main thread can block instead of
# spinning while it waits.
completed_futures = queue.Queue()
result = False


def callback(future, completed=completed_futures):
    """Done-callback: pass the finished *future* on to the main thread."""
    completed.put(future)


j = 4  # next argument to hand to a new task
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    # Initial tasks.
    for i in range(j):
        future = pool.submit(F(), i)
        future.add_done_callback(callback)
    while not result:
        # Blocks until some task has finished.
        future = completed_futures.get()
        result = future.result()
        if not result:
            # Submit first, then advance, so that no argument is skipped.
            future = pool.submit(F(), j)
            future.add_done_callback(callback)
            j += 1
print(result)