Search code examples
pythonparallel-processingmultiprocessingsubprocessmessage-queue

How to send a multiprocessing queue to a subprocess


I wish to send the multiprocessing queue object to a subprocess, because what I actually want to do is I have a scenario where controller invokes 2 or more subprocesses (all of them are python scripts) I want to insure fault tolerance so in case any subprocess exits for any reason the controller must relaunch it, for that reason I cannot use process.communicate() because if I have two processes the controller can only wait on one invoked process at a time through the blocking call mentioned above which means if the other process exits it might be too late before the first process completes and the controller moves onto the next process.communicate() to find out that, that process has exited long ago.
I was thinking of using multithreading queues to pass on to all the subprocesses and if any process is exiting It must write it's name and exit code in the queue and controller can get it and relaunch that specific process again via this approach on one blocking call I can wait for responses from all the spawned processes.
I know I can use multiprocessing.process to create a new process but I don't want that because it forks which means it will also fork all resources of controller (including thread objects)
Upon researching I have found out that multiprocessing.queues are picklable which means there must be a way I can send them.
Is there a way of using multiprocessing Queues or should I be using something else.
Thank you.

Currently I am trying to send the queue like this:

import os
import time 
import subprocess 
import multiprocessing

if __name__ == "__main__":
    m = multiprocessing.Manager()
    q = m.Queue()
    env = os.environ.copy()
    process = subprocess.Popen(['python3',
                            f'multi_child.py',
                            q, 
                            ], env=env,
                            stderr=subprocess.PIPE)
    time.sleep(1) 
    

and receiving in multi_child.py using sys.argv[1]


Solution

  • Technically speaking, you are not using a multiprocessing.Queue instance, but rather a managed queue. This is implemented as a queue.Queue instance that resides in a process that is created when you call the multiprocessing.Manager() method. The subsequent call q.manager.Queue() returns a reference not to this queue but rather a reference to special queue proxy. When calls are made on this proxy instance, the method name and arguments are serialized/deserialized over to the manager process where the actual queue instance is acted upon.

    There are a couple of issues with your current code. First, you cannot readily pass this proxy reference as a command line argument. Even if you could, I doubt it would be meaningful in the new process created with Popen.

    Assuming you have control over the source code of multichild.py the first suggestion I would make is to take any code that is at global scope other than import statements, class definitions, method definitions, global variable definitions, etc., i.e. Python statements at global scope that are executed when the script is run and place then in a function named main (or any name you choose). If your script already looks like ...

    ...
    def main():
        ... # statements
    
    if __name__ == '__main__':
        main()
    

    ... then there is nothing you need to do this source. Otherwise, reorganize your code so that when the script is invoked it just executes function main (or your chosen function name). Then you can modify your original source to use an actual multiprocessing.Queue instance, which offers better performance than a managed queue:

    import time 
    import multiprocessing
    
    from multi_child_1 import main as main1
    from multi_child_2 import main as main2
    
    if __name__ == "__main__":
        q = multiprocessing.Queue()
        p1 = Process(target=main1, args=(q,))
        p1.start()
        # Start other processes, for example:
        p2 = Process(target=main2, args=(q,))
        p2.start()
        time.sleep(1) # What is this for?
    
        # Wait for processes to complete:
        p1.join()
        p2.join()
    

    If for some reason you cannot do this, then you need to create a non-standard managed queue such that when the manager is asked to create one it always returns a proxy to a singleton queue. This is an example of how your child process file would look:

    File multi_child.py

    from multiprocessing.managers import BaseManager
    from multiprocessing import current_process
    import time
    
    address = "127.0.0.1"
    port = 50000
    password = "secret"
    
    def connect_to_manager():
        BaseManager.register('sharable_queue')
        manager = BaseManager(address=(address, port), authkey=password.encode('utf-8'))
        manager.connect()
        return manager.sharable_queue()
    
    if __name__ == '__main__':
        sharable_queue = connect_to_manager()
        while True:
            n = sharable_queue.get()
            if n is None:
                # Senitnel telling us no more data
                break
            print(f'Process {current_process().pid} got {n}')
            time.sleep(1)
    

    And your main script would look like:

    File main.py

    from multiprocessing.managers import BaseManager
    from queue import Queue
    from threading import Thread, Event
    from main_child import address, port, password, connect_to_manager
    from subprocess import Popen
    
    the_queue = None
    
    def get_queue():
        """Return a singleton queue."""
        global the_queue
    
        if the_queue is None:
            the_queue = Queue()
        return the_queue
    
    def server(started_event, shutdown_event):
        net_manager = BaseManager(address=(address, port), authkey=password.encode('utf-8'))
        BaseManager.register('sharable_queue', get_queue)
        net_manager.start()
        started_event.set() # tell main thread that we have started
        shutdown_event.wait() # wait to be told to shutdown
        net_manager.shutdown()
    
    def main():
        started_event = Event()
        shutdown_event = Event()
        # Run the manager in a thread, so that we can continue:
        server_thread = Thread(target=server, args=(started_event, shutdown_event,))
        server_thread.start()
    
        # wait for manager to start:
        started_event.wait()
    
        sharable_queue = connect_to_manager()
        print(dir(sharable_queue), sharable_queue._address_to_local)
        sharable_queue.put(1)
        sharable_queue.put(2)
        # Put two sentinels since we are starting two processes:
        sharable_queue.put(None)
        sharable_queue.put(None)
    
        # Here we are invoking main_child.py twice:
        processes = [Popen(['python', 'main_child.py']) for _ in range(2)]
        for process in processes:
            process.communicate()
    
        # tell manager we are through:
        shutdown_event.set()
        server_thread.join()
    
    if __name__ == '__main__':
        main()
    

    Prints:

    Process 17500 got 1
    Process 23512 got 2