If we have 2 asyncio
coroutines, is it possible to use Python multiprocessing
to let each of them run in its own process, and allow the coroutines in both processes to be stopped (by calling their stop
method) when the user hits Ctrl+C?
This would be similar to the code below, except that the foo.start()
and bar.start()
coroutines should each run in their own process.
from builtins import KeyboardInterrupt
import asyncio
import multiprocessing
import signal
class App:
    """Print a fixed text every two seconds from an asyncio task until stopped."""

    def __init__(self, text):
        """Remember the text to print.

        Args:
            text: Value printed on each iteration of ``hello``.
        """
        self.text = text
        # Populated by start(). Kept as None until then so that stop() can be
        # called safely even when start() never got to create the task
        # (e.g. Ctrl+C arrives very early).
        self.loop_task = None

    async def start(self):
        """Run ``hello`` as a task and wait until it finishes or is cancelled."""
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])

    async def stop(self):
        """Cancel the task created by ``start``; a no-op if none exists yet."""
        if self.loop_task is not None:
            self.loop_task.cancel()

    async def hello(self):
        """Print ``self.text`` forever, sleeping two seconds between prints."""
        while True:
            print(self.text)
            await asyncio.sleep(2)
if __name__ == '__main__':
    foo = App('foo')
    bar = App('bar')

    async def run_all(*coros):
        """Run the given coroutines concurrently on the current event loop.

        asyncio.wait() no longer accepts bare coroutine objects (deprecated
        in 3.8, rejected from 3.11), so they are scheduled via gather().
        """
        await asyncio.gather(*coros)

    # Running in a single process works fine
    try:
        asyncio.run(run_all(foo.start(), bar.start()))
    except KeyboardInterrupt:
        # Ctrl+C: ask both apps to cancel their printing tasks.
        asyncio.run(run_all(foo.stop(), bar.stop()))
I tried using multiprocessing
and signals
, but I am not sure how to call foo.stop()
and bar.stop()
before the 2 processes terminate.
if __name__ == '__main__':
    # The polling loop below calls time.sleep(); without this import the
    # first iteration fails with NameError instead of waiting for Ctrl+C.
    import time

    def init_worker():
        """Pool initializer: make the worker process ignore SIGINT."""
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def start_foo():
        """Run foo's start() coroutine on a fresh event loop."""
        asyncio.run(foo.start())

    def start_bar():
        """Run bar's start() coroutine on a fresh event loop."""
        asyncio.run(bar.start())

    foo = App('foo')
    bar = App('bar')
    pool = multiprocessing.Pool(10, init_worker)
    try:
        print('Starting 2 jobs')
        pool.apply_async(start_foo)
        pool.apply_async(start_bar)
        # Keep the parent alive so it is the one that receives Ctrl+C.
        while True:
            time.sleep(1)  # is sleeping like this a bad thing?
    except KeyboardInterrupt:
        print('Caught KeyboardInterrupt, terminating workers')
        # NOTE: the workers ignore SIGINT (see init_worker), so they are
        # stopped here by terminate() and never call foo.stop()/bar.stop().
        pool.terminate()
        pool.join()
    print('Shut down complete')
# Based on https://stackoverflow.com/a/11312948/741099
Using Python 3.9.5 on Ubuntu 20.04
Based on @Will Da Silva's solution, I made small modifications to check whether asyncio.run(app.stop())
gets called when pressing Ctrl+C:
class App:
    """Print a fixed text every two seconds; report on stdout when stopped."""

    def __init__(self, text):
        """Remember the text to print.

        Args:
            text: Value printed by ``hello`` and included in the stop message.
        """
        self.text = text
        # Assigned by start(). Defaulting to None lets stop() run without an
        # AttributeError when the interrupt arrives before the task exists.
        self.loop_task = None

    async def start(self):
        """Run ``hello`` as a task and wait until it finishes or is cancelled."""
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])

    async def stop(self):
        """Cancel the task from ``start`` (if any) and print a stop message."""
        if self.loop_task is not None:
            self.loop_task.cancel()
        print(f'Stopping {self.text}')

    async def hello(self):
        """Print ``self.text`` forever, sleeping two seconds between prints."""
        while True:
            print(self.text)
            await asyncio.sleep(2)
def f(app):
    """Worker entry point: run ``app.start()``; on Ctrl+C run ``app.stop()``.

    Args:
        app: Object exposing ``start`` and ``stop`` coroutine methods.
    """
    try:
        runner = app.start()
        asyncio.run(runner)
    except KeyboardInterrupt:
        # Interrupted: drive the app's stop() coroutine on a new event loop.
        shutdown = app.stop()
        asyncio.run(shutdown)
