Search code examples
Tags: python, threadpoolexecutor, concurrent.futures

Python concurrent.futures run threads in pool until a result is found


I want to run threads in a ThreadPoolExecutor until one of them gives me a specific result. For now my code looks like this:

# Submit g(0)..g(3) to a pool of at most four worker threads.
pool = concurrent.futures.ThreadPoolExecutor(max_workers = 4)
futures = [pool.submit(g,i) for i in range (4)]
j = 4
# Iterate over the submitted futures; stop at the first truthy result.
for f in concurrent.futures.as_completed(futures):
    if f.result():
        break # OK the result is found
    else:
        # NOTE(review): j is incremented before the submit below, so g(4) is
        # never submitted -- the first replacement task is g(5). Confirm
        # whether skipping index 4 is intended.
        j += 1
        # NOTE: the new future is appended to the list, but this append is
        # reported to have no effect on the as_completed() iterator that is
        # already running, so the loop ends after the four original futures.
        futures.append(pool.submit(g,j))

However, the append on the last line seems to have no effect on the as_completed generator. Is there a way to achieve this?


Solution

  • You could just keep checking each future in the sequence till you find one that is done.

    import concurrent.futures
    from collections import deque

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    # Keep four tasks in flight; j is always the next index to submit.
    futures = deque(pool.submit(g, i) for i in range(4))
    j = 4
    result = False
    while not result:
        # Poll: rotate the deque until a finished future sits at the front.
        # NOTE: this is a busy-wait -- it spins until some task completes.
        while not futures[0].done():
            futures.rotate()
        future = futures.popleft()
        result = future.result()
        if not result:
            # Submit index j *before* advancing it, so that g(4) is not
            # skipped (incrementing first would jump from g(3) to g(5)).
            futures.append(pool.submit(g, j))
            j += 1
    

    Here is a similar solution using concurrent.futures.wait.
    First a callable for testing purposes:

    import random
    import time


    class F:
        """Test callable whose instances are randomly truthy.

        Each instance decides once, at construction, whether it counts as a
        "found" result: it is truthy with probability ``threshold``.

        Args:
            threshold: Probability (0..1) that the instance is truthy.
            max_delay: Upper bound, in seconds, of the random sleep performed
                when the instance is called. Must be non-negative. The default
                of 1.0 gives a sleep of ``random.random()`` seconds.
        """

        def __init__(self, threshold=.03, max_delay=1.0):
            self._bool = random.random() < threshold
            self.max_delay = max_delay
            # Set up front so str()/repr() work before the instance is called.
            self.n = None

        def __call__(self, n):
            """Record ``n``, sleep for a random time, and return ``self``."""
            self.n = n
            time.sleep(random.random() * self.max_delay)
            return self

        def __bool__(self):
            return self._bool

        def __repr__(self):
            return f'I am number {self.n}'

        # str() and repr() intentionally produce the same text.
        __str__ = __repr__
    

    Solution

    import concurrent.futures

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    j = 4  # always the next index to hand to a new task
    futures = {pool.submit(F(), i) for i in range(j)}
    result = False
    while not result:
        # Block until at least one pending future has finished. wait()
        # returns two sets, so several futures may be in `done` at once.
        done, futures = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )
        for future in done:
            outcome = future.result()
            if outcome:
                result = outcome
                break
            # Replace the unsuccessful task. Submit index j *before*
            # advancing it so that index 4 is not skipped.
            futures.add(pool.submit(F(), j))
            j += 1
    

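    Another solution using a callback (uses the callable above). I was not sure what happens if a future completes before the callback gets added; according to the `Future.add_done_callback` documentation, the callback is then called immediately, so no completion is missed.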

    import collections
    import concurrent.futures
    import threading

    completed_futures = collections.deque()
    # Set by the callback whenever a future is queued, so the main thread can
    # sleep instead of spinning while nothing has completed.
    completed_event = threading.Event()
    result = False


    def callback(future, completed=completed_futures):
        """Queue a finished future and wake up the consumer loop."""
        # Append first, then signal: the consumer clears the event before it
        # drains the deque, so a queued future is never left unnoticed.
        completed.append(future)
        completed_event.set()


    j = 4  # always the next index to hand to a new task

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # initial tasks
        for i in range(j):
            pool.submit(F(), i).add_done_callback(callback)
        # Not the same as `while True`: the `break` below only leaves the
        # inner loop, so this condition is what ends the search.
        while not result:
            completed_event.wait()
            completed_event.clear()
            while completed_futures:
                future = completed_futures.popleft()
                result = future.result()
                if result:
                    break
                # Replace the unsuccessful task. Submit index j *before*
                # advancing it so that index 4 is not skipped.
                pool.submit(F(), j).add_done_callback(callback)
                j += 1
    print(result)