python, process, multiprocessing, subprocess, python-multiprocessing

How to initialize a parallel independent process within a function?


Sorry if the headline is strange. Let me explain.

Let's say there is handler.py:

import funcs
import requests

def initialize_calculate(data):
    check_data(data)
    funcs.calculate(data)  # takes a lot of time, like 30 minutes
    print('Calculation launched')
    requests.get('hostname', params={'func': 'calculate', 'status': 'launched'})

and here is funcs.py:

import requests

def calculate(data):
    result = make_calculations(data)
    requests.get('hostname', params={'func': 'calculate', 'status': 'finished', 'result': result})

So what I want is for the handler to be able to start the other function, no matter where it lives, without waiting for it to finish, because I want to notify the client side that the process has started; when it's done, the process itself will send the result.

How can I launch an independent process running calculate from initialize_calculate?

I'd like to know if this is possible without non-native libraries or frameworks.


Solution

  • If you don't want to use a 3rd-party lib like daemonocle, which implements a "well-behaved" Unix-Daemon (a rough usage sketch follows below), you could use subprocess.Popen() to create an independent process. Another option would be to modify multiprocessing.Process to prevent the auto-joining of the child when the parent exits.

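    For completeness, basic usage of daemonocle looks roughly like the sketch below. This is hedged, written from memory of the library's documented API; the pid_file path and the worker body are placeholders, so check the daemonocle docs before relying on it.

    import sys

    import daemonocle  # 3rd party: pip install daemonocle


    def main():
        # the long-running work goes here, fully detached as a Unix-Daemon
        ...


    if __name__ == '__main__':
        daemon = daemonocle.Daemon(
            worker=main,
            pid_file='/var/run/mydaemon.pid',  # placeholder path
        )
        daemon.do_action(sys.argv[1])  # e.g. ./script.py start|stop|status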

    subprocess.Popen()

    With subprocess.Popen() you start the new process by specifying the command and its arguments, just like you would manually from a terminal. This means you need to make funcs.py (or another file) a top-level script that parses string arguments from the command line and then calls funcs.calculate() with these arguments.

    I boiled your example down to its essence so we don't have to read too much code.

    funcs.py

    #!/usr/bin/env python3
    # UNIX: enable executable from terminal with: chmod +x filename
    import os
    import sys
    import time
    
    import psutil  # 3rd party for demo
    
    
    def print_msg(msg):
        print(f"[{time.ctime()}, pid: {os.getpid()}] --- {msg}")
    
    
    def calculate(data, *args):
        print_msg(f"parent pid: {psutil.Process().parent().pid}, start calculate()")
        for _ in range(int(500e6)):  # busy-loop to simulate long-running work
            pass
        print_msg(f"parent pid: {psutil.Process().parent().pid}, end calculate()")
    
    
    if __name__ == '__main__':
    
        if len(sys.argv) > 1:
            calculate(*sys.argv[1:])
    

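    Since funcs.py is now a top-level script, you can sanity-check it manually from a terminal before wiring anything up (assuming the chmod from the header comment):

    $> ./funcs.py data 42
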
    subp_main.py

    #!/usr/bin/env python3
    # UNIX: enable executable from terminal with: chmod +x filename
    if __name__ == '__main__':
    
        import time
        import logging
        import subprocess
        import multiprocessing as mp
    
        import funcs
    
        mp.log_to_stderr(logging.DEBUG)
    
        filename = funcs.__file__
        data = ("data", 42)
    
        # in case filename is an executable you don't need "python" before `filename`:
        subprocess.Popen(args=["python", filename, *[str(arg) for arg in data]])
        time.sleep(1)  # keep parent alive a bit longer for demo
        funcs.print_msg("exiting")
    

    Important for testing: run from a terminal, not e.g. PyCharm-Run, because the latter won't show what the child prints after the parent has exited. In the last line below you can see that the child process' parent-id changed to 1, because the child got adopted by systemd (Ubuntu) after the parent exited.

    $> ./subp_main.py
    [Fri Oct 23 20:14:44 2020, pid: 28650] --- parent pid: 28649, start calculate()
    [Fri Oct 23 20:14:45 2020, pid: 28649] --- exiting
    [INFO/MainProcess] process shutting down
    [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
    [DEBUG/MainProcess] running the remaining "atexit" finalizers
    $> [Fri Oct 23 20:14:54 2020, pid: 28650] --- parent pid: 1, end calculate()
    
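    Applied to the handler.py from the question, initialize_calculate() would then look roughly like the sketch below. It assumes funcs.py is the argument-parsing script from above, that check_data() exists as in the question, and that data can be flattened into string arguments.

    import sys
    import subprocess

    import requests

    import funcs


    def initialize_calculate(data):
        check_data(data)  # from the question, assumed to exist
        # launch the child and return immediately instead of blocking:
        subprocess.Popen(
            args=[sys.executable, funcs.__file__, *[str(arg) for arg in data]]
        )
        print('Calculation launched')
        requests.get('hostname', params={'func': 'calculate', 'status': 'launched'})
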

    class OrphanProcess(multiprocessing.Process)

    If you're looking for something more convenient: you can't use the high-level multiprocessing.Process as is, because it doesn't let the parent process exit before the child, as you asked for. Regular child-processes are either joined (awaited) or terminated (if you set the daemon-flag for Process) when the parent shuts down. This still happens within Python. Note that the daemon-flag doesn't make a process a Unix-Daemon; the naming is a frequent source of confusion.

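    To see the default behavior for yourself, here is a minimal sketch: the parent below reaches the end of its script almost immediately, but the interpreter doesn't exit until multiprocessing's atexit-handler has joined the non-daemonic child. With daemon=True the child would get terminated at that point instead.

    import time
    import multiprocessing as mp


    def work():
        time.sleep(5)
        print("child finished")  # still printed, ~5 s after the line below


    if __name__ == '__main__':
        mp.Process(target=work).start()  # regular, non-daemonic child
        print("parent reached end of script")  # parent now blocks in shutdown
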
    I subclassed multiprocessing.Process to switch the auto-joining off and spent some time with the source, observing whether zombies might become an issue. Because the modification turns off automatic joining in the parent, I recommend using "forkserver" as the start-method for new processes on Unix (always a good idea if the parent is already multi-threaded) to prevent zombie-children from sticking around as long as the parent is still running. When the parent process terminates, its child-zombies eventually get reaped by systemd/init. Running multiprocessing.log_to_stderr() shows everything shutting down cleanly, so nothing seems broken so far.

    Consider this approach experimental, but it's probably a lot safer than using raw os.fork() to re-invent part of the extensive multiprocessing machinery just to add this one feature. For error handling in the child, write a try/except block and log to a file, as sketched below.

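    A sketch of such child-side error handling follows; the wrapper name and the log-file path are made up for illustration. Since the parent is gone, nothing else will see the child's traceback, so write it to a file and use the wrapper as the process target instead of calculate.

    import logging
    import traceback

    from funcs import calculate


    def guarded_calculate(data, *args):
        """Run calculate(), logging any traceback to a file."""
        logging.basicConfig(filename='calculate.log', level=logging.ERROR)
        try:
            calculate(data, *args)
        except Exception:
            logging.error("calculate() failed:\n%s", traceback.format_exc())
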
    orphan.py

    import multiprocessing.process as mpp
    import multiprocessing as mp
    
    __all__ = ['OrphanProcess']
    
    
    class OrphanProcess(mp.Process):
        """Process which won't be joined by parent on parent shutdown."""
        def start(self):
            super().start()
            mpp._children.discard(self)
    
        def __del__(self):
            # The finalizer won't `.join()` the child because we discarded it
            # from `_children`, so this is the last chance to reap a possible
            # zombie from within Python. Otherwise systemd/init will reap it
            # eventually.
            self.join(0)
    

    orph_main.py

    #!/usr/bin/env python3
    # UNIX: enable executable from terminal with: chmod +x filename
    if __name__ == '__main__':
    
        import time
        import logging
        import multiprocessing as mp
        from orphan import OrphanProcess
        from funcs import print_msg, calculate
    
        mp.set_start_method("forkserver")
        mp.log_to_stderr(logging.DEBUG)
    
        p = OrphanProcess(target=calculate, args=("data", 42))
        p.start()
        time.sleep(1)
        print_msg("exiting")
    

    Again, test from a terminal to get the child's prints to stdout. If the shell appears to hang after everything was printed over the second prompt, hit enter to get a new prompt. The parent-id stays the same here because the parent, from the OS point of view, is the forkserver-process, not the initial main-process of orph_main.py.

    $> ./orph_main.py
    [INFO/MainProcess] created temp directory /tmp/pymp-bd75vnol
    [INFO/OrphanProcess-1] child process calling self.run()
    [Fri Oct 23 21:18:29 2020, pid: 30998] --- parent pid: 30997, start calculate()
    [Fri Oct 23 21:18:30 2020, pid: 30995] --- exiting
    [INFO/MainProcess] process shutting down
    [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
    [DEBUG/MainProcess] running the remaining "atexit" finalizers
    $> [Fri Oct 23 21:18:38 2020, pid: 30998] --- parent pid: 30997, end calculate()
    [INFO/OrphanProcess-1] process shutting down
    [DEBUG/OrphanProcess-1] running all "atexit" finalizers with priority >= 0
    [DEBUG/OrphanProcess-1] running the remaining "atexit" finalizers
    [INFO/OrphanProcess-1] process exiting with exitcode 0
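
    One follow-up note: if the parent keeps running for a long time after an OrphanProcess-child has finished, you don't have to rely on __del__ to reap the zombie. A non-blocking join does it whenever convenient:

    from orphan import OrphanProcess
    from funcs import calculate

    p = OrphanProcess(target=calculate, args=("data", 42))
    p.start()
    # ... parent keeps working ...
    p.join(0)  # reaps the child if it already exited, returns immediately otherwise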