Search code examples
pythonpython-asynciopython-multiprocessing

Python AsyncIO within MultiProcessing Processes


I am trying to create two processes that run forever, each running an asyncio loop inside of them.

I want to understand if the logic is correct. Is there a better way to do the same thing?

import asyncio
import multiprocessing


async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)


def create_aio_loop(topic):
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(my_async_func(topic), loop=loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.stop()


def main():
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]

    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()


if __name__ == '__main__':
    main()

Solution

  • I am trying to create two processes that run forever, each running an asyncio loop inside of them.

    I assume that you know why you wish to dispatch some of your code on several processing (virtual) cores (multiprocessing) and paralellize the rest on the same core (asyncio).

    Then I think that you did right: you spawn two processes, and each of them has its own asyncio loop. The only improvement that I could find was to use loop.run_until_complete, which removes one line of code :) :

    import os
    import asyncio
    import multiprocessing
    
    async def my_async_func(topic):
        while True:
            await asyncio.sleep(5)
            print(topic)
    
    
    def create_aio_loop(topic):
        process_name = "[Process %s, topic %s]" % (os.getpid(), topic)
        print("%s Started " % process_name)
    
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(my_async_func(topic))
        except KeyboardInterrupt:
            print("%s Loop interrupted" % process_name)
            loop.stop()
    
        print("%s terminating" % process_name)
    
    
    if __name__ == '__main__':
        topic_a = 'New to Asyncio'
        topic_b = 'New to Multiprocessing'
        process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
        process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
        processes = [process_a, process_b]
    
        try:
            for process in processes:
                process.start()
        except KeyboardInterrupt:
            for process in processes:
                process.terminate()
                process.join()
    

    Oh and also I suggest that you display all messages using a prefix containing the process id, that is much easier for multiprocessing debugging. I introduced an example with the start/terminate print messages.

    Running this yields:

    >python tmp_asyncio.py
    [Process 11456, topic New to Multiprocessing] Started
    [Process 18396, topic New to Asyncio] Started
    New to Asyncio
    New to Multiprocessing
    
    (here I pressed Ctrl+C)
    
    [Process 11456, topic New to Multiprocessing] Loop interrupted
    [Process 11456, topic New to Multiprocessing] terminating
    [Process 18396, topic New to Asyncio] Loop interrupted
    [Process 18396, topic New to Asyncio] terminating
    Error in atexit._run_exitfuncs:
    Traceback (most recent call last):
      File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\util.py", line 310, in _exit_function
        p.join()
      File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\process.py", line 121, in join
        res = self._popen.wait(timeout)
      File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\popen_spawn_win32.py", line 81, in wait
        res = _winapi.WaitForSingleObject(int(self._handle), msecs)
    KeyboardInterrupt