Search code examples
python multithreading process multiprocessing multiprocess

Python starting processes with separate Thread leads to exception and death of that thread


My main function, run_tests, first starts a separate Thread that launches the new processes; the main loop then tries to detect which processes have finished and which have timed out.

import time
import traceback

from typing import List
from threading import Thread
from multiprocess import (Semaphore,
                          Process,
                          Pipe)

from hanging_threads import start_monitoring


class S_Process(Process):
    """Process that runs its target while holding one slot of a semaphore.

    The child reports over ``pipe_conn``: it sends ``0`` right after the
    semaphore has been acquired and ``test_name`` once the target is done.
    """

    def __init__(self,
                 test_name: str,
                 semaphore: Semaphore,
                 pipe_conn: Pipe,
                 *args,
                 **kwargs
                 ) -> None:
        """Store the reporting state and forward the rest to ``Process``.

        Args:
            test_name: Name sent back over the pipe and exposed as
                ``test_name``.
            semaphore: Semaphore held for the duration of the target call.
            pipe_conn: Connection end the child uses for its two messages.
            *args: Forwarded unchanged to ``Process.__init__``.
            **kwargs: Forwarded unchanged to ``Process.__init__``.
        """
        Process.__init__(self, *args, **kwargs)
        self.__child_conn = pipe_conn
        self.__test_name = test_name
        self.__semaphore = semaphore

    def run(self) -> None:
        """Acquire the semaphore, run the target and report over the pipe.

        The semaphore is released and ``test_name`` is sent even when the
        target raises; the exception itself still propagates to the caller.
        """
        with self.__semaphore:
            # Tells the other end that the slot is held and work is starting.
            self.__child_conn.send(0)
            try:
                Process.run(self)
            finally:
                # Always sent, so a reader waiting for the name is not left
                # blocked by a failing target.
                self.__child_conn.send(self.__test_name)

    def terminate(self) -> None:
        """Terminate the process, then hand its semaphore slot back.

        A terminated child does not execute the release in ``run()``, so
        the slot is released here instead. The process is signalled first so
        the slot is only handed on once termination has been requested.

        NOTE(review): assumes the child currently holds the semaphore (it
        has already sent ``0``); calling this in any other state releases a
        slot that was never taken.
        """
        super().terminate()
        self.__semaphore.release()

    @property
    def test_name(self) -> str:
        """Name given to this process at construction time."""
        return self.__test_name


class Task(object):
    """Parent-side handle for one process and the pipe end it reports on.

    ``status`` starts as 'NOTRUN', becomes 'RUNNING' in ``run()`` and ends as
    'ENDED' (``join()``) or 'TERMINATED' (``terminate()``). ``duration``
    holds the ``time.perf_counter()`` start stamp while the task is
    'RUNNING' and the elapsed seconds once it is finished.
    """

    def __init__(self,
                 process: S_Process,
                 pipe_conn: Pipe,
                 ) -> None:
        self.process = process
        self.pipe_conn = pipe_conn
        self.duration = None
        self.test_name = None
        self.status = 'NOTRUN'

    def run(self) -> None:
        """Start the process and block until it sends its first message."""
        self.process.start()
        self.pipe_conn.recv()
        # The start stamp is stored before the status flips: code that sees
        # 'RUNNING' from another thread can then rely on duration being set.
        self.duration = time.perf_counter()
        self.status = 'RUNNING'

    def join(self) -> None:
        """Wait for the process to exit, then record its result."""
        # An untimed join() only returns once the process has exited, so no
        # liveness check or kill is needed afterwards.
        self.process.join()
        self.set_result()

    def terminate(self) -> None:
        """Terminate the process and mark the task as 'TERMINATED'."""
        self.process.terminate()
        if self.duration is not None:
            self.duration = time.perf_counter() - self.duration
        self.status = 'TERMINATED'

    def set_result(self) -> None:
        """Store the reported test name and the elapsed time; mark 'ENDED'."""
        # poll() first: if the process exited without sending its name, a
        # bare recv() would block forever.
        if self.pipe_conn.poll():
            self.test_name = self.pipe_conn.recv()
        else:
            self.test_name = self.process.test_name
        if self.duration is not None:
            self.duration = time.perf_counter() - self.duration
        self.status = 'ENDED'


