Search code examples
python · multiprocessing · python-multiprocessing

Join timeout in multiprocessing


I have a dummy example to which I want to apply multiprocessing. Consider a scenario where you have a stream of numbers (which I call frames) coming in one by one, and I want to assign each one to whichever process is currently available. So I am creating 4 processes that run a while loop, checking whether there is any element in the queue, and then applying a function to it.

The problem is that when I join the processes, the code gets stuck in the while loop, even though I clear the flag that should end the loop before joining. Somehow it still gets stuck inside it.

Code:

# step 1, 4 processes
import multiprocessing as mp
import os
import time

class MpListOperations:
    """Fan squaring work out over several worker processes fed from a shared queue.

    NOTE(review): each ``mp.Manager()`` call in ``__init__`` spawns a separate
    manager server process; a single shared manager would be cheaper.
    """

    def __init__(self):
        # Managed queues/value are proxy objects living in manager processes,
        # so they can be shared with the spawned workers.
        self.results_queue = mp.Manager().Queue()
        self.frames_queue = mp.Manager().Queue()
        # Shared stop flag ('b' typecode); workers spin while it is truthy.
        self.flag = mp.Manager().Value(typecode='b',value=True)
        self.list_nums = list(range(0,5000))


    def process_list(self):
        """Worker loop: square frames from frames_queue until flag is cleared.

        NOTE(review): ``qsize()`` is documented as returning only an
        approximate size, and the flag test races with ``join_process``;
        both contribute to the hang discussed in the question.
        """
        print(f"Process id {os.getpid()} started")
        while self.flag.value:
#             print(self.flag.value)
            if self.frames_queue.qsize():
                self.results_queue.put(self.frames_queue.get()**2)


    def create_processes(self, no_of_processes = mp.cpu_count()):
        """Create (but do not start) the worker processes.

        Default ``no_of_processes`` is evaluated once at definition time;
        fine here since cpu_count() does not change while running.
        """
        print("Creating Processes")
        self.processes = [mp.Process(target=self.process_list) for _ in range(no_of_processes)]

    def start_processes(self):
        # Starting a process pickles the bound-method target (and thus self)
        # into each child on spawn-based platforms.
        print(f"starting processes")
        for process in self.processes:
            process.start()

    def join_process(self):
        """Busy-wait until frames_queue looks empty, then stop and join workers.

        NOTE(review): relies on the approximate ``qsize()``; ``join()``
        without a timeout can then block indefinitely — the symptom the
        question asks about.
        """
        print("Joining Processes")
        while True:
            if not self.frames_queue.qsize():
                self.flag.value=False
                print("JOININNG HERE")
                for process in self.processes:
                    exit_code = process.join()  # Process.join() always returns None
                    print(exit_code)
                print("BREAKING DONE")
                break

    def stream_frames(self):
        # Producer: push every number onto the shared frames queue.
        print("Streaming Frames")
        for frame in self.list_nums:
            self.frames_queue.put(frame)


if __name__=="__main__":
    # Entry-point guard is required for multiprocessing on spawn-based
    # platforms (Windows, macOS default).
    start = time.time()
    mp_ops = MpListOperations()
    mp_ops.create_processes()
    mp_ops.start_processes()
    mp_ops.stream_frames()  # all frames are queued before joining begins
    mp_ops.join_process()
    print(time.time()-start)  # total elapsed wall-clock time

Now if I add a timeout parameter to join, even 0, i.e. exit_code = process.join(0), it works. I want to understand, in this scenario, whether this code is correct, and what the value of the timeout should be. Why does it work with a timeout and not without it? What is the proper way to implement multiprocessing here?


