python-3.x, concurrent.futures

How does concurrent.futures.ThreadPoolExecutor() behave if one thread calls exit?


I'm trying to understand how concurrent.futures.ThreadPoolExecutor() works. I have the piece of code below. If one of the threads calls exit, the program exits around the for r in futures loop and never moves on to the next step.

import concurrent.futures
import time

_list = ['xy','z2' ,'a',' ']

def main(s):
    if 'a' in s:
        exit(0)
    print('Processed for', s)

with concurrent.futures.ThreadPoolExecutor() as process: 
    futures = process.map(main,[elem.strip() for elem in _list if not elem.isspace()])

for r in futures:
    if r: print("Exception: %s" % r)
print("Processed for all list.")

Execution output:

 % python3 test.py
Processed for xy
Processed for z2

Is this because one of the threads exits, so we can't move forward with the rest of the program?


Solution

  • When you call:

    futures = process.map(main,[elem.strip() for elem in _list if not elem.isspace()])
    

    You are passing a list of three values as the second argument to process.map:

    ['xy', 'z2', 'a']
    

    Your task function only prints a message if the input argument does not contain a:

    def main(s):
        if "a" in s:
            exit(0)
        print("Processed for", s)
    

    You only see Processed for ... messages for the first two items because, when the argument is a, the function exits before printing anything. If you were to rewrite it like this:

    def main(s):
        print("Processed for", s)
        if "a" in s:
            exit(0)
    

    Then you would see output for all of the items in the list:

    Processed for xy
    Processed for z2
    Processed for a
    

    Sorry, maybe I didn't convey my question well. What I'm missing is the for r in futures: ... part.

    The return value of process.map(...) is an iterator that yields the return values of your tasks, in the same order as the inputs. Since your main function has no return statement, it returns None, so r is None for every task that finishes normally. If we rewrite your code so that main returns a value:

    import concurrent.futures
    import time
    
    _list = ['xy','z2' ,'a',' ']
    
    def main(s):
        print('Processed for', s)
        if 'a' in s:
            exit(0)  # raises SystemExit; the executor stores it on this task's future
        return s
    
    with concurrent.futures.ThreadPoolExecutor() as process: 
        futures = process.map(main,[elem.strip() for elem in _list if not elem.isspace()])
    
    for r in futures:
        if r: print("Exception: %s" % r)
    print("Processed for all list.")
    

    Then the output would be:

    Processed for xy
    Processed for z2
    Processed for a
    Exception: xy
    Exception: z2
    

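    Of course, the label Exception is misleading: those aren't exceptions, just the return values from your main function. We don't see a return value for main('a') because in that case you call exit rather than returning a value. Calling exit raises SystemExit inside the worker thread; ThreadPoolExecutor catches it and stores it on that task's future. When the for r in futures loop reaches that result, the stored SystemExit is re-raised in the main thread, and since nothing catches it there, the program exits (with status 0) before it reaches print("Processed for all list."). That is also why your original code stops at the loop: the first two results are None, so nothing is printed, and the third result ends the program.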
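To see where the SystemExit from exit(0) ends up, here is a minimal sketch, assuming Python 3. Calling exit in a worker raises SystemExit there, the executor stores it on that task's future, and iterating the map() result re-raises it in the calling thread. The helper name iterate_map is hypothetical, not part of concurrent.futures, and the sketch uses sys.exit(0), which raises SystemExit(0) just as exit(0) does.

```python
import concurrent.futures
import sys


def main(s):
    # Mirrors the question's worker. sys.exit(0) raises SystemExit(0),
    # as the interactive helper exit(0) does.
    if "a" in s:
        sys.exit(0)
    return s


def iterate_map(items):
    """Consume executor.map() as the question does, but record how far the
    loop got before the worker's SystemExit was re-raised."""
    seen = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(main, items)
    try:
        for r in results:
            seen.append(r)
    except SystemExit as exc:
        # The SystemExit raised in the worker thread was stored on that
        # task's future and is re-raised here, in the calling thread.
        return seen, exc
    return seen, None


if __name__ == "__main__":
    seen, exc = iterate_map(["xy", "z2", "a"])
    print("results before the exit:", seen)
    print("re-raised in main thread:", repr(exc))
```

Running it prints results before the exit: ['xy', 'z2'] followed by re-raised in main thread: SystemExit(0). Without the try/except, that same exception would end the program silently, which is what happens in the question.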
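If the goal is for the program to keep going when one item is bad, one common approach (a sketch, not the only option) is to raise an ordinary exception such as ValueError instead of calling exit, submit each item with executor.submit(), and inspect each future with future.exception() as concurrent.futures.as_completed() hands it back. The helper name process_all is hypothetical.

```python
import concurrent.futures


def main(s):
    # Signal a bad item with an ordinary exception instead of exit(0).
    if "a" in s:
        raise ValueError("refusing to process %r" % s)
    print("Processed for", s)
    return s


def process_all(items):
    """Run main() on every item and collect results and errors separately,
    so one failing item does not stop the loop."""
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_item = {executor.submit(main, item): item for item in items}
        for future in concurrent.futures.as_completed(future_to_item):
            item = future_to_item[future]
            # exception() returns the stored exception (or None) instead of
            # re-raising it the way result() does.
            exc = future.exception()
            if exc is not None:
                errors[item] = exc
            else:
                results[item] = future.result()
    return results, errors


if __name__ == "__main__":
    _list = ['xy', 'z2', 'a', ' ']
    items = [elem.strip() for elem in _list if not elem.isspace()]
    results, errors = process_all(items)
    for item, exc in errors.items():
        print("Exception for %s: %s" % (item, exc))
    print("Processed for all list.")
```

The order of the Processed for lines can vary between runs, but the final print is always reached because no exception escapes the loop.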