I have a manager script that's launching some processes, then using two coroutines (one to monitor, one to gather results). For some reason only one coroutine seems to be run, what am I missing? (I don't work with asyncio)
import multiprocessing as mp
import time
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
class Process(mp.Process):
def __init__(self, task_queue: mp.Queue, result_queue: mp.Queue):
super().__init__()
self.task_queue = task_queue
self.result_queue = result_queue
logging.info('Process init')
def run(self):
while not self.task_queue.empty():
try:
task = self.task_queue.get(timeout=1)
except mp.Queue.Empty:
logging.info('Task queue is empty')
break
time.sleep(1)
logging.info('Processing task %i (pid %i)', task, self.pid)
self.result_queue.put(task)
logging.info('Process run')
class Manager:
def __init__(self):
self.processes = []
self.task_queue = mp.Queue()
self.result_queue = mp.Queue()
self.keep_running = True
async def monitor(self):
while self.keep_running:
await asyncio.sleep(0.1)
logging.info('Task queue size: %i', self.task_queue.qsize())
logging.info('Result queue size: %i', self.result_queue.qsize())
self.keep_running = any([p.is_alive() for p in self.processes])
async def consume_results(self):
while self.keep_running:
try:
result = self.result_queue.get()
except mp.Queue.Empty:
logging.info('Result queue is empty')
continue
logging.info('Got result: %s', result)
def start(self):
# Populate the task queue
for i in range(10):
self.task_queue.put(i)
# Start the processes
for i in range(3):
p = Process(self.task_queue, self.result_queue)
p.start()
self.processes.append(p)
# Wait for the processes to finish
loop = asyncio.get_event_loop()
loop.create_task(self.monitor())
loop.create_task(self.consume_results())
manager = Manager()
manager.start()
consume_results()
is runHere I'm assuming that you monitor queue sizes, and that you didn't mention env you are in, I'm assuming it's latest 3.12.x.
.get()
call. async def monitor(self):
while self.keep_running:
try:
result = self.result_queue.get()
multiprocessing.Queue.Empty
.During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "...\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "E:...\StackOverflow\78475657\78475657.py", line 20, in run
except mp.Queue.Empty:
^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'Empty'
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
To fix this issue, refer to the "Safe importing of main module"
section in https://docs.python.org/3/library/multiprocessing.html
KeyboardInterrupt
on all child processes to halt the program.Process Process-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "...\StackOverflow\78475657\78475657.py", line 24, in run
time.sleep(1)
KeyboardInterrupt
File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "...\StackOverflow\78475657\78475657.py", line 24, in run
time.sleep(1)
File "...\StackOverflow\78475657\78475657.py", line 24, in run
time.sleep(1)
KeyboardInterrupt
KeyboardInterrupt
Exception ignored in atexit callback: <function _exit_function at 0x000001C044853CE0>
Traceback (most recent call last):
File "...\Python\Python312\Lib\multiprocessing\util.py", line 357, in _exit_function
p.join()
File "...\Python\Python312\Lib\multiprocessing\process.py", line 149, in join
res = self._popen.wait(timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\Python\Python312\Lib\multiprocessing\popen_spawn_win32.py", line 110, in wait
res = _winapi.WaitForSingleObject(int(self._handle), msecs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt:
KeyboardInterrupt
.# Wait for the processes to finish
loop = asyncio.get_event_loop()
loop.create_task(self.monitor())
loop.create_task(self.consume_results())
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<Manager.monitor() running at ...\StackOverflow\78475657\78475657.py:39>>
...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.monitor' was never awaited
self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<Manager.consume_results() running at ...\StackOverflow\78475657\78475657.py:46>>
...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.consume_results' was never awaited
self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
asyncio.get_event_loop()
Deprecation warning.Deprecated since version 3.12: Deprecation warning is emitted if there is no current event loop. In some future Python release this will become an error.
...\StackOverflow\78475657\78475657.py:67: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
However, since in notebook
is already in Async Context and has loop, it's opposite in there and what you did is correct.
while not self.task_queue.empty(): # < no use due to race condition
try:
task = self.task_queue.get(timeout=1) # actual important check
except mp.Queue.Empty:
logging.info('Task queue is empty')
break
Use asyncio.to_thread
to push blocking call to thread.
Use queue.Empty
import queue
try:
...
except queue.Empty:
...
Process.join
or better yet use mp.Pool
.# from https://docs.python.org/3/library/multiprocessing.html
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
import asyncio
async def some_task():
print("Start")
await asyncio.sleep(10)
print("End")
def unrecommended_old_way():
loop = asyncio.new_event_loop()
task_1 = loop.create_task(some_task())
task_2 = loop.create_task(some_task())
print("Task spawn done")
loop.run_until_complete(asyncio.gather(task_1, task_2))
print("Tasks all done")
unrecommended_old_way()
Task spawn done
Start
Start
End
End
Tasks all done
import asyncio
async def some_task():
print("Start")
await asyncio.sleep(10)
print("End")
async def recommended_way():
task_1 = asyncio.create_task(some_task())
task_2 = asyncio.create_task(some_task())
print("Task spawn done")
await asyncio.gather(task_1, task_2)
print("Tasks all done")
asyncio.run(recommended_way())
mp.JoinableQueue
Originally I'd not recommend mixing concurrency frameworks, but after knowing notebook is using asyncio by default - I think it kinda makes sense why you was trying it. Sorry for misunderstanding!
Here's what you probably wanted, mixing asyncio
and multiprocessing
.
other_file.py:
import multiprocessing as mp
import logging
import queue
import time
class WorkerProcess(mp.Process):
def __init__(self, task_queue: mp.JoinableQueue, result_queue: mp.Queue, timeout_sec=2):
super().__init__()
self.task_queue = task_queue
self.result_queue = result_queue
self.timeout = timeout_sec
def run(self):
logging.info(f"[{self.pid:10}] Started")
while True:
try:
task_id, task = self.task_queue.get(timeout=self.timeout)
except queue.Empty:
break
try:
logging.info(f"[{self.pid:10}] Processing {task_id}")
time.sleep(1)
logging.info(f"[{self.pid:10}] Processing {task_id} done")
self.result_queue.put(task)
finally:
# signal JoinableQueue that task we took is done
self.task_queue.task_done()
logging.info(f"[{self.pid:10}] Done")
in other file or notebook:
import multiprocessing as mp
import asyncio
import logging
from other_file import WorkerProcess
logging.root.setLevel(logging.NOTSET)
class Manager:
def __init__(self):
self.task_queue = mp.JoinableQueue()
self.result_queue = mp.Queue()
async def _monitor(self):
"""Periodically reports queue sizes.
Automatically cancelled after all processes are killed."""
while True:
await asyncio.sleep(0.5)
logging.info(f"[Manager] Task queue size: {self.task_queue.qsize()}")
logging.info(f"[Manager] Result queue size: {self.result_queue.qsize()}")
async def _process(self):
"""Do whatever you want here for results"""
while (val := await asyncio.to_thread(self.result_queue.get)) is not None:
logging.info(f"[Manager] Got result {val}")
async def start(self):
"""Wait for all tasks to be done"""
# Populate the task queue
for task_idx, task in enumerate(range(10)):
self.task_queue.put((task_idx, task))
# create loop, monitoring task & result process task
monitor_task = asyncio.create_task(self._monitor())
result_proc_task = asyncio.create_task(self._process())
# Start the processes
processes = [
WorkerProcess(self.task_queue, self.result_queue)
for _ in range(3)
]
for process in processes:
process.start()
logging.info("[Manager] Started")
# wait for all tasks to be done. If so, cancel monitor task and wait for it to end
await asyncio.to_thread(self.task_queue.join)
monitor_task.cancel()
# since .task_done() emit is AFTER putting to result queue,
# there's no race condition here. Send sentinel to result queue
# to signal end of process.
self.result_queue.put(None)
logging.info("[Manager] Done")
manager = Manager()
# if running normally:
# asyncio.run(manager.start())
# in notebook (already in async context)
await manager.start()
Normal run log:
INFO:root:[ Manager] Started
INFO:root:[ 22356] Started
INFO:root:[ 22356] Processing 0
INFO:root:[ 14468] Started
INFO:root:[ 14468] Processing 1
INFO:root:[ 21076] Started
INFO:root:[ 21076] Processing 2
INFO:root:[ Manager] Task queue size: 7
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 0 done
INFO:root:[ 14468] Processing 1 done
INFO:root:[ 22356] Processing 3
INFO:root:[ Manager] Got result 0
INFO:root:[ 14468] Processing 4
INFO:root:[ Manager] Got result 1
INFO:root:[ 21076] Processing 2 done
INFO:root:[ 21076] Processing 5
INFO:root:[ Manager] Got result 2
INFO:root:[ Manager] Task queue size: 4
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 3 done
INFO:root:[ 22356] Processing 6
INFO:root:[ Manager] Got result 3
INFO:root:[ 14468] Processing 4 done
INFO:root:[ 14468] Processing 7
INFO:root:[ Manager] Got result 4
INFO:root:[ 21076] Processing 5 done
INFO:root:[ 21076] Processing 8
INFO:root:[ Manager] Got result 5
INFO:root:[ Manager] Task queue size: 1
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 6 done
INFO:root:[ 22356] Processing 9
INFO:root:[ Manager] Got result 6
INFO:root:[ 14468] Processing 7 done
INFO:root:[ Manager] Got result 7
INFO:root:[ 21076] Processing 8 done
INFO:root:[ Manager] Got result 8
INFO:root:[ Manager] Task queue size: 0
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 9 done
INFO:root:[ Manager] Got result 9
INFO:root:[ 21076] Done
INFO:root:[ 14468] Done
INFO:root:[ 22356] Done
INFO:root:[ Manager] Done
Notebook will have no such process logs as each process's outputting it's own stdout stderr files, which isn't a notebook but is console.
Hence output is instead on console (if notebook's running on localhost, not collab)
Terminal:
INFO:root:[ 22228] Started
INFO:root:[ 22228] Processing 0
INFO:root:[ 21712] Started
INFO:root:[ 21712] Processing 1
INFO:root:[ 20724] Started
INFO:root:[ 20724] Processing 2
INFO:root:[ 21712] Processing 1 done
INFO:root:[ 22228] Processing 0 done
INFO:root:[ 21712] Processing 3
INFO:root:[ 22228] Processing 4
INFO:root:[ 20724] Processing 2 done
INFO:root:[ 20724] Processing 5
INFO:root:[ 21712] Processing 3 done
INFO:root:[ 22228] Processing 4 done
INFO:root:[ 21712] Processing 6
INFO:root:[ 22228] Processing 7
INFO:root:[ 20724] Processing 5 done
INFO:root:[ 20724] Processing 8
INFO:root:[ 21712] Processing 6 done
INFO:root:[ 22228] Processing 7 done
INFO:root:[ 21712] Processing 9
INFO:root:[ 20724] Processing 8 done
INFO:root:[ 21712] Processing 9 done
INFO:root:[ 22228] Done
INFO:root:[ 20724] Done
INFO:root:[ 21712] Done
[I 2024-05-15 16:43:24.204 ServerApp] Saving file at /Untitled1.ipynb