I recently chanced upon this neat trick by @noxdafox that uses a `BoundedSemaphore` to enforce a maximum queue length, limiting the number of tasks queued into a `ProcessPoolExecutor`. This is the link to the gist with the example that I followed.
The issue seems to occur when using `ProcessPoolExecutor` but not `ThreadPoolExecutor`. I'm not sure of the reason; perhaps someone knows if there was a change to the `concurrent.futures` implementation that causes my recent attempt to fail.
Here is the sample code that I used to test out the implementation.
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from threading import BoundedSemaphore


class test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor, 4, 4)

    def testfn(self):
        msg = 'haha'
        print(msg)
        return msg

    def testing(self):
        return self.processExecutor.submit(self.testfn)


class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()
        print('submitting to pool')
        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)
        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()


if __name__ == '__main__':
    thingy = test()
    testthingy = thingy.testing()
    wait([testthingy])
    print(testthingy.result())
```
I get the following error:
```
submitting to pool
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/Users/markus.ng.yu/Downloads/testconcurrency.py", line 44, in <module>
    print(testthingy.result())
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
```
I'd appreciate any insight into why this does not work :>
Your code creates an instance of class `MaxQueuePool` in its `test.__init__` method and assigns it to attribute `self.processExecutor`. This `MaxQueuePool` instance contains a reference to a `ProcessPoolExecutor` instance. In method `testing` you have `self.processExecutor.submit(self.testfn)`, where `self` refers to an instance of class `test`. But since `self.testfn` is to be executed in a multiprocessing pool process, this `test` instance must be sent to the address space of the pool process that will execute the `self.testfn` "worker function." This serialization/de-serialization is done using `pickle`. But since the `test` instance indirectly contains a reference to a `ProcessPoolExecutor` instance, and such an instance cannot be pickled, you get the error you are getting.
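You can reproduce the failure in isolation, without any pool involved. The following sketch (the `Holder` class is hypothetical, with a plain `threading.Lock` standing in for the executor's internal locks) shows that pickling a bound method also pickles the instance it is bound to:

```python
import pickle
import threading


class Holder:
    def __init__(self):
        # Stand-in for the unpicklable locks a ProcessPoolExecutor holds.
        self.lock = threading.Lock()

    def work(self):
        return 'done'


h = Holder()
try:
    # Pickling the bound method h.work also pickles the instance h,
    # which fails on the lock it contains.
    pickle.dumps(h.work)
except TypeError as e:
    print(e)  # cannot pickle '_thread.lock' object
```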
So the problem is caused by the "worker function" (`self.testfn`) being a method of a class instance whose attributes cannot all be pickled. This can be fixed by changing the worker function, `test.testfn`, to be a global (module-level) function, `testfn`, instead.
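A minimal sketch of what that change could look like, applied to the question's code (same semaphore wrapper, with `testfn` moved to module level so only a plain, picklable function crosses the process boundary):

```python
from concurrent.futures import ProcessPoolExecutor, wait
from threading import BoundedSemaphore


# Module level: carries no reference to the executor, so it pickles cleanly.
def testfn():
    return 'haha'


class MaxQueuePool:
    """Wraps an Executor, limiting the size of its task queue."""
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        self.pool_queue.acquire()
        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(lambda _: self.pool_queue.release())
        return future


class Test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor, 4, 4)

    def testing(self):
        # Only the plain function testfn is submitted, not self:
        return self.processExecutor.submit(testfn)


if __name__ == '__main__':
    future = Test().testing()
    wait([future])
    result = future.result()
    print(result)  # haha
```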
Update
The following code demonstrates how I would run multiple executions of `testfn` in parallel using a single multiprocessing pool.
The list comprehension returned by `testing` will not be fully evaluated until all 6 tasks have been submitted.
Because the `max_queue_size` value is 4, 2 of the first 4 tasks submitted (for i = 1 through i = 4) will have to finish before the last 2 tasks (for i = 5 and i = 6) can be submitted.
The first 4 tasks will run in parallel and will complete more or less at the same time, and so the final 2 tasks will be submitted and complete more or less at the same time, approximately 1 second later than the first 4 tasks.
If you have a pool size of N, it makes no sense for the `max_queue_size` value to be less than N; otherwise, you will always have pool processes that are idle.
However, `max_queue_size` is not accurately named: this value does not represent the number of tasks that can be sitting in the task queue waiting to be executed. It represents the sum of the pool size (i.e. the number of tasks currently being executed) plus the number of tasks waiting to be executed (the actual queue size of waiting tasks).
In this example we have a pool size of 4 and a `max_queue_size` value of 4. We are submitting 6 tasks in a loop. The first 4 will immediately be pulled off the task queue and be executed. The next 2 tasks will not be submitted until at least 2 of the previously submitted tasks complete. Until that happens, the current queue size of tasks waiting to be executed is 0, and therefore pool processes will be idle for a short interval between the time a task completes and the time the next task can be submitted and pulled off the queue to be processed by the idle process.
Therefore, unless memory is a problem, I would advise setting `max_queue_size` to at least two times the pool size, so that whenever a process completes executing a task there is already another task on the queue it can pull off to process. Another way of looking at this: if you are willing to have M tasks submitted and waiting to execute while N (the pool size) tasks are currently being executed, then set `max_queue_size` to N + M.
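As an illustrative sketch of that sizing rule (using a `ThreadPoolExecutor` here only so the snippet runs without a `__main__` guard; the wrapper is the same idea as above, minus the print):

```python
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore


class MaxQueuePool:
    """The semaphore counts running tasks plus waiting tasks."""
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        self.pool_queue.acquire()  # blocks once N + M tasks are in flight
        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(lambda _: self.pool_queue.release())
        return future


N = 4   # pool size: tasks executing concurrently
M = 8   # tasks we are willing to have submitted and waiting
pool = MaxQueuePool(ThreadPoolExecutor, max_queue_size=N + M, max_workers=N)

futures = [pool.submit(pow, 2, i) for i in range(12)]
print([f.result() for f in futures])
```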
Please read the comments in the code.
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from threading import BoundedSemaphore
import time


# This is now a global function and no longer a method of class test:
def testfn(n):
    time.sleep(1)
    msg = f'haha{n}'
    return msg, time.time()  # add completion time


# Class names are typically capitalized:
class Test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor, 4, 4)

    def testing(self):
        # Submit 6 tasks.
        return [self.processExecutor.submit(testfn, i) for i in range(1, 7)]


class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()
        print('submitting to pool at time', time.time())
        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)
        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()


if __name__ == '__main__':
    tester = Test()
    futures = tester.testing()
    for future in futures:
        print(future.result())
```
Prints:
```
submitting to pool at time 1685373628.8266022
submitting to pool at time 1685373628.8516064
submitting to pool at time 1685373628.8526037
submitting to pool at time 1685373628.853605
submitting to pool at time 1685373629.978602
submitting to pool at time 1685373629.9806027
('haha1', 1685373629.978602)
('haha2', 1685373629.978602)
('haha3', 1685373629.978602)
('haha4', 1685373629.978602)
('haha5', 1685373630.99162)
('haha6', 1685373630.99162)
```
When you were using a `ThreadPoolExecutor`, there was no need to serialize/de-serialize the `test` instance, since submitted tasks run in the same process, and therefore the same address space, as the main thread.
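A quick sketch illustrating this (a hypothetical minimal class; the instance even holds an unpicklable executor, yet the bound method can be submitted because nothing is ever pickled):

```python
from concurrent.futures import ThreadPoolExecutor


class Test:
    def __init__(self):
        # The instance holds an executor (unpicklable), which is harmless
        # here because worker threads share this process's address space.
        self.pool = ThreadPoolExecutor(max_workers=2)

    def testfn(self):
        return 'haha'

    def testing(self):
        # The bound method goes to a worker thread; no pickling occurs.
        return self.pool.submit(self.testfn)


result = Test().testing().result()
print(result)  # haha
```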