class Tasks(object):
    """Two lists of Task objects: those still remaining and those completed."""

    def __init__(self) -> None:
        self.remaining: List[Task] = []
        self.completed: List[Task] = []

    def add(self,
            process: S_Process,
            pipe_conn: Pipe
            ) -> None:
        """Wrap ``process`` and ``pipe_conn`` in a Task and queue it."""
        self.remaining.append(Task(process, pipe_conn))

    def complete(self, task: Task) -> None:
        """Move ``task`` from ``remaining`` to ``completed``.

        Raises:
            ValueError: If ``task`` is not in ``remaining``; ``completed`` is
                left untouched in that case.
        """
        # Remove first: if the task is unknown the lists stay consistent.
        self.remaining.remove(task)
        self.completed.append(task)

    def info(self) -> List[str]:
        """Return one summary line per completed task.

        The fields are read directly from the Task (``test_name``,
        ``status``, ``duration``); a Task has no ``result`` attribute.
        """
        return [
            f"Test Name: {task.test_name} "
            f"Result: {task.status} "
            f"Duration: {task.duration}"
            for task in self.completed
        ]


def run_tests() -> None:
    """Create eight S_Process tasks, start them from a thread and supervise them.

    A helper thread calls ``Task.run()`` on each task in turn, while this
    function polls every 0.2 s: a 'RUNNING' task whose process is no longer
    alive is joined, and a 'RUNNING' task older than ``TIMEOUT`` seconds is
    terminated. Either way the task is moved to ``tasks.completed``.
    """

    # From the hanging_threads package; presumably reports threads that stop
    # making progress -- not visible in this file.
    start_monitoring()
    tasks = Tasks()
    # Shared by all processes: S_Process.run() acquires it before running the
    # target, so two targets run at a time.
    semaphore = Semaphore(2)

    for i in range(8):
        # One pipe per task: the child end goes to the process, the parent
        # end is kept in the Task.
        parent_conn, child_conn = Pipe()

        process = S_Process(
            target=test_function,
            args=(),
            test_name=f'test_{i}',
            semaphore=semaphore,
            pipe_conn=child_conn
        )
        tasks.add(process, parent_conn)

    def runner(tasks):
        """Start every task in order; exceptions are printed, not re-raised."""
        try:
            # Task.run() blocks until the child has sent its first message,
            # so tasks are started one after another.
            for task in tasks:
                print('running task')
                task.run()

        except Exception:
            print(traceback.format_exc())

    TIMEOUT = 5

    # NOTE(review): the thread receives tasks.remaining itself, not a copy.
    # The loop below removes entries from that same list via
    # tasks.complete() while the thread is still iterating it, so the
    # thread's iteration can skip tasks that were never started.
    # The name ``runner`` is rebound here from the function to the Thread.
    runner = Thread(target=runner, args=(tasks.remaining,))
    runner.start()

    while tasks.remaining:
        # NOTE(review): this loop also iterates tasks.remaining while
        # tasks.complete() removes from it.
        for task in tasks.remaining:

            if not task.process.is_alive() and task.status == 'RUNNING':
                print('JOINING:', task.process.test_name)
                task.join()
                tasks.complete(task)

            # After a join above the status is 'ENDED', so this branch is
            # skipped for that task.
            if task.status == "RUNNING":
                # While 'RUNNING', task.duration holds the perf_counter()
                # value taken in Task.run().
                check_time = time.perf_counter() - task.duration

                if (check_time > TIMEOUT):
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)

        print('Rem:', len(tasks.remaining))
        print('End:', len(tasks.completed))
        time.sleep(0.2)


def test_function():
    """Dummy test body: print a marker, then pretend to work for three seconds."""
    simulated_work_seconds = 3
    print('test_func')
    time.sleep(simulated_work_seconds)


# Entry point: run the suite only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    run_tests()

The method task.run() starts the process and then waits on pipe_conn.recv() for confirmation that the process has actually acquired the semaphore and started working, so that I can measure how long it runs.

When I set the semaphore to, e.g., 2 (at most 2 processes can run simultaneously) with 7-8 tasks and start run_tests, everything goes well until the third or fourth process is joined or terminated. Thanks to the hanging_threads package I discovered that my runner thread dies with this error:

