Search code examples
pythonpython-3.xpython-asynciocoroutinemultiprocess

Running Two asyncio Coroutines, Each in its own Python Process


If we have 2 asyncio coroutines, is it possible to use Python multiproessing to let each of them run in its own process, and allow the coroutines in both processes to be stopped (by calling their stop method) when the user hits Ctrl+C?

This will be similar to the code below, except that foo.start() and bar.start() coroutines should have their own process.

from builtins import KeyboardInterrupt
import asyncio
import multiprocessing
import signal

class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

if __name__ == '__main__':
    foo = App('foo')
    bar = App('bar')
    
    # Running in a single process works fine
    try:
        asyncio.run(asyncio.wait([foo.start(), bar.start()]))
    except KeyboardInterrupt:
        asyncio.run(asyncio.wait([foo.stop(), bar.stop()]))

Tried using multiprocessing and signals, but I am also not sure how to call foo.stop() and bar.stop() before the 2 processes terminates.

if __name__ == '__main__':
    
    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        
    def start_foo():
        asyncio.run(foo.start())
        
    def start_bar():
        asyncio.run(bar.start())
        
    foo = App('foo')
    bar = App('bar')    
    pool = multiprocessing.Pool(10, init_worker)
        
    try:
        print('Starting 2 jobs')
        pool.apply_async(start_foo)
        pool.apply_async(start_bar)

        while True:        
            time.sleep(1)  # is sleeping like this a bad thing?
                
    except KeyboardInterrupt:
        print('Caught KeyboardInterrupt, terminating workers')
        pool.terminate()
        pool.join()
    
    print('Shut down complete')

# Based on https://stackoverflow.com/a/11312948/741099

Using Python 3.9.5 on Ubuntu 20.04


Based on @Will Da Silva's solution, I made tiny modifications to check if asyncio.run(app.stop()) gets called on pressing Ctrl+C

class App:
    def __init__(self, text):
        self.text = text
    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        print(f'Stopping {self.text}')
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())
        
if __name__ == '__main__':  
    
    jobs = (App('foo'), App('bar'))
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:        
        try:
            print(f'Starting {len(jobs)} jobs')
            pool.map(f, jobs)
                
        except KeyboardInterrupt:
            print('Caught KeyboardInterrupt, terminating workers')
                
    print('Shut down complete')

However, it seems that if I repeat starting and stopping the Python script multiple times, print(f'Stopping {self.text}') inside app.stop() does not print to stdout half the time.

Output:

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete

$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete

Solution

  • Here's a way to do it without messing with signals, and without changing the App class:

    import asyncio
    import multiprocessing
    import os
    
    
    class App:
        def __init__(self, text):
            self.text = text
    
        async def start(self):
            self.loop_task = asyncio.create_task(self.hello())
            await asyncio.wait([self.loop_task])
            
        async def stop(self):
            self.loop_task.cancel()
            
        async def hello(self):
            while True:
                print(self.text)
                await asyncio.sleep(2)
    
    
    def f(text):
        app = App(text)
        try:
            asyncio.run(app.start())
        except KeyboardInterrupt:
            asyncio.run(app.stop())
    
    
    if __name__ == '__main__':
        jobs = ('foo', 'bar')
        with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
            try:
                pool.map(f, jobs)
            except KeyboardInterrupt:
                pool.close()
                pool.join()
    

    It's important that we limit the number of processes in the pool to min(len(jobs), os.cpu_count()), as any unassigned workers won't be in the try-except block that catches KeyboardInterrupt when you enter ctrl-c, and so they'll raise an exception.

    To avoid this issue completely you could provide the pool with an initializer that ignores SIGINT, but that prevents us from catching it with KeyboardInterrupt too. I'm not sure how one would ignore it only in the uninitialized worker processes.

    You can also create the App instances in the parent process, so long as it can be pickled to be passed across the process border into the child processes.

    def f(app):
        try:
            asyncio.run(app.start())
        except KeyboardInterrupt:
            asyncio.run(app.stop())
    
    
    if __name__ == '__main__':
        jobs = (App('foo'), App('bar'))
        with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
            try:
                pool.map(f, jobs)
            except KeyboardInterrupt:
                pool.close()
                pool.join()