python, multiprocessing, subprocess, stdout

Real-time multiprocess stdout monitoring


Right now, I'm using subprocess to run a long-running job in the background. For multiple reasons (PyInstaller + AWS CLI) I can't use subprocess anymore.

Is there an easy way to achieve the same thing as below? That is, run a long-running Python function in a multiprocessing pool (or something else) and do real-time processing of its stdout/stderr?

import subprocess

process = subprocess.Popen(
    ["python", "long-job.py"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

while True:
    out = process.stdout.read(2000).decode()
    if not out:
        err = process.stderr.read().decode()
    else:
        err = ""
    if (out == "" or err == "") and process.poll() is not None:
        break

    live_stdout_process(out)

Thanks


Solution

  • Getting this cross-platform is messy: first of all, the Windows implementation of non-blocking pipes is neither user-friendly nor portable.

    One option is to just have your application read its command-line arguments and conditionally execute a file; that way you still get to use subprocess, since you will be launching yourself with a different argument, as sketched below.
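
    A minimal sketch of that self-relaunch approach (the --run-long-job flag, the run_long_job function, and the argument handling are made up for illustration, not part of the original answer):

    import subprocess
    import sys
    
    def run_long_job():
        print("doing the long job")  # stand-in for the real long-running work
    
    if __name__ == "__main__":
        if "--run-long-job" in sys.argv:
            # child mode: we were re-launched with the special flag, so do the work
            run_long_job()
        else:
            # parent mode: re-launch this same program with the flag; under
            # PyInstaller sys.frozen is set and sys.executable is already the app
            if getattr(sys, "frozen", False):
                cmd = [sys.executable, "--run-long-job"]
            else:
                cmd = [sys.executable, sys.argv[0], "--run-long-job"]
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            for line in process.stdout:
                print(line, end="")  # real-time processing of the child's output
            process.wait()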

    But to keep it to multiprocessing:

    1. The output must be written to queues instead of pipes.
    2. You need the child to execute a Python file; this can be done with runpy, which can execute the file as __main__.
    3. That runpy call should run inside a multiprocessing child, and the child must first redirect its stdout and stderr in the initializer.
    4. When an error happens, your main application must catch it, but if it is busy reading the output it cannot also wait for the error, so a helper thread has to start the process pool and wait for the error.
    5. The main process has to create the queues, launch the helper thread, and read the output.

    Putting it all together:

    import multiprocessing
    from multiprocessing import Queue
    import sys
    import concurrent.futures
    import threading
    import traceback
    import runpy
    import time
    
    class StdoutQueueWrapper:
        """File-like wrapper that forwards writes to a multiprocessing queue."""
        def __init__(self, queue: Queue):
            self._queue = queue
        def write(self, text):
            self._queue.put(text)
        def flush(self):
            pass
    
    def function_to_run():
        # runpy.run_path("long-job.py", run_name="__main__")  # run long-job.py
        print("hello")  # print something
        raise ValueError  # error out
    
    def initializer(stdout_queue: Queue, stderr_queue: Queue):
        # redirect the worker's stdout/stderr to the queues before any task runs
        sys.stdout = StdoutQueueWrapper(stdout_queue)
        sys.stderr = StdoutQueueWrapper(stderr_queue)
    
    def thread_function(child_stdout_queue, child_stderr_queue):
        # run the worker in a process pool and wait for its result in this helper thread
        with concurrent.futures.ProcessPoolExecutor(1, initializer=initializer,
                                                    initargs=(child_stdout_queue, child_stderr_queue)) as pool:
            result = pool.submit(function_to_run)
            try:
                result.result()
            except Exception:
                # forward the child's traceback through the stderr queue
                child_stderr_queue.put(traceback.format_exc())
    
    
    if __name__ == "__main__":
        child_stdout_queue = multiprocessing.Queue()
        child_stderr_queue = multiprocessing.Queue()
    
        child_thread = threading.Thread(target=thread_function,
                                        args=(child_stdout_queue, child_stderr_queue),
                                        daemon=True)
        child_thread.start()
    
        # drain both queues until the helper thread (and hence the worker) is done
        while True:
            while not child_stdout_queue.empty():
                var = child_stdout_queue.get()
                print(var, end='')
            while not child_stderr_queue.empty():
                var = child_stderr_queue.get()
                print(var, end='')
            if not child_thread.is_alive():
                break
            time.sleep(0.01)  # check output every 0.01 seconds
    
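    If long-job.py itself reads command-line arguments, note that runpy.run_path does not populate sys.argv for you; a hypothetical variant of function_to_run (the argument values below are invented) would have to set it inside the worker first:

    import runpy
    import sys
    
    def function_to_run():
        # sys.argv as seen by long-job.py; "--input data.csv" is only an example
        sys.argv = ["long-job.py", "--input", "data.csv"]
        runpy.run_path("long-job.py", run_name="__main__")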

    Note that a direct consequence of running the job with multiprocessing is that the parent is not fully isolated from the child: if the child runs into a segmentation fault or some other unrecoverable error, the parent is affected as well (at best the pool is broken, at worst the parent hangs or crashes too), hence running yourself under subprocess might be a better option if segfaults are expected.
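
    For completeness, a small sketch (not from the original answer) of what the parent sees when a worker dies abruptly: concurrent.futures raises BrokenProcessPool, which the thread_function above would catch like any other exception and forward to the stderr queue:

    import concurrent.futures
    from concurrent.futures.process import BrokenProcessPool
    import os
    
    def crash():
        os._exit(1)  # simulate an abrupt worker death (stand-in for a segfault)
    
    if __name__ == "__main__":
        with concurrent.futures.ProcessPoolExecutor(1) as pool:
            future = pool.submit(crash)
            try:
                future.result()
            except BrokenProcessPool as exc:
                print("worker died abruptly:", exc)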