python-3.x, python-asyncio, python-multiprocessing

Multiprocessing backend blocking asyncio frontend


In a general GUI-based script (framework) I have a Tkinter-based GUI. This is run asynchronously (works). When I press the 'Start' button, processing starts, and since it is CPU-heavy it is done with multiprocessing (works). What doesn't work is the back-reporting from the processes (done this, done that) to the GUI in order to display the progress.

In order to deliver the messages from the processes, I use a multiprocessing.Queue. Since the async GUI cannot be fed from this Queue directly, I use an asyncio.Queue to feed the GUI, and the mp_queue_to_async_queue() function picks the messages out of mp_queue and puts them into async_queue. In theory everything works, except that the mp_queue_to_async_queue() line blocks the following print_out_async() line.

Here is some 'dummy' code that depicts the problem:

import multiprocessing
import os
import time
import asyncio
import random

N_PROCESSES = 2
N_ITER = 10
N_SEC = 1

async_queue = asyncio.Queue()

def worker_main(p_queue):
    print(_pid := os.getpid(), "working")
    for i in range(N_ITER):
        some_random_time = N_SEC * random.random()
        p_queue.put(f"{i} - {_pid}: {some_random_time} sec")
        time.sleep(some_random_time)
    p_queue.put(None)

async def run():
    await mp_queue_to_async_queue()  # this line is blocking
    await print_out_async()

async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        message = mp_queue.get()
        print(f"-> {message}")
        await async_queue.put(message)
        if message == None:
            processes_finished += 1
        if processes_finished == N_PROCESSES:
            break

async def print_out_async():
    processes_finished = 0
    while True:
        b = await async_queue.get()
        print(f"<- {b}")
        if b == None:
            processes_finished += 1
        if processes_finished == N_PROCESSES:
            break

if __name__ == '__main__':
    mp_queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(processes=N_PROCESSES, initializer=worker_main, initargs=(mp_queue,))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

    pool.terminate()

Solution

  • The culprit is that you are "awaiting" a call that contains an infinite loop - it has to be created as a task instead, and you have to make sure the asyncio loop is given control on each iteration of that loop.

    Besides that, mp_queue.get() is blocking by default - you have to yield to the loop when there are no messages:

    async def mp_queue_to_async_queue():
        processes_finished = 0
        while True:
            message = mp_queue.get()
            ... 
    

    Simply check whether a message is ready; otherwise yield control back to the event loop (a complete runnable version is sketched after these snippets):

    from queue import Empty as QueueEmpty
    
    async def run():
       
        # a created task runs whenever other async code yields
        # control to the loop.
        # Although in this case we could "fire and forget",
        # it is important to keep a reference to each created task:
        queues_task = asyncio.create_task(mp_queue_to_async_queue())  # no longer blocks print_out_async()
        await print_out_async()
    
    
    async def mp_queue_to_async_queue():
        processes_finished = 0
        while True:
            try:
                message = mp_queue.get_nowait()
            except QueueEmpty:
                await asyncio.sleep(0)
                continue
            ...
            print(f"-> {message}")
            ...
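
    For reference, here is one way the whole script might look with the fix applied - a minimal sketch that keeps the question's names and loop setup, replaces the blocking get() with get_nowait() plus a yield to the loop, and runs the bridge as a background task:

    import multiprocessing
    import os
    import time
    import asyncio
    import random
    from queue import Empty as QueueEmpty

    N_PROCESSES = 2
    N_ITER = 10
    N_SEC = 1

    async_queue = asyncio.Queue()

    def worker_main(p_queue):
        print(_pid := os.getpid(), "working")
        for i in range(N_ITER):
            some_random_time = N_SEC * random.random()
            p_queue.put(f"{i} - {_pid}: {some_random_time} sec")
            time.sleep(some_random_time)
        p_queue.put(None)  # sentinel: this worker is done

    async def run():
        # run the bridge as a background task so it no longer blocks print_out_async()
        queues_task = asyncio.create_task(mp_queue_to_async_queue())
        await print_out_async()
        await queues_task  # both coroutines stop after N_PROCESSES sentinels

    async def mp_queue_to_async_queue():
        processes_finished = 0
        while processes_finished < N_PROCESSES:
            try:
                message = mp_queue.get_nowait()  # non-blocking read from the mp queue
            except QueueEmpty:
                await asyncio.sleep(0)           # nothing yet: give control back to the loop
                continue
            print(f"-> {message}")
            await async_queue.put(message)
            if message is None:
                processes_finished += 1

    async def print_out_async():
        processes_finished = 0
        while processes_finished < N_PROCESSES:
            b = await async_queue.get()
            print(f"<- {b}")
            if b is None:
                processes_finished += 1

    if __name__ == '__main__':
        mp_queue = multiprocessing.Queue()
        pool = multiprocessing.Pool(processes=N_PROCESSES,
                                    initializer=worker_main, initargs=(mp_queue,))

        loop = asyncio.get_event_loop()
        loop.run_until_complete(run())

        pool.terminate()

    A design note: asyncio.sleep(0) yields to the loop but never actually waits, so the bridge polls in a tight loop and keeps a core busy while the workers are idle; a small non-zero sleep (e.g. asyncio.sleep(0.01)) reduces that at the cost of some latency. Another common option (not part of the answer above, just a sketch using the same global names) is to keep the blocking get() but run it in a thread via the loop's default executor, so there is no polling at all. This would be a drop-in replacement for mp_queue_to_async_queue() in the sketch above:

    async def mp_queue_to_async_queue():
        loop = asyncio.get_running_loop()
        processes_finished = 0
        while processes_finished < N_PROCESSES:
            # the blocking get() runs in a worker thread; awaiting the
            # returned future keeps the event loop responsive meanwhile
            message = await loop.run_in_executor(None, mp_queue.get)
            print(f"-> {message}")
            await async_queue.put(message)
            if message is None:
                processes_finished += 1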