python python-3.x multiprocessing python-multiprocessing

How can I track idle and non-idle Python multiprocessing processes for optimal task allocation?


I have a question regarding Python multiprocessing and keeping track of idle and non-idle processes.

Currently, I only want to allocate a task to the pool if there is a process in an idle state, instead of relying on the built-in multiprocessing queue. The reason is that later on, I will have multiple ECS Tasks pulling messages off an SQS queue. Messages should only be pulled off the queue by ECS Tasks that have processes in an idle state. This avoids messages being pulled off the queue and then waiting for a process to become available.

In the code below, I am using pool._processes and len(pool._cache) to calculate the number of processes that are in an idle state.

._processes and ._cache are both protected attributes. My question: is there a preferred way of getting the number of processes in an idle state, or is what I have below the norm?

import multiprocessing
import time


# Define a function that represents a worker process
def worker(num):
    print(f"Worker {num} starting")
    # simulate some work
    time.sleep(2)


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        iteration_count: int = 0

        while True:
            # Recompute on every pass; a value computed once before the loop never changes
            idle_workers: int = pool._processes - len(pool._cache)

            # Check if there are any idle worker processes in the pool
            if idle_workers > 0:
                # Number of running tasks and idle workers
                print(f"{len(pool._cache)} tasks running, {idle_workers} workers idle")

                # Submit a new task to the pool using the worker function
                pool.apply_async(worker, args=(iteration_count,))
                iteration_count += 1
            else:
                # If there are no idle worker processes, sleep for 1 second before checking again
                time.sleep(1)

Thanks for the help!


Solution

  • Yes, attributes such as pool._processes should be treated as private: the leading underscore marks them as implementation details, so relying on them is risky because the pool implementation could change in a future Python release. If I understand correctly, your real problem is knowing when a call to apply_async will be processed right away rather than sitting on the task queue waiting for a process to become idle. This can be done with the public pool interface, without explicitly counting idle processes.

    So instead of trying to work out which processes are idle, we can use a specialization of the multiprocessing.pool.Pool class, BoundedQueueProcessPool, that blocks calls to apply_async until there is an idle process. But how do we know when a process is idle? BoundedQueueProcessPool passes callback functions to apply_async that are invoked whenever a submitted task completes, from which it can be inferred that a process has just become idle.
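    To make the callback idea concrete, here is a minimal sketch of counting in-flight tasks through apply_async callbacks, using only the public interface. IdleTracker and demo are hypothetical names that are not part of the answer's code. The sketch uses multiprocessing.pool.ThreadPool, which exposes the same apply_async signature, so that it runs without pickling concerns, and it assumes every task is submitted through the tracker.

```python
import threading
from multiprocessing.pool import ThreadPool


class IdleTracker:
    """Hypothetical helper: infers idle workers from apply_async callbacks."""

    def __init__(self, pool, size):
        self._pool = pool
        self._size = size          # number of workers in the pool
        self._in_flight = 0        # tasks submitted but not yet completed
        self._lock = threading.Lock()

    def _task_finished(self, _result_or_exc):
        # Invoked by the pool's result-handler thread on success or failure.
        with self._lock:
            self._in_flight -= 1

    def submit(self, func, args=()):
        with self._lock:
            self._in_flight += 1
        return self._pool.apply_async(
            func, args,
            callback=self._task_finished,
            error_callback=self._task_finished,
        )

    @property
    def idle(self):
        with self._lock:
            return max(0, self._size - self._in_flight)


def demo():
    gate = threading.Event()  # keeps the tasks busy until we open it
    with ThreadPool(processes=4) as pool:
        tracker = IdleTracker(pool, size=4)
        before = tracker.idle
        pending = [tracker.submit(gate.wait) for _ in range(3)]
        during = tracker.idle
        gate.set()
        for p in pending:
            p.wait()  # the callback runs before wait() returns
        after = tracker.idle
    return before, during, after  # (4, 1, 4)
```

    demo() reports the idle count before, during, and after three blocked tasks on a pool of four. BoundedQueueProcessPool relies on the same callbacks but, rather than exposing a count, blocks the caller until a slot is free.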

    The initializer for BoundedQueueProcessPool takes an optional argument, max_waiting_tasks, specifying how many tasks may sit in the task queue waiting for a process to become idle. In general, a non-zero value is a good idea. The default value is None, which sets max_waiting_tasks equal to the pool size. That way, when a process becomes idle, a task is already on the queue waiting to be processed, so there is no delay between a process becoming idle and it starting the next task. In your case, however, it sounds like you would want to specify max_waiting_tasks=0.
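    The limit arithmetic can be seen in isolation with a plain semaphore. The following is a sketch, not the class itself: make_slot_semaphore is a hypothetical helper, and it uses threading.BoundedSemaphore for simplicity where the class below is given multiprocessing.BoundedSemaphore.

```python
import threading


def make_slot_semaphore(pool_size, max_waiting_tasks=None):
    """Hypothetical helper: one slot per worker plus one per allowed waiting task."""
    if max_waiting_tasks is None:
        max_waiting_tasks = pool_size
    elif max_waiting_tasks < 0:
        raise ValueError(f"Invalid negative max_waiting_tasks value: {max_waiting_tasks}")
    return threading.BoundedSemaphore(pool_size + max_waiting_tasks)


# Pool of 4 with no waiting tasks allowed: exactly 4 slots.
sem = make_slot_semaphore(4, max_waiting_tasks=0)

# Non-blocking acquires stand in for submissions: the fifth finds no free slot.
acquired = [sem.acquire(blocking=False) for _ in range(5)]

# A completed task releases its slot (what the completion callback does) ...
sem.release()

# ... so the next submission can proceed.
retry = sem.acquire(blocking=False)
```

    Here acquired is [True, True, True, True, False] and retry is True. In the real class the acquire is blocking, so the fifth apply_async call simply waits until a callback releases a slot.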

    I have modified your demo to have worker sleep a random amount of time and to also print when it has finished processing its task:

    import multiprocessing.pool
    import multiprocessing
    
    import time
    import random
    
    class BoundedQueuePool:
        def __init__(self, limit, semaphore_type):
            self._semaphore = semaphore_type(limit)
    
        def release(self, result, callback=None):
            self._semaphore.release()
            if callback:
                callback(result)
    
        def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
            self._semaphore.acquire()
            callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
            # On failure, forward the exception to the caller's error_callback (not callback)
            error_callback_fn = self.release if error_callback is None else lambda exc: self.release(exc, callback=error_callback)
            return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)
    
    class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
        def __init__(self, *args, max_waiting_tasks=None, **kwargs):
            multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
            if max_waiting_tasks is None:
                max_waiting_tasks = self._processes
            elif max_waiting_tasks < 0:
                raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
            limit = self._processes + max_waiting_tasks
            BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore)
    
    # Define a function that represents a worker process
    def worker(num):
        print(f"Worker {num} starting", flush=True)
        # simulate some work using a random time
        time.sleep(random.uniform(1.0, 2.0))
        print(f"Worker {num} ending", flush=True)
    
    if __name__ == '__main__':
        with BoundedQueueProcessPool(processes=4, max_waiting_tasks=0) as pool:
            # Modified so the program eventually terminates for demo purposes:
            for i in range(10):
                # This will block until there is an idle process:
                pool.apply_async(worker, args=(i,))
            # Wait for all tasks to complete
            pool.close()
            pool.join()
    

    Prints (the exact order varies from run to run because of the random sleep times):

    Worker 0 starting
    Worker 1 starting
    Worker 2 starting
    Worker 3 starting
    Worker 2 ending
    Worker 4 starting
    Worker 1 ending
    Worker 5 starting
    Worker 3 ending
    Worker 6 starting
    Worker 0 ending
    Worker 7 starting
    Worker 4 ending
    Worker 8 starting
    Worker 5 ending
    Worker 9 starting
    Worker 6 ending
    Worker 7 ending
    Worker 9 ending
    Worker 8 ending
    

    Of course, there is no need to use this class. You could just define your own callback functions that will submit a new task whenever they are invoked. By the way, you can still have your own callback functions even if you use this class.
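    As a rough sketch of that callback-only alternative: prime the pool with one task per worker, then let each completion callback submit the next task. run_with_callbacks and work are hypothetical names, and the sketch again uses multiprocessing.pool.ThreadPool (same apply_async interface) so it runs without pickling concerns. It assumes tasks come from a finite iterable.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def work(num):
    """Simulate a short task and return its input."""
    time.sleep(0.01)
    return num


def run_with_callbacks(task_args, pool_size=4):
    """Keep at most pool_size tasks in flight; each completion submits the next."""
    pending = iter(task_args)
    lock = threading.Lock()
    all_done = threading.Event()
    results = []
    in_flight = 0

    with ThreadPool(processes=pool_size) as pool:

        def submit_next():
            # Caller must hold the lock.
            nonlocal in_flight
            try:
                arg = next(pending)
            except StopIteration:
                if in_flight == 0:
                    all_done.set()  # nothing left to submit or to wait for
                return
            in_flight += 1
            pool.apply_async(work, args=(arg,),
                             callback=on_done, error_callback=on_error)

        def on_done(result):
            # A worker just became idle: record the result, hand it new work.
            nonlocal in_flight
            with lock:
                results.append(result)
                in_flight -= 1
                submit_next()

        def on_error(exc):
            # This sketch drops the exception; real code would log or retry.
            nonlocal in_flight
            with lock:
                in_flight -= 1
                submit_next()

        with lock:
            for _ in range(pool_size):  # one initial task per worker
                submit_next()
        all_done.wait()
    return results
```

    Calling run_with_callbacks(range(10)) returns the ten results in completion order. No task ever waits on the pool's queue, because a new one is submitted only when a callback signals that a worker has finished.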