Search code examples
pythonmultithreadingasynchronousmultiprocessingpython-asyncio

What kind of problems (if any) would there be combining asyncio with multiprocessing?


As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel - or at least give it a chance.

I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.

I'm not (yet) sure what kind of load I might be facing. I know there is currently setup a 2MB cap on incoming messages. Theoretically we could get thousands per second (though I don't know if practically we've seen anything like that). The amount of time spent processing a message isn't terribly important, though obviously quicker would be better.

I was looking into the Reactor pattern, and developed a small example using the multiprocessing library that (at least in testing) seems to work just fine. However, now/soon we'll have the asyncio library available, which would handle the event loop for me.

Is there anything that could bite me by combining asyncio and multiprocessing?


Solution

  • You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.

    The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:

    import time
    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    
    def blocking_func(x):
       time.sleep(x) # Pretend this is expensive calculations
       return x * 5
    
    @asyncio.coroutine
    def main():
        #pool = multiprocessing.Pool()
        #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
        executor = ProcessPoolExecutor()
        out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
        print(out)
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

    For the majority of cases, this is function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it), that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demoing that:

    import time
    import asyncio
    import aioprocessing
    import multiprocessing
    
    def func(queue, event, lock, items):
        with lock:
            event.set()
            for item in items:
                time.sleep(3)
                queue.put(item+5)
        queue.close()
    
    @asyncio.coroutine
    def example(queue, event, lock):
        l = [1,2,3,4,5]
        p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
        p.start()
        while True:
            result = yield from queue.coro_get()
            if result is None:
                break
            print("Got result {}".format(result))
        yield from p.coro_join()
    
    @asyncio.coroutine
    def example2(queue, event, lock):
        yield from event.coro_wait()
        with (yield from lock):
            yield from queue.coro_put(78)
            yield from queue.coro_put(None) # Shut down the worker
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        queue = aioprocessing.AioQueue()
        lock = aioprocessing.AioLock()
        event = aioprocessing.AioEvent()
        tasks = [ 
            asyncio.async(example(queue, event, lock)),
            asyncio.async(example2(queue, event, lock)),
        ]   
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()