Pool.starmap exception handling in Python


The following code handles only the first exception, then carries on with the rest of the program.

The problem goes away when we add a try/except inside func.

What is the reason for this?

from multiprocessing.pool import ThreadPool

class SQLQueryError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(f"SQL query error: {message}")

def solve(args):
    with ThreadPool(len(args)) as pool:
        try:
            return pool.starmap(func, args)
        except Exception as e:
            # Only one exception ever reaches this handler
            print(e)

def func(a, b):
    if a == "12" or a == "23" or b == "Work":
        raise SQLQueryError(f"Table does not exist {a} and {b}")

li = [("12", "Hello"), ("23", "World"), ("12", "Hospital"), ("7", "Work")]
solve(li)

Output: SQL query error: Table does not exist 12 and Hello

I expected all four errors, but only the first one is reported.


Solution

  • You can't get every exception out of pool.starmap, pool.map, pool.map_async or pool.starmap_async. The multiprocessing library deliberately retains only the first exception raised during a map-style call.

    multiprocessing.pool.MapResult._set

    if not success and self._success:
        # only store first exception 
    

    Consider making a list of AsyncResult objects by calling pool.apply_async in a loop, or use concurrent.futures.

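    # Example with apply_async
    from multiprocessing.pool import ThreadPool

    def solve(args):
        with ThreadPool(len(args)) as pool:
            # One AsyncResult per task, so each failure is stored separately
            futures = [pool.apply_async(func, arg) for arg in args]
            results = []
            for future in futures:
                try:
                    # get() re-raises the exception from that task only
                    value = future.get()
                    results.append(value)
                except Exception as e:
                    print(e)
        return results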
    
    # Example with concurrent.futures
    import concurrent.futures

    def solve(args):
        with concurrent.futures.ThreadPoolExecutor(len(args)) as pool:
            futures = [pool.submit(func, *arg) for arg in args]  # submit expects unpacked args
            results = []
            for future in futures:
                try:
                    # result() re-raises the exception from that task only
                    value = future.result()
                    results.append(value)
                except Exception as e:
                    print(e)
        return results
    

    The reason handling the exception inside func works is that the pool never sees func raise: the exception is caught inside the worker thread, so the logic that stores only the first exception is never triggered. In that case func simply returns None, which is a valid return value.
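
If you would rather keep pool.starmap, one possible variation on the point above is to catch the exception inside the worker and return it as an ordinary value, so every failure comes back in the result list. This is only a sketch: safe_func, the "a-b" success value and the fifth input tuple are hypothetical additions for illustration and are not part of the original question.

```python
from multiprocessing.pool import ThreadPool


class SQLQueryError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(f"SQL query error: {message}")


def func(a, b):
    if a in ("12", "23") or b == "Work":
        raise SQLQueryError(f"Table does not exist {a} and {b}")
    return f"{a}-{b}"  # hypothetical success value, not in the original


def safe_func(a, b):
    """Hypothetical wrapper: hand the exception back instead of raising it."""
    try:
        return func(a, b)
    except SQLQueryError as e:
        return e  # the pool treats this as a normal return value


def solve(args):
    with ThreadPool(len(args)) as pool:
        # starmap keeps the input order, so outcomes line up with args
        outcomes = pool.starmap(safe_func, args)
    results = [o for o in outcomes if not isinstance(o, Exception)]
    errors = [o for o in outcomes if isinstance(o, Exception)]
    return results, errors


# The last tuple is an assumed extra input that succeeds
li = [("12", "Hello"), ("23", "World"), ("12", "Hospital"), ("7", "Work"), ("7", "Hello")]
results, errors = solve(li)
for e in errors:
    print(e)
```

Because nothing is raised across the pool boundary, the first-exception-only logic never runs, and the caller decides what to do with each error.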