Tags: python, python-asyncio

Asyncio multiprocessing communication with queues - only one coroutine running


I have a manager script that launches some processes, then uses two coroutines (one to monitor, one to gather results). For some reason only one coroutine seems to run; what am I missing? (I don't normally work with asyncio.)


import multiprocessing as mp
import time 
import asyncio
import logging 

logging.basicConfig(level=logging.DEBUG)

class Process(mp.Process):
    def __init__(self, task_queue: mp.Queue, result_queue: mp.Queue):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        logging.info('Process init')

    def run(self):
        while not self.task_queue.empty():
            try:
                task = self.task_queue.get(timeout=1)
            except mp.Queue.Empty:
                logging.info('Task queue is empty')
                break
            
            time.sleep(1)
            logging.info('Processing task %i (pid %i)', task, self.pid)
            self.result_queue.put(task)
            
        logging.info('Process run')

class Manager:
    def __init__(self):
        self.processes = []
        self.task_queue = mp.Queue()
        self.result_queue = mp.Queue()
        self.keep_running = True

    async def monitor(self):
        while self.keep_running:
            await asyncio.sleep(0.1)
            logging.info('Task queue size: %i', self.task_queue.qsize())
            logging.info('Result queue size: %i', self.result_queue.qsize())
            self.keep_running = any([p.is_alive() for p in self.processes])


    async def consume_results(self):
        while self.keep_running:
            try:
                result = self.result_queue.get()
            except mp.Queue.Empty:
                logging.info('Result queue is empty')
                continue

            logging.info('Got result: %s', result)

    def start(self):
        # Populate the task queue
        for i in range(10):
            self.task_queue.put(i)

        # Start the processes
        for i in range(3):
            p = Process(self.task_queue, self.result_queue)
            p.start()
            self.processes.append(p)

        # Wait for the processes to finish
        loop = asyncio.get_event_loop()
        loop.create_task(self.monitor())
        loop.create_task(self.consume_results())

manager = Manager()
manager.start()
  • I'm expecting to see the monitor's queue sizes; however, only consume_results() runs.

Solution

  • Here I'm assuming that you want to monitor the queue sizes, and since you didn't mention which environment you're in, I'm assuming the latest 3.12.x.


    Issues


    • You block the event loop thread with the .get() call in consume_results, so the monitor coroutine never gets a chance to run:
        async def consume_results(self):
            while self.keep_running:
                try:
                    result = self.result_queue.get()
    


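    A tiny, self-contained demo of that starvation (not from your code; just an illustration):

    import asyncio
    import time


    async def blocker():
        time.sleep(3)  # blocking call: freezes the whole event loop


    async def ticker():
        for _ in range(3):
            print("tick")
            await asyncio.sleep(1)


    async def main():
        # nothing prints for ~3 s, because blocker's time.sleep
        # monopolises the single event loop thread
        await asyncio.gather(blocker(), ticker())


    asyncio.run(main())
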
    • There's no multiprocessing.Queue.Empty; mp.Queue is a function, not a class, so the except clause itself raises:
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "...\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
        self.run()
      File "E:...\StackOverflow\78475657\78475657.py", line 20, in run
        except mp.Queue.Empty:
               ^^^^^^^^^^^^^^
    AttributeError: 'function' object has no attribute 'Empty'
    


    • Starting the processes at module top level, without an if __name__ == '__main__' guard, also fails on spawn-based platforms such as Windows:
    RuntimeError: 
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.
    
            This probably means that you are not using fork to start your
            child processes and you have forgotten to use the proper idiom
            in the main module:
    
                if __name__ == '__main__':
                    freeze_support()
                    ...
    
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce an executable.
    
            To fix this issue, refer to the "Safe importing of main module"
            section in https://docs.python.org/3/library/multiprocessing.html     
    
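    A sketch of the standard guard for the tail of your script (same Manager as in your code):

    if __name__ == '__main__':
        manager = Manager()
        manager.start()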


    • You didn't wait for the processes. Your main process finished, and halting the program threw KeyboardInterrupt into all the child processes:
    Process Process-3:
    Traceback (most recent call last):
    Traceback (most recent call last):
    Traceback (most recent call last):
      File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
        self.run()
      File "...\StackOverflow\78475657\78475657.py", line 24, in run
        time.sleep(1)
    KeyboardInterrupt
      File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
        self.run()
      File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
        self.run()
      File "...\StackOverflow\78475657\78475657.py", line 24, in run
        time.sleep(1)
      File "...\StackOverflow\78475657\78475657.py", line 24, in run
        time.sleep(1)
    KeyboardInterrupt
    KeyboardInterrupt
    Exception ignored in atexit callback: <function _exit_function at 0x000001C044853CE0>
    Traceback (most recent call last):
      File "...\Python\Python312\Lib\multiprocessing\util.py", line 357, in _exit_function
        p.join()
      File "...\Python\Python312\Lib\multiprocessing\process.py", line 149, in join
        res = self._popen.wait(timeout)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
      File "...\Python\Python312\Lib\multiprocessing\popen_spawn_win32.py", line 110, in wait
        res = _winapi.WaitForSingleObject(int(self._handle), msecs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    KeyboardInterrupt: 
    


    • You didn't wait for the loop. Hence every task just immediately got cancelled and destroyed while still pending:
    # Wait for the processes to finish
    loop = asyncio.get_event_loop()
    loop.create_task(self.monitor())
    loop.create_task(self.consume_results())
    
    ERROR:asyncio:Task was destroyed but it is pending!
    task: <Task pending name='Task-1' coro=<Manager.monitor() running at ...\StackOverflow\78475657\78475657.py:39>>
    ...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.monitor' was never awaited
      self._ready.clear()
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    ERROR:asyncio:Task was destroyed but it is pending!
    task: <Task pending name='Task-2' coro=<Manager.consume_results() running at ...\StackOverflow\78475657\78475657.py:46>>
    ...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.consume_results' was never awaited
      self._ready.clear()
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    


    • asyncio.get_event_loop() emits a DeprecationWarning.

    Deprecated since version 3.12: Deprecation warning is emitted if there is no current event loop. In some future Python release this will become an error.

    ...\StackOverflow\78475657\78475657.py:67: DeprecationWarning: There is no current event loop
      loop = asyncio.get_event_loop()
    

    However, a notebook already runs inside an async context and has a loop, so the situation is reversed there, and what you did is correct.
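
    If you do need a loop in a plain script, a sketch of the non-deprecated pattern:

    import asyncio


    async def main():
        loop = asyncio.get_running_loop()  # safe: a loop is guaranteed here
        ...


    asyncio.run(main())  # creates, runs and closes the loop for you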



    • The explicit empty() check in the worker loop is redundant: another process can drain the queue between empty() and get(), so the get(timeout=1) is the only reliable check.
            while not self.task_queue.empty():  # < no use due to race condition
                try:
                    task = self.task_queue.get(timeout=1)  # actual important check
                except mp.Queue.Empty:
                    logging.info('Task queue is empty')
                    break
    


    Fixes

    • Use asyncio.to_thread to push the blocking call to a thread:

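    For example, a sketch of your consume_results with the blocking call moved to a worker thread (asyncio.to_thread needs Python 3.9+):

    async def consume_results(self):
        while self.keep_running:
            # result_queue.get() now runs in a worker thread, so the
            # event loop stays free to schedule monitor() concurrently
            result = await asyncio.to_thread(self.result_queue.get)
            logging.info('Got result: %s', result)
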
    • Use queue.Empty

    import queue
    
    try:
        ...
    except queue.Empty:
        ...
    

    • Add Process.join, or better yet use mp.Pool (a Pool sketch follows the docs example below).
    # from https://docs.python.org/3/library/multiprocessing.html
    from multiprocessing import Process
    import os
    
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
    
    def f(name):
        info('function f')
        print('hello', name)
    
    if __name__ == '__main__':
        info('main line')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    
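    And a minimal mp.Pool sketch (my own illustration, not from the docs):

    import multiprocessing as mp
    import time


    def work(task):
        time.sleep(1)
        return task


    if __name__ == '__main__':
        with mp.Pool(processes=3) as pool:
            # map blocks until every task is finished - no manual
            # join or queue bookkeeping needed
            results = pool.map(work, range(10))
        print(results)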

    • Wait for the tasks:
    import asyncio
    
    
    async def some_task():
        print("Start")
        await asyncio.sleep(10)
        print("End")
    
    
    def unrecommended_old_way():
        loop = asyncio.new_event_loop()
        task_1 = loop.create_task(some_task())
        task_2 = loop.create_task(some_task())
    
        print("Task spawn done")
        loop.run_until_complete(asyncio.gather(task_1, task_2))
        print("Tasks all done")
    
    
    unrecommended_old_way()
    
    Task spawn done
    Start
    Start
    End
    End
    Tasks all done
    

    • Or use the better, recommended way to do the above:
    import asyncio
    
    
    async def some_task():
        print("Start")
        await asyncio.sleep(10)
        print("End")
    
    
    async def recommended_way():
        task_1 = asyncio.create_task(some_task())
        task_2 = asyncio.create_task(some_task())
    
        print("Task spawn done")
        await asyncio.gather(task_1, task_2)
        print("Tasks all done")
    
    
    asyncio.run(recommended_way())
    




    Actual fix

    Originally I wouldn't recommend mixing concurrency frameworks, but after learning that the notebook uses asyncio by default, it makes sense why you were trying it. Sorry for the misunderstanding!

    Here's what you probably wanted, mixing asyncio and multiprocessing.

    other_file.py:

    import multiprocessing as mp
    import logging
    import queue
    import time
    
    
    class WorkerProcess(mp.Process):
        def __init__(self, task_queue: mp.JoinableQueue, result_queue: mp.Queue, timeout_sec=2):
            super().__init__()
    
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.timeout = timeout_sec
    
        def run(self):
            logging.info(f"[{self.pid:10}] Started")
    
            while True:
                try:
                    task_id, task = self.task_queue.get(timeout=self.timeout)
                except queue.Empty:
                    break
    
                try:
                    logging.info(f"[{self.pid:10}] Processing {task_id}")
                    time.sleep(1)
                    logging.info(f"[{self.pid:10}] Processing {task_id} done")
    
                    self.result_queue.put(task)
                finally:
                    # signal JoinableQueue that task we took is done
                    self.task_queue.task_done()
    
            logging.info(f"[{self.pid:10}] Done")
    


    In another file, or in the notebook:

    import multiprocessing as mp
    import asyncio
    import logging
    
    from other_file import WorkerProcess
    
    logging.root.setLevel(logging.NOTSET)
    
    
    class Manager:
        def __init__(self):
            self.task_queue = mp.JoinableQueue()
            self.result_queue = mp.Queue()
    
        async def _monitor(self):
            """Periodically reports queue sizes.
            Automatically cancelled after all processes are killed."""
    
            while True:
                await asyncio.sleep(0.5)
    
                logging.info(f"[Manager]   Task queue size: {self.task_queue.qsize()}")
                logging.info(f"[Manager] Result queue size: {self.result_queue.qsize()}")
    
        async def _process(self):
            """Do whatever you want here for results"""
    
            while (val := await asyncio.to_thread(self.result_queue.get)) is not None:
                logging.info(f"[Manager] Got result {val}")
    
        async def start(self):
            """Wait for all tasks to be done"""
    
            # Populate the task queue
            for task_idx, task in enumerate(range(10)):
                self.task_queue.put((task_idx, task))
    
            # create loop, monitoring task & result process task
            monitor_task = asyncio.create_task(self._monitor())
            result_proc_task = asyncio.create_task(self._process())
    
            # Start the processes
            processes = [
                WorkerProcess(self.task_queue, self.result_queue)
                for _ in range(3)
            ]
            for process in processes:
                process.start()
    
            logging.info("[Manager] Started")
    
            # wait for all tasks to be done. If so, cancel monitor task and wait for it to end
            await asyncio.to_thread(self.task_queue.join)
            monitor_task.cancel()
    
            # since .task_done() emit is AFTER putting to result queue,
            # there's no race condition here. Send sentinel to result queue
            # to signal end of process.
            self.result_queue.put(None)
    
            logging.info("[Manager] Done")
    
    
    manager = Manager()
    
    # if running normally:
    # asyncio.run(manager.start())
    
    # in notebook (already in async context)
    await manager.start()
    

    Normal run log:

    INFO:root:[   Manager] Started
    INFO:root:[     22356] Started
    INFO:root:[     22356] Processing 0
    INFO:root:[     14468] Started
    INFO:root:[     14468] Processing 1
    INFO:root:[     21076] Started
    INFO:root:[     21076] Processing 2
    INFO:root:[   Manager]   Task queue size: 7
    INFO:root:[   Manager] Result queue size: 0
    INFO:root:[     22356] Processing 0 done
    INFO:root:[     14468] Processing 1 done
    INFO:root:[     22356] Processing 3
    INFO:root:[   Manager] Got result 0
    INFO:root:[     14468] Processing 4
    INFO:root:[   Manager] Got result 1
    INFO:root:[     21076] Processing 2 done
    INFO:root:[     21076] Processing 5
    INFO:root:[   Manager] Got result 2
    INFO:root:[   Manager]   Task queue size: 4
    INFO:root:[   Manager] Result queue size: 0
    INFO:root:[     22356] Processing 3 done
    INFO:root:[     22356] Processing 6
    INFO:root:[   Manager] Got result 3
    INFO:root:[     14468] Processing 4 done
    INFO:root:[     14468] Processing 7
    INFO:root:[   Manager] Got result 4
    INFO:root:[     21076] Processing 5 done
    INFO:root:[     21076] Processing 8
    INFO:root:[   Manager] Got result 5
    INFO:root:[   Manager]   Task queue size: 1
    INFO:root:[   Manager] Result queue size: 0
    INFO:root:[     22356] Processing 6 done
    INFO:root:[     22356] Processing 9
    INFO:root:[   Manager] Got result 6
    INFO:root:[     14468] Processing 7 done
    INFO:root:[   Manager] Got result 7
    INFO:root:[     21076] Processing 8 done
    INFO:root:[   Manager] Got result 8
    INFO:root:[   Manager]   Task queue size: 0
    INFO:root:[   Manager] Result queue size: 0
    INFO:root:[     22356] Processing 9 done
    INFO:root:[   Manager] Got result 9
    INFO:root:[     21076] Done
    INFO:root:[     14468] Done
    INFO:root:[     22356] Done
    INFO:root:[   Manager] Done
    


    The notebook will show none of these process logs, because each process writes to its own stdout/stderr, which is the console, not the notebook.

    Hence the output appears on the console instead (if the notebook is running on localhost, not Colab):


    Terminal:

    INFO:root:[     22228] Started
    INFO:root:[     22228] Processing 0
    INFO:root:[     21712] Started
    INFO:root:[     21712] Processing 1
    INFO:root:[     20724] Started
    INFO:root:[     20724] Processing 2
    INFO:root:[     21712] Processing 1 done
    INFO:root:[     22228] Processing 0 done
    INFO:root:[     21712] Processing 3
    INFO:root:[     22228] Processing 4
    INFO:root:[     20724] Processing 2 done
    INFO:root:[     20724] Processing 5
    INFO:root:[     21712] Processing 3 done
    INFO:root:[     22228] Processing 4 done
    INFO:root:[     21712] Processing 6
    INFO:root:[     22228] Processing 7
    INFO:root:[     20724] Processing 5 done
    INFO:root:[     20724] Processing 8
    INFO:root:[     21712] Processing 6 done
    INFO:root:[     22228] Processing 7 done
    INFO:root:[     21712] Processing 9
    INFO:root:[     20724] Processing 8 done
    INFO:root:[     21712] Processing 9 done
    INFO:root:[     22228] Done
    INFO:root:[     20724] Done
    INFO:root:[     21712] Done
    [I 2024-05-15 16:43:24.204 ServerApp] Saving file at /Untitled1.ipynb