I have a `multiprocessing.pool.Pool` that is managed by a Discord bot. Since discord.py is async, I use `pool.starmap_async()`, `pool.apply_async()` and `AsyncResult.get()` to manage tasks. The bot starts ONE pool on startup (it may seem weird, but it is the most efficient approach for what I'm doing due to long init times).

Is there a way I can check how many processes are currently queued/executing in the pool at any given moment? The checking function would have access to the `Pool` itself, but not to any `AsyncResult`s.

I'm also open to other approaches that yield the same result: the number of active/queued processes in the pool.
> Since discord.py is async, I use `pool.starmap_async()`, `pool.apply_async()` and `AsyncResult.get()` to manage tasks.
Unless you take special precautions, this does not look correct, because `AsyncResult.get()` will block the event loop. The *async* in the name of methods like `apply_async` is not the kind of async deployed by asyncio: multiprocessing uses sync code to communicate with the subprocesses, doing so in background threads so that your code can proceed with other things.

A safer way to combine asyncio and multiprocessing is the `concurrent.futures` module, which provides the `ProcessPoolExecutor` class. It uses multiprocessing internally, and asyncio supports its executors via `run_in_executor`.
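As a minimal sketch of that combination (the `cpu_bound` function and the worker count here are illustrative, not part of your bot):

```python
import asyncio
import concurrent.futures


def cpu_bound(n: int) -> int:
    # Runs in a worker process, so it cannot block the event loop.
    return sum(i * i for i in range(n))


async def main() -> None:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # run_in_executor wraps the executor's future in an asyncio future;
        # awaiting it suspends this coroutine instead of blocking the loop.
        result = await loop.run_in_executor(executor, cpu_bound, 10)
        print(result)  # 285


if __name__ == "__main__":
    asyncio.run(main())
```

Note that functions submitted to a `ProcessPoolExecutor` must be picklable, i.e. defined at module level.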
> Is there a way I can check how many processes are currently queued/executing in the pool at any given moment?
I don't think there is a public API to query that information, but if you own the pool, you can easily maintain the necessary stats yourself. For example (untested):
```python
import asyncio
import concurrent.futures


class Pool:
    def __init__(self, nworkers):
        self._executor = concurrent.futures.ProcessPoolExecutor(nworkers)
        self._nworkers = nworkers
        self._submitted = 0

    async def submit(self, fn, *args):
        self._submitted += 1
        loop = asyncio.get_running_loop()
        fut = loop.run_in_executor(self._executor, fn, *args)
        try:
            return await fut
        finally:
            self._submitted -= 1

    def stats(self):
        # Anything beyond the worker count must be waiting in the queue.
        queued = max(0, self._submitted - self._nworkers)
        executing = min(self._submitted, self._nworkers)
        return queued, executing
```
You would use it by calling `submit()`, which you can either await to get the result immediately, or pass to `create_task()` to get a future which you can await later, `gather()` along with other futures, etc.
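That usage pattern could look like this (repeating the wrapper class so the snippet is self-contained; `square` and the worker count are made up for the demo):

```python
import asyncio
import concurrent.futures


class Pool:
    def __init__(self, nworkers):
        self._executor = concurrent.futures.ProcessPoolExecutor(nworkers)
        self._nworkers = nworkers
        self._submitted = 0

    async def submit(self, fn, *args):
        self._submitted += 1
        loop = asyncio.get_running_loop()
        fut = loop.run_in_executor(self._executor, fn, *args)
        try:
            return await fut
        finally:
            self._submitted -= 1

    def stats(self):
        queued = max(0, self._submitted - self._nworkers)
        executing = min(self._submitted, self._nworkers)
        return queued, executing


def square(x):
    return x * x


async def main():
    pool = Pool(2)
    # Schedule three submissions without awaiting them yet.
    tasks = [asyncio.create_task(pool.submit(square, n)) for n in (1, 2, 3)]
    await asyncio.sleep(0)  # yield once so the tasks get to start
    print(pool.stats())     # likely (1, 2): one queued, two executing
    results = await asyncio.gather(*tasks)
    print(results)          # [1, 4, 9]
    print(pool.stats())     # (0, 0): everything finished


if __name__ == "__main__":
    asyncio.run(main())
```

Keep in mind the stats are an approximation derived from the submission count: they assume every submission up to `nworkers` is actually executing, which holds as long as the workers are not stalled.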