---------- Thread 9068 "Thread-2 (runner)" hangs  ----------
        File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 973, in _bootstrap
                self._bootstrap_inner()
        File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
                self.run()
        File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
                self._target(*self._args, **self._kwargs)
        File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 261, in runner
                task.run()
        File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 64, in run
                if self.pipe_conn.recv() == 'started':
        File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 258, in recv
                buf = self._recv_bytes()
        File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 313, in _recv_bytes
                waitres = _winapi.WaitForMultipleObjects(

Why do the first few processes start and end fine, while at some point the thread is no longer able to handle the Pipe? Also, the loop hangs with 6 tasks ended and 2 remaining, and those 2 can never be started.


Solution

  • I spent quite a while on this trying to figure it out. The problem is that your task runner thread has:

                for task in tasks:
                    print('running task')
                    task.run()
    

    where tasks is a reference to tasks.remaining. The problem arises from trying to iterate what is essentially the tasks.remaining list while the main thread is removing tasks from this very same list. As a result (on my desktop) two tasks never get iterated and thus never started. The solution is for the task runner thread to iterate a copy of the tasks.remaining list.

    I have made other changes to your code. All of my modifications are commented with #Booboo. Also, I do not have a hanging_threads module and so I commented out the monitor-related code:

    import time
    import traceback
    
    from typing import List
    from threading import Thread
    from multiprocess import (Semaphore,
                              Process,
                              Pipe)
    
    
    #from hanging_threads import start_monitoring #Booboo
    
    
    class S_Process(Process):
        """Process that runs its target while holding one slot of a semaphore.

        The child reports over ``pipe_conn``: it sends ``0`` right after the
        semaphore has been acquired and ``test_name`` once the target is done.
        """

        def __init__(self,
                     test_name: str,
                     semaphore: Semaphore,
                     pipe_conn: Pipe,
                     *args,
                     **kwargs
                     ) -> None:
            """Store the reporting state and forward the rest to ``Process``.

            Args:
                test_name: Name sent back over the pipe and exposed as
                    ``test_name``.
                semaphore: Semaphore held for the duration of the target call.
                pipe_conn: Connection end the child uses for its two messages.
                *args: Forwarded unchanged to ``Process.__init__``.
                **kwargs: Forwarded unchanged to ``Process.__init__``.
            """
            Process.__init__(self, *args, **kwargs)
            self.__child_conn = pipe_conn
            self.__test_name = test_name
            self.__semaphore = semaphore

        def run(self) -> None:
            """Acquire the semaphore, run the target and report over the pipe.

            The semaphore is released and ``test_name`` is sent even when the
            target raises. The exception is not swallowed: it propagates, so
            the failure stays visible to whoever runs the process.
            """
            with self.__semaphore: #Booboo
                # Tells the other end that the slot is held and work starts.
                self.__child_conn.send(0)

                #Booboo the send below must happen whatever the target does;
                # ``finally`` guarantees it without hiding the exception.
                try:
                    Process.run(self)
                finally:
                    self.__child_conn.send(self.__test_name)

        def terminate(self) -> None:
            """Terminate the process, then hand its semaphore slot back.

            A terminated child does not execute the release at the end of the
            ``with`` block in ``run()``, so without this override every
            timed-out task would keep its slot and later tasks would wait on
            the semaphore forever.

            NOTE(review): assumes the child currently holds the semaphore (it
            has already sent ``0``); calling this in any other state releases
            a slot that was never taken.
            """
            super().terminate()
            self.__semaphore.release()

        @property
        def test_name(self) -> str:
            """Name given to this process at construction time."""
            return self.__test_name
    
    
    class Task(object):
        """Parent-side handle for one process and the pipe end it reports on.

        ``status`` starts as 'NOTRUN', becomes 'RUNNING' in ``run()`` and ends
        as 'ENDED' (``join()``) or 'TERMINATED' (``terminate()``).
        ``duration`` holds the ``time.perf_counter()`` start stamp while the
        task is 'RUNNING' and the elapsed seconds once it is finished.
        """

        def __init__(self,
                     process: S_Process,
                     pipe_conn: Pipe,
                     ) -> None:
            self.process = process
            self.pipe_conn = pipe_conn
            self.duration = None
            self.test_name = None
            self.status = 'NOTRUN'

        def run(self) -> None:
            """Start the process and block until it sends its first message."""
            self.process.start()
            self.pipe_conn.recv()
            # The start stamp is stored before the status flips: code that
            # sees 'RUNNING' from another thread can rely on duration being
            # set.
            self.duration = time.perf_counter()
            self.status = 'RUNNING'

        def join(self) -> None:
            """Wait for the process to exit, then record its result."""
            #Booboo an untimed join() only returns once the process has
            # exited, so no liveness check or kill is needed afterwards.
            self.process.join()
            self.set_result()

        def terminate(self) -> None:
            """Terminate the process and mark the task as 'TERMINATED'."""
            self.process.terminate()
            if self.duration is not None:
                self.duration = time.perf_counter() - self.duration
            self.status = 'TERMINATED'

        def set_result(self) -> None:
            """Store the reported test name and elapsed time; mark 'ENDED'."""
            # poll() first: if the process exited without sending its name, a
            # bare recv() would block forever.
            if self.pipe_conn.poll():
                self.test_name = self.pipe_conn.recv()
            else:
                self.test_name = self.process.test_name
            if self.duration is not None:
                self.duration = time.perf_counter() - self.duration #Booboo
            self.status = 'ENDED'
    
    
    class Tasks(object):
        """Two lists of Task objects: those still remaining and those completed."""

        def __init__(self) -> None:
            self.remaining: List[Task] = []
            self.completed: List[Task] = []

        def add(self,
                process: S_Process,
                pipe_conn: Pipe,
                ) -> None:
            """Wrap ``process`` and ``pipe_conn`` in a Task and queue it."""
            self.remaining.append(Task(process, pipe_conn))

        def complete(self, task: Task) -> None:
            """Record ``task`` as completed and drop it from ``remaining``."""
            self.completed.append(task)
            self.remaining.remove(task)

        def info(self) -> List[str]:
            """Return one summary line (name, status, duration) per completed task."""
            return [
                f"Test Name: {done.test_name} " #Booboo
                f"Result: {done.status} " #Booboo
                f"Duration: {done.duration}" #Booboo
                for done in self.completed
            ]
    
    
    def run_tests() -> None:
        """Create eight S_Process tasks, start them from a thread and supervise them.

        A helper thread calls ``Task.run()`` on each task in turn, while this
        function polls every 0.2 s: a 'RUNNING' task whose process is no
        longer alive is joined, and a 'RUNNING' task older than ``TIMEOUT``
        seconds is terminated. Either way the task is moved to
        ``tasks.completed``. The loop also stops if the helper thread has
        ended while tasks are still waiting to be started, because nothing
        could start them any more.
        """

        #start_monitoring() #Booboo
        tasks = Tasks()
        # Shared by all processes: S_Process.run() holds it while the target
        # runs, so two targets run at a time.
        semaphore = Semaphore(2)

        for i in range(8):
            # One pipe per task: the child end goes to the process, the
            # parent end is kept in the Task.
            parent_conn, child_conn = Pipe()

            process = S_Process(
                target=test_function,
                args=(),
                test_name=f'test_{i}',
                semaphore=semaphore,
                pipe_conn=child_conn
            )
            tasks.add(process, parent_conn)

        def start_all(pending):
            """Start every task in order; exceptions are printed, not re-raised."""
            try:
                # Task.run() blocks until the child has sent its first
                # message, so tasks are started one after another.
                for task in pending:
                    task.run()
                    print('running task', task.process.test_name) #Booboo

            except Exception:
                print(traceback.format_exc())

        TIMEOUT = 5

        # The thread gets its own snapshot of the list: the loop below removes
        # entries from tasks.remaining, which must not disturb the thread's
        # iteration. The thread has its own name instead of rebinding the
        # function's name.
        starter = Thread(target=start_all, args=(tasks.remaining.copy(),)) #Booboo
        starter.start()

        while tasks.remaining:
            # Iterate a snapshot as well: tasks.complete() mutates the list.
            for task in tasks.remaining.copy():
                if task.status != 'RUNNING':
                    continue

                if not task.process.is_alive():
                    print('JOINING:', task.process.test_name)
                    task.join()
                    tasks.complete(task)
                elif time.perf_counter() - task.duration > TIMEOUT: #Booboo
                    # While 'RUNNING', task.duration is the start stamp.
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)

            print('Rem:', len(tasks.remaining))
            print('End:', len(tasks.completed))

            # Only the starter thread moves a task out of 'NOTRUN'. Once it
            # has ended, tasks still in that state would keep this loop
            # spinning forever. is_alive() is checked first so the statuses
            # read afterwards are final.
            if (tasks.remaining and not starter.is_alive()
                    and all(task.status == 'NOTRUN'
                            for task in tasks.remaining)):
                print('Runner thread stopped; tasks never started:',
                      len(tasks.remaining))
                break

            time.sleep(0.2)

        # Every task has been started (or the thread has ended) by now, so
        # this returns promptly; it makes the thread's end explicit.
        starter.join()
    
    
    def test_function():
        """Dummy test body: print a marker, then pretend to work for three seconds."""
        simulated_work_seconds = 3
        print('test_func')
        time.sleep(simulated_work_seconds)
    
    
    # Entry point: run the suite only when this file is executed as a script,
    # not when it is imported.
    if __name__ == "__main__":
        run_tests()