
How to wait for a Future that hasn't been submitted to a thread pool yet?


I would like to create a "TaskPoolManager" that starts "Task" objects (a custom class) in a ThreadPoolExecutor and prioritizes them by importance level, time since submission, etc. (these are properties of Task).

My problem is that when the ThreadPoolExecutor is full, any further tasks submitted to the pool are executed in FIFO order instead of by priority.
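One common way to get that ordering is a heap keyed on a tuple. A minimal sketch, assuming Task exposes hypothetical importance and submitted_at fields (the question does not show Task's real attributes): the key sorts higher importance first, then older submissions, with a counter as a final tie-breaker so heapq never has to compare task objects.

```python
import heapq
import itertools
import time
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class PrioritizedTask:
    # Hypothetical stand-in for the asker's Task: only the fields used for ordering
    func: Callable[[], object]
    importance: int = 0
    submitted_at: float = field(default_factory=time.monotonic)


_tie_breaker = itertools.count()  # unique per push, so task objects are never compared


def priority_key(task: PrioritizedTask) -> Tuple[int, float, int]:
    # heapq pops the smallest tuple first: higher importance wins, then the older task
    return (-task.importance, task.submitted_at, next(_tie_breaker))


heap: List[Tuple[Tuple[int, float, int], PrioritizedTask]] = []
for name, importance in [("low", 1), ("high", 5), ("mid", 3)]:
    task = PrioritizedTask(func=lambda n=name: n, importance=importance)
    heapq.heappush(heap, (priority_key(task), task))

order = [heapq.heappop(heap)[1].func() for _ in range(len(heap))]
print(order)  # ['high', 'mid', 'low']
```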

Here is the TaskPoolManager class:


import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, Optional

# Task is my own callable class (not shown here)


class TaskPoolManager:
    def __init__(self, max_workers: Optional[int] = None):
        self.max_workers = max_workers or (os.cpu_count() or 1) * 5
        self._pool_executor = ThreadPoolExecutor(max_workers=self.max_workers,
                                                 thread_name_prefix="TaskPoolManager")
        # Tasks waiting for a free worker, mapped to the Future already returned to the caller
        self.pending_task: Dict[Task, Future] = {}
        self.running_workers = 0

    # Tasks are callable
    def submit(self, task: Task) -> Future:
        if self.running_workers == self.max_workers:
            return self._add_task_to_queue(task)
        else:
            return self._start_task(task)

    def _start_task(self, task: Task) -> Future:
        """Submit a task to the pool."""
        self.running_workers += 1
        future = self._pool_executor.submit(task)
        future.add_done_callback(lambda _: self._completed_thread())
        return future

    def _add_task_to_queue(self, task: Task) -> Future:
        """Add a task to the queue of not-yet-started tasks."""
        not_started_future = Future()
        self.pending_task[task] = not_started_future
        return not_started_future

    def _completed_thread(self):
        """Called when a thread in the pool has finished a task."""
        self.running_workers -= 1
        self._start_task_in_queue()  # By priority level

Here is an example of how to use it:

manager = TaskPoolManager()

for i in range(0, 10000):
    manager.submit(Task(func=wait_random_time_task))

f = manager.submit(Task(func=wait_random_time_task))

# This task hasn't been submitted to the thread pool yet, but it needs to be waitable as if it had been.
f.result()

Is there a way to connect a client-instantiated Future to the Future instance that ThreadPoolExecutor.submit creates later in the execution?

If not, is there a way to return a Future-like object that can be associated with a real Future later, and that callers can still wait on with .result()?

In other words: how do I wait for a Future that hasn't been submitted to a thread pool yet?
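A plain Future() can be handed out immediately and completed later from any thread with set_result() or set_exception(); callers blocked in .result() wake up when that happens. A minimal sketch of "connecting" it to the Future returned by ThreadPoolExecutor.submit, using a hypothetical chain_future helper (it is not part of concurrent.futures) that copies the outcome over in a done callback:

```python
import threading
import time
from concurrent.futures import CancelledError, Future, ThreadPoolExecutor


def chain_future(source: Future, target: Future) -> None:
    """Hypothetical helper: copy source's outcome into target once source is done."""
    def _copy(done: Future) -> None:
        if done.cancelled():
            # cancel() returns False if target is already running; report it as an exception
            if not target.cancel():
                target.set_exception(CancelledError())
        elif done.exception() is not None:
            target.set_exception(done.exception())
        else:
            target.set_result(done.result())

    # Runs immediately if source is already done, otherwise when it finishes
    source.add_done_callback(_copy)


pool = ThreadPoolExecutor(max_workers=2)
placeholder: Future = Future()  # handed to the caller before any real work is submitted


def submit_later() -> None:
    time.sleep(0.1)  # stands in for "a worker became free"
    chain_future(pool.submit(pow, 6, 2), placeholder)


threading.Thread(target=submit_later).start()
value = placeholder.result(timeout=5)  # blocks until the chained task has finished
print(value)  # 36
pool.shutdown()
```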


Solution

  • In the end, it wasn't that complicated:

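    def _start_task_in_queue(self):
        if not self.pending_task:
            return
        # Algorithm could be more complex than just the first one; pop it so it never starts twice
        task = next(iter(self.pending_task))
        returned_future = self.pending_task.pop(task)
        if not returned_future.set_running_or_notify_cancel():
            return self._start_task_in_queue()  # the caller cancelled it: try the next task
        started_future = self._start_task(task)  # keeps running_workers and callbacks in sync

        def copy_outcome(f):  # receives the finished Future, not its result
            if f.exception() is not None:
                returned_future.set_exception(f.exception())
            else:
                returned_future.set_result(f.result())
        started_future.add_done_callback(copy_outcome)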
    
    

    I don't know if it's the best way to solve my problem, because the Future.__init__ docstring says:

    Initializes the future. Should not be called by clients

    But it works.
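A different technique, not the one used above, is to skip ThreadPoolExecutor and run a fixed set of worker threads that pull from a queue.PriorityQueue. Each queue entry carries the Future created at submit time, so callers can wait on it whether or not a worker is free; this is roughly what ThreadPoolExecutor.submit does internally, except with a FIFO queue. A minimal sketch; PriorityTaskPool, its priority parameter and the shutdown sentinel are hypothetical names, and error handling is kept to a minimum:

```python
import itertools
import queue
import threading
from concurrent.futures import Future
from typing import Any, Callable


class PriorityTaskPool:
    """Hypothetical sketch: fixed worker threads that always pick the highest-priority job."""

    def __init__(self, max_workers: int = 4) -> None:
        self._queue = queue.PriorityQueue()
        self._seq = itertools.count()  # tie-breaker: FIFO among equal priorities
        self._workers = [threading.Thread(target=self._work, daemon=True)
                         for _ in range(max_workers)]
        for worker in self._workers:
            worker.start()

    def submit(self, func: Callable[[], Any], priority: int = 0) -> Future:
        future: Future = Future()  # returned immediately, even if every worker is busy
        # PriorityQueue pops the smallest entry, so negate to let a higher priority win
        self._queue.put((-priority, next(self._seq), func, future))
        return future

    def _work(self) -> None:
        while True:
            _, _, func, future = self._queue.get()
            if func is None:  # shutdown sentinel
                break
            if not future.set_running_or_notify_cancel():
                continue  # the caller cancelled it while it was queued
            try:
                future.set_result(func())
            except BaseException as exc:
                future.set_exception(exc)

    def shutdown(self) -> None:
        # Sentinels sort after every real job, so queued work still runs first
        for _ in self._workers:
            self._queue.put((float("inf"), next(self._seq), None, None))
        for worker in self._workers:
            worker.join()


pool = PriorityTaskPool(max_workers=1)
gate = threading.Event()
order = []
pool.submit(gate.wait, priority=100)  # occupies the only worker until the gate opens
futures = [pool.submit(lambda n=n: order.append(n), priority=p)
           for n, p in [("low", 1), ("high", 5), ("mid", 3)]]
gate.set()
for f in futures:
    f.result()
pool.shutdown()
print(order)  # ['high', 'mid', 'low']
```

With a single worker, the gate job holds the worker while three more jobs queue up; once it is released, they run in priority order rather than submission order.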