I would like to create a "TaskPoolManager" that starts "Task" objects (a custom class) in a ThreadPoolExecutor and prioritizes them by importance level, time since submission, etc. (these are properties of Task).
My problem is that when the ThreadPoolExecutor is full, any further tasks submitted to the pool are executed in FIFO order instead of by priority.
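To see the behaviour in isolation: CPython's ThreadPoolExecutor keeps work that cannot start yet in an internal FIFO queue, so with a single busy worker, queued calls run strictly in submission order whatever their intended priority. The labels and the `gate` event below are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

order = []
gate = threading.Event()


def record(label: str) -> None:
    gate.wait()  # hold the single worker so later submissions pile up in the pool's queue
    order.append(label)


with ThreadPoolExecutor(max_workers=1) as pool:
    # The intended priorities (high > medium > low) are invisible to the executor
    for label in ["low", "high", "medium"]:
        pool.submit(record, label)
    gate.set()  # release the worker; leaving the with-block waits for all calls

print(order)  # ['low', 'high', 'medium']: submission order, not priority
```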
Here is the TaskPoolManager class:
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, Optional

# Task (not shown) is a custom callable class


class TaskPoolManager:
    def __init__(self, max_workers: Optional[int] = None):
        self.max_workers = max_workers or (os.cpu_count() or 1) * 5
        self._pool_executor = ThreadPoolExecutor(max_workers=self.max_workers,
                                                 thread_name_prefix="TaskPoolManager")
        # Tasks waiting for a free worker -> the Future already returned to the caller.
        # Start empty: a placeholder entry would itself be picked up and run later.
        self.pending_task: Dict[Task, Future] = {}
        self.running_workers = 0

    # Tasks are callable
    def submit(self, task: Task) -> Future:
        if self.running_workers >= self.max_workers:
            return self._add_task_to_queue(task)
        else:
            return self._start_task(task)

    def _start_task(self, task: Task) -> Future:
        """Submit a task to the pool"""
        self.running_workers += 1
        future = self._pool_executor.submit(task)
        future.add_done_callback(lambda _: self._completed_thread())
        return future

    def _add_task_to_queue(self, task: Task) -> Future:
        """Add a task to the queue of not-yet-started tasks"""
        not_started_future = Future()
        self.pending_task[task] = not_started_future
        return not_started_future

    def _completed_thread(self):
        """Called when a thread in the pool has finished a task"""
        self.running_workers -= 1
        self._start_task_in_queue()  # By priority level
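The class calls `_start_task_in_queue()` "by priority level" but a dict has no notion of priority. One way to make that concrete is to keep pending tasks in a heap ordered by a sort key. This is a sketch under assumptions: the question does not name Task's properties, so `importance` and `submitted_at` are hypothetical fields, and `PendingQueue` is a hypothetical helper. A running counter breaks ties so two Task objects are never compared directly.

```python
import heapq
import itertools
import time
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Task:
    """Hypothetical stand-in for the real Task; only the sort fields are modelled."""
    name: str
    importance: int = 0
    submitted_at: float = field(default_factory=time.monotonic)


class PendingQueue:
    """Pending tasks: highest importance first, then oldest submission first."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, float, int, Task]] = []
        self._counter = itertools.count()  # tie-breaker so Task objects are never compared

    def push(self, task: Task) -> None:
        # heapq is a min-heap, so negate importance to pop the most important task first
        heapq.heappush(self._heap, (-task.importance, task.submitted_at, next(self._counter), task))

    def pop(self) -> Task:
        return heapq.heappop(self._heap)[-1]

    def __len__(self) -> int:
        return len(self._heap)


pending = PendingQueue()
pending.push(Task("backup", importance=1, submitted_at=0.0))
pending.push(Task("alert", importance=5, submitted_at=2.0))
pending.push(Task("report", importance=1, submitted_at=1.0))
popped = [pending.pop().name for _ in range(len(pending))]
print(popped)  # ['alert', 'backup', 'report']
```

A fixed heap key covers "importance, then submission order". If a task's effective priority should grow the longer it waits (aging), the key has to be recomputed when a worker frees up, for example with `max()` over the pending tasks, because heap entries are not re-sorted on their own.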
Here is an example of how to use it:
manager = TaskPoolManager()
for _ in range(10000):
    manager.submit(Task(func=wait_random_time_task))
f = manager.submit(Task(func=wait_random_time_task))
# This isn't submitted to the thread pool yet, but it needs to be waitable as if it were.
f.result()
Is there a way to connect a client-instantiated Future to a Future instance created later in the execution by ThreadPoolExecutor.submit?
If not, is there a way to return a Future-like object that can be associated with a future later and can still be waited on with .result()?
In other words: how can I wait for a Future that hasn't been submitted to a thread pool yet?
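A concurrent.futures.Future created directly can be handed to the caller immediately and completed later from a done callback on the Future that ThreadPoolExecutor.submit returns. A minimal sketch: `chain` and `start_later` are hypothetical helper names, `pow(2, 10)` stands in for real work, and a `threading.Timer` simulates a worker freeing up later. `set_running_or_notify_cancel()` is the documented way for an executor to move a pending Future to the running state; it returns False if the caller already cancelled it.

```python
import threading
from concurrent.futures import CancelledError, Future, ThreadPoolExecutor


def chain(source: Future, target: Future) -> None:
    """Copy source's outcome into target once source finishes (hypothetical helper)."""
    def _copy(done: Future) -> None:
        # The callback receives the finished Future itself, not its result
        if done.cancelled():
            target.set_exception(CancelledError())
        elif done.exception() is not None:
            target.set_exception(done.exception())
        else:
            target.set_result(done.result())
    source.add_done_callback(_copy)


placeholder: Future = Future()  # returned to the caller before anything is submitted


def start_later(pool: ThreadPoolExecutor) -> None:
    # Move the placeholder to RUNNING; skip the work if the caller cancelled it meanwhile
    if placeholder.set_running_or_notify_cancel():
        chain(pool.submit(pow, 2, 10), placeholder)


with ThreadPoolExecutor(max_workers=1) as pool:
    threading.Timer(0.1, start_later, args=(pool,)).start()
    value = placeholder.result(timeout=5)  # blocks until the chained pool Future finishes

print(value)  # 1024
```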
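In the end, it wasn't that complicated: when a worker frees up, take a pending task, submit it to the pool, and copy the pool Future's outcome into the Future that was already returned to the caller: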
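def _start_task_in_queue(self):
    if not self.pending_task:
        return
    # Algorithm could be more complex than just taking the first one
    task = next(iter(self.pending_task))
    returned_future = self.pending_task.pop(task)  # dequeue it so it runs only once
    # Reuse _start_task so running_workers and the completion callback stay in sync
    started_future = self._start_task(task)
    def copy_outcome(done: Future) -> None:
        # add_done_callback passes the finished Future, not its result
        if done.exception() is not None:
            returned_future.set_exception(done.exception())
        else:
            returned_future.set_result(done.result())
    started_future.add_done_callback(copy_outcome)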
I don't know if this is the best way to solve my problem, because the docstring of Future.__init__ states:
Initializes the future. Should not be called by clients.
But it works this way.
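For comparison, here is a self-contained sketch that combines the pieces: a heap of pending work, a lock around the shared counters (done callbacks run on worker threads, so the plain `running_workers += 1` / `-= 1` above can race), and outcome-copying into the Future handed to the caller. The `PrioritizedPool` class, its `priority` argument (lower numbers run first), and the use of plain callables instead of Task objects are all assumptions for illustration. Like the approach above, it constructs `Future()` directly, which the concurrent.futures documentation reserves for executors and tests.

```python
import heapq
import itertools
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Tuple


class PrioritizedPool:
    """Hypothetical sketch: hold work in our own heap and feed the executor by priority."""

    def __init__(self, max_workers: int) -> None:
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._max_workers = max_workers
        self._running = 0
        # Entries: (priority, submit counter, callable, Future returned to the caller)
        self._pending: List[Tuple[int, int, Callable[[], Any], Future]] = []
        self._counter = itertools.count()  # keeps FIFO order among equal priorities
        self._lock = threading.Lock()  # done callbacks run on worker threads

    def submit(self, fn: Callable[[], Any], priority: int = 0) -> Future:
        """Queue fn; a lower priority number runs first."""
        client_future: Future = Future()
        with self._lock:
            heapq.heappush(self._pending, (priority, next(self._counter), fn, client_future))
        self._dispatch()
        return client_future

    def _dispatch(self) -> None:
        """Start queued work while there are idle workers."""
        while True:
            with self._lock:
                if self._running >= self._max_workers or not self._pending:
                    return
                _, _, fn, client_future = heapq.heappop(self._pending)
                if not client_future.set_running_or_notify_cancel():
                    continue  # the caller cancelled it while it was still queued
                self._running += 1
            pool_future = self._executor.submit(fn)
            pool_future.add_done_callback(
                lambda done, cf=client_future: self._finished(done, cf))

    def _finished(self, done: Future, client_future: Future) -> None:
        # Free the slot and start the next task first, then report this task's outcome
        with self._lock:
            self._running -= 1
        self._dispatch()
        exc = done.exception()
        if exc is not None:
            client_future.set_exception(exc)
        else:
            client_future.set_result(done.result())

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)


order: List[str] = []
gate = threading.Event()
pool = PrioritizedPool(max_workers=1)
pool.submit(gate.wait)  # occupies the only worker so the next three must queue
futures = [pool.submit(lambda label=label: order.append(label), priority=priority)
           for label, priority in [("low", 5), ("high", 1), ("medium", 3)]]
gate.set()
for future in futures:
    future.result(timeout=5)
pool.shutdown()
print(order)  # ['high', 'medium', 'low']
```

Lower numbers run first here to match heapq's min-heap; negate the key if "higher importance first" reads more naturally for your Task properties.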