
Python ProcessPoolExecutor cannot handle exception


I am running a Tornado server that uses a ProcessPoolExecutor to handle multiple requests in parallel. The problem is that, in one particular case, when an exception is raised in one of the worker processes, it doesn't propagate; instead, the process crashes with this error:

    concurrent.futures.process._RemoteTraceback:
    '''
    Traceback (most recent call last):
      File "C:\Users\ActionICT\anaconda3\lib\concurrent\futures\process.py", line 367, in _queue_management_worker
        result_item = result_reader.recv()
      File "C:\Users\ActionICT\anaconda3\lib\multiprocessing\connection.py", line 251, in recv
        return _ForkingPickler.loads(buf.getbuffer())
    TypeError: __init__() missing 1 required positional argument: 'is_local'
    '''

    The above exception was the direct cause of the following exception:

    Traceback (most recent call last):
      File "C:\S1\Product\Baseline\PYTHON\lab\controller.py", line 558, in get
        output = exec_future.result()
      File "C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py", line 428, in result
        return self.__get_result()
      File "C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
        raise self._exception
    concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I stepped through it in a debugger and found that the problem occurs while executing this method:

    def _send_bytes(self, buf):
        ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
        try:
            if err == _winapi.ERROR_IO_PENDING:
                waitres = _winapi.WaitForMultipleObjects(
                    [ov.event], False, INFINITE)
                assert waitres == WAIT_OBJECT_0
        except:
            ov.cancel()
            raise
        finally:
            nwritten, err = ov.GetOverlappedResult(True)
        assert err == 0
        assert nwritten == len(buf)

This method is called when the worker process tries to propagate the exception to the corresponding Future object. On the first line, at the call to _winapi.WriteFile, everything crashes in the debugger, and I can't understand why. Any ideas?
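The traceback places the TypeError in _ForkingPickler.loads, which hints that the failure may happen while the parent rebuilds the exception from its pickled form rather than in the pipe write itself. One way to check this without stepping into _winapi is a plain pickle round trip. The sketch below is only an illustration: ScenarioError is a hypothetical stand-in (the real exception class is not shown in the question), and survives_pickle is a helper name invented here.

```python
import pickle


class ScenarioError(Exception):
    """Hypothetical stand-in for the real exception, which the question does not show."""

    def __init__(self, message, is_local):
        # Only `message` is forwarded, so self.args == (message,).
        # Unpickling calls ScenarioError(*self.args), which omits `is_local`.
        super().__init__(message)
        self.is_local = is_local


def survives_pickle(exc):
    """Return (True, None) if exc can be pickled and rebuilt, else (False, error)."""
    try:
        pickle.loads(pickle.dumps(exc))
    except Exception as err:
        return False, err
    return True, None


ok, error = survives_pickle(ScenarioError("boom", is_local=True))
print(ok, repr(error))
```

If the round trip fails with a TypeError naming the missing argument, the exception class itself is the likely culprit, and the crash in the debugger is a symptom rather than the cause.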


Solution

  • I resolved this with a workaround: inside the function that runs in the separate process, I wrapped the body in a try/except, copied the message of the original exception into a new plain Exception, and raised that instead. I don't know why, but it works.

    def _execute_tuning(tune_parameters: TuneParameters):
        # Function to parallelize. TODO: to be refactored.
        # Executes the scenario, then writes the result or error to the output.
        try:
            config.generate_project_config(
                project_name=tune_parameters.project_name,
                scenario_name=tune_parameters.scenario_name
            )

            config.generate_session_log_config(
                project_name=tune_parameters.project_name,
                scenario_name=tune_parameters.scenario_name
            )

            tree = DecisionTreeGenerator(tune_parameters.project_name, tune_parameters.scenario_name)

            tree.fit(
                # TODO refactor: this compares the whole tune_parameters object to 'true';
                # a specific field of tune_parameters was probably intended.
                auto_tune=(tune_parameters == 'true'),
                max_depth=tune_parameters.max_depth,
                columns=tune_parameters.columns,
                min_samples_leaf=tune_parameters.min_samples_per_leaf,
                max_leaf_nodes=tune_parameters.max_leaf_nodes
            )

            kpi = KPICalculator(tune_parameters.project_name, tune_parameters.scenario_name)
            kpi.run(do_optimization_kpi=False)
        except Exception as exc:
            # Log the original exception inside the worker, where it is still intact.
            Loggers.application.exception(exc)
            # Re-raise as a plain Exception that carries only the message string.
            exc_final = Exception(str(exc))
            exc_final.__traceback__ = exc.__traceback__
            raise exc_final
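A plausible reason the workaround works: ProcessPoolExecutor sends a worker's exception back to the parent by pickling it, and the TypeError in the traceback is raised while unpickling. An exception class whose __init__ requires an argument that is not forwarded to Exception.__init__ fails at that step, while a plain Exception with a string message does not. The same idea can be packaged as a decorator. This is a sketch under assumptions, not the author's code: picklable_errors and ScenarioError are hypothetical names introduced for illustration.

```python
import functools
import pickle
import traceback


def picklable_errors(func):
    """Re-raise exceptions that fail a pickle round trip as plain Exception objects."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            try:
                # Same round trip the pool performs to ship the exception to the parent.
                pickle.loads(pickle.dumps(exc))
            except Exception:
                # Keep the original type name and traceback text in the message.
                details = "".join(
                    traceback.format_exception(type(exc), exc, exc.__traceback__)
                )
                raise Exception(f"{type(exc).__name__}: {exc}\n{details}") from None
            # The exception pickles cleanly, so let it propagate unchanged.
            raise

    return wrapper


class ScenarioError(Exception):
    """Hypothetical exception with a required argument not passed to Exception.__init__."""

    def __init__(self, message, is_local):
        super().__init__(message)
        self.is_local = is_local


@picklable_errors
def run_scenario():
    raise ScenarioError("boom", is_local=True)


try:
    run_scenario()
except Exception as exc:
    print(type(exc).__name__)  # prints "Exception"
```

Compared with always raising Exception(str(exc)), this keeps well-behaved exception types intact and converts only the ones that cannot cross the process boundary. If the exception class is under your control, forwarding every constructor argument to super().__init__() is usually the cleaner fix, because the default pickling of exceptions rebuilds them from self.args.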