if __name__ == '__main__':
    jobs = (App('foo'), App('bar'))
    # os.cpu_count() may return None when the count cannot be determined;
    # fall back to one worker per job so min() never compares int with None.
    pool_size = min(len(jobs), os.cpu_count() or len(jobs))
    with multiprocessing.Pool(pool_size) as pool:
        try:
            print(f'Starting {len(jobs)} jobs')
            pool.map(f, jobs)
        except KeyboardInterrupt:
            print('Caught KeyboardInterrupt, terminating workers')
            # Leaving the `with` block calls pool.terminate(), which can kill
            # a worker before its own KeyboardInterrupt handler has finished
            # running app.stop(). close() + join() waits for the workers to
            # exit on their own first, so every "Stopping ..." line is printed.
            pool.close()
            pool.join()
    print('Shut down complete')
However, if I start and stop the Python script repeatedly, it seems that print(f'Stopping {self.text}')
inside app.stop()
fails to print to stdout about half the time.
Output:
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete
$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete
Here's a way to do it without messing with signals, and without changing the App
class:
import asyncio
import multiprocessing
import os
class App:
    """Print a fixed text every two seconds from an asyncio task until stopped."""

    def __init__(self, text):
        """Remember the text to print.

        Args:
            text: Value printed on each iteration of ``hello``.
        """
        self.text = text
        # Filled in by start(). Starting as None means stop() cannot fail with
        # AttributeError when it is invoked before the task has been created.
        self.loop_task = None

    async def start(self):
        """Run ``hello`` as a task and wait until it finishes or is cancelled."""
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])

    async def stop(self):
        """Cancel the task created by ``start``; a no-op if none exists yet."""
        if self.loop_task is not None:
            self.loop_task.cancel()

    async def hello(self):
        """Print ``self.text`` forever, sleeping two seconds between prints."""
        while True:
            print(self.text)
            await asyncio.sleep(2)
def f(text):
    """Worker entry point: build an ``App`` for *text* and run it.

    A KeyboardInterrupt raised while the app is running is caught here and
    answered by running the app's ``stop()`` coroutine.

    Args:
        text: Passed straight to the ``App`` constructor.
    """
    app = App(text)
    try:
        runner = app.start()
        asyncio.run(runner)
    except KeyboardInterrupt:
        shutdown = app.stop()
        asyncio.run(shutdown)
if __name__ == '__main__':
    jobs = ('foo', 'bar')
    # One worker per job, capped at the CPU count. os.cpu_count() may return
    # None when the count is unknown; fall back to len(jobs) in that case so
    # min() does not raise TypeError.
    pool_size = min(len(jobs), os.cpu_count() or len(jobs))
    with multiprocessing.Pool(pool_size) as pool:
        try:
            pool.map(f, jobs)
        except KeyboardInterrupt:
            # Let the workers finish their own KeyboardInterrupt handling
            # before the `with` block exits and tears the pool down.
            pool.close()
            pool.join()
It's important that we limit the number of processes in the pool to min(len(jobs), os.cpu_count())
, as any unassigned workers won't be in the try-except block that catches KeyboardInterrupt
when you enter ctrl-c, and so they'll raise an exception.
To avoid this issue completely you could provide the pool with an initializer that ignores SIGINT
, but that prevents us from catching it with KeyboardInterrupt
too. I'm not sure how one would ignore it only in the uninitialized worker processes.
You can also create the App
instances in the parent process, so long as they can be pickled to be passed across the process boundary into the child processes.
def f(app):
    """Run ``app.start()`` to completion; if interrupted, run ``app.stop()``.

    Args:
        app: Object exposing ``start`` and ``stop`` coroutine methods.
    """
    try:
        start_coro = app.start()
        asyncio.run(start_coro)
    except KeyboardInterrupt:
        # Ctrl+C reached this process: run the stop coroutine on a new loop.
        stop_coro = app.stop()
        asyncio.run(stop_coro)
if __name__ == '__main__':
    jobs = (App('foo'), App('bar'))
    # One worker per job, capped at the CPU count. os.cpu_count() may return
    # None when the count is unknown; fall back to len(jobs) in that case so
    # min() does not raise TypeError.
    pool_size = min(len(jobs), os.cpu_count() or len(jobs))
    with multiprocessing.Pool(pool_size) as pool:
        try:
            # Each App instance is pickled and sent to a worker process.
            pool.map(f, jobs)
        except KeyboardInterrupt:
            # Wait for the workers to finish handling the interrupt before
            # the `with` block exits and tears the pool down.
            pool.close()
            pool.join()