Solution

  • If you look at the documentation for a managed queue you will see that the qsize method only returns an approximate size. I would therefore not use it for testing when all the items have been taken off the frames queue. Presumably you want to let the processes run until all frames have been processed. The simplest way I know would be to put N sentinel items on the frames queue after the actual frames have been put, where N is the number of processes getting from the queue. A sentinel item is a special value that cannot be mistaken for an actual frame and signals to the process that there are no more items for it to get from the queue (i.e. a quasi end-of-file item). In this case we can use None as the sentinel items. Each process then just continues to do get operations on the queue until it sees a sentinel item and then terminates. There is therefore no need for the self.flag attribute.

    Here is the updated and simplified code. I have made some other minor changes that have been commented:

    import multiprocessing as mp
    import os
    import time
    
    class MpListOperations:
        """Square a stream of frames in worker processes.

        Shutdown is signalled by one ``None`` sentinel per worker instead of
        a shared flag, so workers can block on ``get()`` safely.
        """

        def __init__(self):
            # Only create one manager process:
            manager = mp.Manager()
            self.results_queue = manager.Queue()
            self.frames_queue = manager.Queue()
            # No need to convert range to a list:
            self.list_nums = range(0, 5000)


        def process_list(self):
            """Worker loop: block on get() until a sentinel arrives, squaring
            every real frame into results_queue."""
            print(f"Process id {os.getpid()} started")
            while True:
                frame = self.frames_queue.get()
                if frame is None: # Sentinel?
                    # Yes, we are done:
                    break
                self.results_queue.put(frame ** 2)


        def create_processes(self, no_of_processes = mp.cpu_count()):
            """Create (but do not start) one worker per requested process."""
            print("Creating Processes")
            # Remembered so stream_frames knows how many sentinels to enqueue:
            self.no_of_processes = no_of_processes
            self.processes = [mp.Process(target=self.process_list) for _ in range(no_of_processes)]

        def start_processes(self):
            """Start all previously created workers."""
            print("Starting Processes")
            for process in self.processes:
                process.start()

        def join_processes(self):
            """Wait for every worker to consume its sentinel and exit."""
            print("Joining Processes")
            for process in self.processes:
                # join returns None:
                process.join()

        def stream_frames(self):
            """Producer: enqueue every frame, then one sentinel per worker."""
            print("Streaming Frames")
            for frame in self.list_nums:
                self.frames_queue.put(frame)
            # Put sentinels:
            for _ in range(self.no_of_processes):
                self.frames_queue.put(None)
    
    
    if __name__== "__main__":
        # Entry-point guard is mandatory for spawn-based start methods:
        start = time.time()
        mp_ops = MpListOperations()
        mp_ops.create_processes()
        mp_ops.start_processes()
        # All frames plus sentinels are queued before joining begins:
        mp_ops.stream_frames()
        mp_ops.join_processes()
        print(time.time()-start)  # total elapsed wall-clock time
    

    Prints:

    Creating Processes
    Starting Processes
    Process id 28 started
    Process id 29 started
    Streaming Frames
    Process id 33 started
    Process id 31 started
    Process id 38 started
    Process id 44 started
    Process id 42 started
    Process id 45 started
    Joining Processes
    2.3660173416137695
    

    Note for Windows

    I have modified method start_processes to temporarily set attribute self.processes to None:

        def start_processes(self):
            """Start the workers without pickling the process list itself.

            On Windows (spawn start method) each ``process.start()`` pickles
            ``self``; a list that already contains started Process objects
            fails to pickle, so the attribute is detached for the duration of
            the start calls and restored afterwards.
            """
            print("Starting Processes")
            processes = self.processes
            # Don't try to pickle list of processes:
            self.processes = None
            for process in processes:
                process.start()
            # Restore attribute:
            self.processes = processes
    

    Otherwise under Windows we get a pickle error trying to serialize/deserialize a list of processes containing two or more multiprocessing.Process instances. The error is "TypeError: cannot pickle 'weakref' object." This can be demonstrated with the following code where we first try to pickle a list of 1 process and then a list of 2 processes:

    import multiprocessing as mp
    import os
    
    class Foo:
        """Minimal reproduction of the Windows pickling failure.

        Starting the second process pickles ``self.processes``, which by then
        contains an already-started Process, and that pickling fails with
        "TypeError: cannot pickle 'weakref' object" (see traceback below).
        """

        def __init__(self, number_of_processes):
            # Create, start and join immediately — construction is the demo.
            self.processes = [mp.Process(target=self.worker) for _ in range(number_of_processes)]
            self.start_processes()
            self.join_processes()

        def start_processes(self):
            # NOTE(review): this local is unused; kept verbatim from the post.
            processes = self.processes
            for process in self.processes:
                process.start()

        def join_processes(self):
            # Wait for every worker to finish.
            for process in self.processes:
                process.join()

        def worker(self):
            # Trivial payload so the demo focuses purely on pickling.
            print(f"Process id {os.getpid()} started")
            print(f"Process id {os.getpid()} ended")
    
    
    if __name__== "__main__":
        foo = Foo(1)  # one process: the list holds no started Process yet, pickles fine
        foo = Foo(2)  # second start() must pickle a started Process -> fails on Windows
    

    Prints:

    Process id 7540 started
    Process id 7540 ended
    Traceback (most recent call last):
      File "C:\Booboo\test\test.py", line 26, in <module>
        foo = Foo(2)
      File "C:\Booboo\test\test.py", line 7, in __init__
        self.start_processes()
      File "C:\Booboo\test\test.py", line 13, in start_processes
        process.start()
      File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
        self._popen = self._Popen(self)
      File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
        return _default_context.get_context().Process._Popen(process_obj)
      File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 327, in _Popen
        return Popen(process_obj)
      File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
        reduction.dump(process_obj, to_child)
      File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
        ForkingPickler(file, protocol).dump(obj)
    TypeError: cannot pickle 'weakref' object
    Process id 18152 started
    Process id 18152 ended
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
        exitcode = _main(fd, parent_sentinel)
      File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
        self = reduction.pickle.load(from_parent)
    EOFError: Ran out of input