
Bounded semaphore queue and ProcessPoolExecutor


I recently came across a neat trick by @noxdafox that uses a BoundedSemaphore to cap the number of tasks queued in a ProcessPoolExecutor. The example I followed is in the linked gist.

The issue seems to occur with ProcessPoolExecutor but not with ThreadPoolExecutor. I'm not sure why; perhaps someone knows whether a change to the concurrent.futures implementation is causing my attempt to fail.

Here is the sample code that I used to test out the implementation.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from threading import BoundedSemaphore

class test:
    def __init__(self):
        self.processExecutor = MaxQueuePool(ProcessPoolExecutor,4,4)
        
    def testfn(self):
        msg = 'haha'
        print(msg)
        return msg
    
    def testing(self):
        return self.processExecutor.submit(self.testfn)

class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()
        print('submitting to pool')
        
        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()

if __name__ == '__main__':
    thingy = test()
    testthingy = thingy.testing()
    wait([testthingy])
    print(testthingy.result())

I get the following error:

submitting to pool
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/Users/markus.ng.yu/Downloads/testconcurrency.py", line 44, in <module>
    print(testthingy.result())
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\markus.ng.yu\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object

I would appreciate any insight into why this does not work :>


Solution

  • Your class creates a MaxQueuePool instance in test.__init__ and assigns it to the attribute self.processExecutor. That MaxQueuePool instance holds a reference to a ProcessPoolExecutor instance. In method testing you call self.processExecutor.submit(self.testfn), where self is an instance of class test. Because self.testfn is a bound method that must execute in a pool process, the test instance it is bound to has to be copied into the address space of the pool process that runs the "worker function." This serialization/de-serialization is done using pickle. The test instance indirectly references a ProcessPoolExecutor instance, which holds thread locks and therefore cannot be pickled, so you get the error shown: cannot pickle '_thread.lock' object.

    So the problem is that the "worker function" (self.testfn) is a method of a class instance whose attributes cannot be pickled. The fix is to make the worker function a global (module-level) function, testfn, instead of the method test.testfn.
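The failure can be reproduced without any pool, which may make the cause easier to see. The following is a minimal sketch, not the original code: the class Holder, the functions work and try_pickle, and the lock attribute are hypothetical names, and the threading.Lock only stands in for the locks held inside the executor.

```python
import pickle
import threading


class Holder:
    """Hypothetical stand-in for a class that owns an unpicklable attribute."""

    def __init__(self):
        # Plays the role of the locks held inside the executor.
        self.lock = threading.Lock()

    def work(self):
        return 'haha'


def work():
    """Module-level function: pickled by reference to its qualified name."""
    return 'haha'


def try_pickle(obj):
    """Return None if obj can be pickled, otherwise the exception raised."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:
        return exc


# Pickling a bound method also pickles the instance it is bound to.
method_error = try_pickle(Holder().work)
# A module-level function carries no instance with it.
function_error = try_pickle(work)

print(type(method_error).__name__, function_error)
```

Pickling the bound method fails because the whole Holder instance, lock included, has to be serialized, while the module-level function is pickled by name only.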

    Update

    The following code demonstrates how I would run multiple executions of testfn in parallel using a single multiprocessing pool.

    The list built by the comprehension in testing is not complete until all 6 tasks have been submitted. Because the max_queue_size value is 4, two of the first 4 tasks must finish before the last 2 tasks (for i = 5 and i = 6) can be submitted. The first 4 tasks run in parallel and complete at more or less the same time, so the final 2 tasks are submitted together and complete approximately 1 second after the first 4 tasks.

    If you have a pool size of N, it makes no sense for the max_queue_size value to be less than N; otherwise, you will always have pool processes that are idle. However, max_queue_size is not accurately named: this value does not represent the number of tasks that can be sitting in the task queue waiting to be executed. It represents the number of tasks currently being executed (at most the pool size) plus the number of tasks waiting to be executed (the actual queue size of waiting tasks).

    In this example we have a pool size of 4 and a max_queue_size value of 4. We are submitting 6 tasks in a loop. The first 4 will immediately be pulled off the task queue and be executed. The next 2 tasks will not be submitted until at least 2 of the previously submitted tasks complete. Until that happens, the current queue size of tasks waiting to be executed is 0, and therefore pool processes will be idle for a short interval between the time a task completes and the time the next task is submitted and pulled off the queue by the idle process. Therefore, unless memory is a problem, I would advise setting max_queue_size to at least two times the pool size so that whenever a process finishes a task there is already another task on the queue it can pull off to process. Another way of looking at this: if you are willing to have M tasks submitted and waiting to execute while N (the pool size) tasks are currently being executed, then set max_queue_size to N + M.
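The N + M accounting can be checked with a small deterministic sketch. It is an illustration under assumptions, not part of the original code: it uses a ThreadPoolExecutor instead of a ProcessPoolExecutor (the semaphore logic is the same for both), and the names POOL_SIZE, MAX_QUEUE_SIZE, slots, gate and blocked_task are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore, Event

POOL_SIZE = 2        # N: tasks that can run at the same time
MAX_QUEUE_SIZE = 4   # N + M: running tasks plus waiting tasks

slots = BoundedSemaphore(MAX_QUEUE_SIZE)
gate = Event()       # keeps every task unfinished until it is set


def blocked_task():
    gate.wait()


pool = ThreadPoolExecutor(max_workers=POOL_SIZE)
for _ in range(MAX_QUEUE_SIZE):
    slots.acquire()
    future = pool.submit(blocked_task)
    # Same pattern as MaxQueuePool: release one slot when the task is done.
    future.add_done_callback(lambda _: slots.release())

# Only POOL_SIZE tasks are running and the other two are waiting,
# yet every slot is taken, so a fifth submit would block here.
fifth_slot_available = slots.acquire(blocking=False)

gate.set()
# Done callbacks run in the worker threads, which shutdown() joins.
pool.shutdown(wait=True)
slot_available_after = slots.acquire(blocking=False)

print(fifth_slot_available, slot_available_after)  # False True
```

With 2 workers and 4 slots, 2 tasks run and 2 wait, which matches N = 2 and M = 2.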

    Please read the comments in the code.

    from concurrent.futures import ProcessPoolExecutor
    from threading import BoundedSemaphore
    import time
    
    # This is now a global function and no longer a method of class test:
    def testfn(n):
        time.sleep(1)
        msg = f'haha{n}'
        return msg, time.time() # add completion time
    
    # Class names are typically capitalized:
    class Test:
        def __init__(self):
            self.processExecutor = MaxQueuePool(ProcessPoolExecutor, 4, 4)
    
        def testing(self):
            # Submit 6 tasks.
            return [self.processExecutor.submit(testfn, i) for i in range(1, 7)]
    
    class MaxQueuePool:
        """This Class wraps a concurrent.futures.Executor
        limiting the size of its task queue.
        If `max_queue_size` tasks are submitted, the next call to submit will block
        until a previously submitted one is completed.
        """
        def __init__(self, executor, max_queue_size, max_workers=None):
            self.pool = executor(max_workers=max_workers)
            self.pool_queue = BoundedSemaphore(max_queue_size)
    
        def submit(self, function, *args, **kwargs):
            """Submits a new task to the pool, blocks if Pool queue is full."""
            self.pool_queue.acquire()
            print('submitting to pool at time', time.time())
    
            future = self.pool.submit(function, *args, **kwargs)
            future.add_done_callback(self.pool_queue_callback)
    
            return future
    
        def pool_queue_callback(self, _):
            """Called once task is done, releases one queue slot."""
            self.pool_queue.release()
    
    if __name__ == '__main__':
        tester = Test()
        futures = tester.testing()
        for future in futures:
            print(future.result())
    

    Prints:

    submitting to pool at time 1685373628.8266022
    submitting to pool at time 1685373628.8516064
    submitting to pool at time 1685373628.8526037
    submitting to pool at time 1685373628.853605
    submitting to pool at time 1685373629.978602
    submitting to pool at time 1685373629.9806027
    ('haha1', 1685373629.978602)
    ('haha2', 1685373629.978602)
    ('haha3', 1685373629.978602)
    ('haha4', 1685373629.978602)
    ('haha5', 1685373630.99162)
    ('haha6', 1685373630.99162)
    

    When you were using a ThreadPoolExecutor, there was no need to serialize/de-serialize the test instance, because submitted tasks run in threads that share the address space of the main process.
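As a quick check of the thread-pool case, here is a minimal sketch in which a bound method of an object holding a lock is submitted to a ThreadPoolExecutor. The class LockOwner and its work method are hypothetical names used only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


class LockOwner:
    """Hypothetical class whose instances cannot be pickled."""

    def __init__(self):
        self.lock = Lock()

    def work(self):
        with self.lock:
            return 'haha'


owner = LockOwner()
with ThreadPoolExecutor(max_workers=2) as pool:
    # The bound method is passed to the worker thread as-is; nothing is pickled.
    result = pool.submit(owner.work).result()

print(result)  # haha
```

The worker thread calls owner.work on the very same object, so the unpicklable lock never has to be copied anywhere.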