Tags: python, dictionary, concurrency, multiprocessing, threadpoolexecutor

Get the error when using ThreadPoolExecutor with map


I have noticed that my code neither reports an error nor stops when function(x) raises an exception while being run by ThreadPoolExecutor():

import concurrent.futures

def function(x):
    # Deliberately invalid: y is not defined
    x = y + 1

input_list = [1,2,3]
with concurrent.futures.ThreadPoolExecutor() as executor: 
    executor.map(function,input_list)  

How can I get the error output using map() instead of submit() for the executor?


Solution

  • executor.map returns an iterator, and you must iterate over it to retrieve the individual results. An exception raised inside the function is re-raised at the point where its result is retrieved from that iterator:

    import concurrent.futures
    
    def function(x):
        # Simulate a failure for one input
        if x == 2:
            raise ValueError("I don't like 2")
        return x, x ** 2
    
    input_list = [1,2,3]
    with concurrent.futures.ThreadPoolExecutor(3) as executor:
        results = executor.map(function,input_list)
        try:
            for x, return_value in results:
                print(f'{x} ** 2 = {return_value}')
        except Exception as e:
            print(e)
    

    Prints:

    1 ** 2 = 1
    I don't like 2
    

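    Alternatively, you can call next() on the iterator yourself. The output is the same, because the iterator returned by executor.map is exhausted once it re-raises an exception: the following next() call raises StopIteration, so the result for 3 is never seen: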

    import concurrent.futures
    
    def function(x):
        # Simulate a failure for one input
        if x == 2:
            raise ValueError("I don't like 2")
        return x, x ** 2
    
    input_list = [1,2,3]
    with concurrent.futures.ThreadPoolExecutor(3) as executor:
        results = executor.map(function,input_list)
        while True:
            try:
                x, return_value = next(results)
            except StopIteration:
                break
            except Exception as e:
                print(e)
            else:
                print(f'{x} ** 2 = {return_value}')
    

    However, if you use the ThreadPool class from the multiprocessing.pool module, imap returns an iterator that keeps going after an exception, so you can also retrieve the results that follow it:

    from multiprocessing.pool import ThreadPool
    
    def function(x):
        # Simulate a failure for one input
        if x == 2:
            raise ValueError("I don't like 2")
        return x, x ** 2
    
    input_list = [1,2,3]
    with ThreadPool(3) as executor:
        results = executor.imap(function,input_list)
        while True:
            try:
                x, return_value = next(results)
            except StopIteration:
                break
            except Exception as e:
                print(e)
            else:
                print(f'{x} ** 2 = {return_value}')
    

    Prints:

    1 ** 2 = 1
    I don't like 2
    3 ** 2 = 9
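
If you would rather stay with concurrent.futures and still see every outcome, one possible approach is to catch the exception inside the worker and return it as an ordinary value, so the iterator from executor.map never raises. The sketch below is only an illustration under that assumption: capture_errors and run_all are hypothetical helper names, not part of the standard library or of the original answer.

```python
import concurrent.futures


def function(x):
    # Simulate a failure for one input, as in the examples above
    if x == 2:
        raise ValueError("I don't like 2")
    return x, x ** 2


def capture_errors(func):
    """Hypothetical helper: return the exception object instead of raising it."""
    def wrapper(arg):
        try:
            return func(arg)
        except Exception as exc:
            return exc
    return wrapper


def run_all(inputs):
    """Hypothetical helper: map function over inputs, keeping input order.

    Each item in the returned list is either a result tuple or an exception.
    """
    with concurrent.futures.ThreadPoolExecutor(3) as executor:
        return list(executor.map(capture_errors(function), inputs))


if __name__ == "__main__":
    for outcome in run_all([1, 2, 3]):
        if isinstance(outcome, Exception):
            print(outcome)
        else:
            x, return_value = outcome
            print(f'{x} ** 2 = {return_value}')
```

Because the wrapper turns failures into return values, the caller has to check each item with isinstance before unpacking it. This works with threads as written; a process pool would need a picklable, module-level wrapper instead of a closure.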