With multiprocessing, I want to return the values of the tasks that have already completed when a given timeout expires, after killing all running and queued tasks.
For example, the following function needs to be run in parallel using pool.starmap() for the values 1 to 100:
def func(x):
    time.sleep(1)
    return x * 2
Let's say that after 5 seconds (the defined timeout), the tasks for values 1 to 10 are completed and those for 11 to 100 are still running or queued. At 5 seconds, I want to kill all processes and return the values of the completed tasks for the values 1 to 10. The return list for the 100 tasks should then be [2, 4, 6, 8, 10, ..., 18, 20, None, None, ..., None]. The main function I tried is as follows:
def main():
    pool = multiprocessing.Pool(processes=3)
    val = [[i] for i in range(100)]
    result_obj = pool.starmap_async(func, val)
    result_obj.wait(5)
    if result_obj.ready():
        result = result_obj.get(1)
However, the processes keep running in the background here, and I couldn't find a way to kill them and capture the results of the tasks that had already completed after 5 seconds. Is this possible?
This answer contains all the information you need: To retrieve results while they are being generated, imap_unordered is probably the best function to use, as it returns each result to the main process as soon as it is completed. You would just have to do a bit of bookkeeping to ensure that the results end up in the right position in your result list. One way to achieve that is to pass an index to the parallelized function, which the function then returns along with its result.
Below is some simplified pseudo-code from which you should be able to derive a solution:
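import multiprocessing
import time

def worker(indexed_args):
    # imap_unordered passes each item as a single argument, so unpack the (idx, args) pair here
    idx, other_args = indexed_args
    arg1, arg2, ..., argn = other_args  # Assuming other_args is a tuple or something else unpackable
    result = ...  # do something with the arguments
    return idx, result

obtained_results = [None] * nr_tasks
deadline = time.monotonic() + timeout
with multiprocessing.Pool() as pool:
    for idx, result in pool.imap_unordered(worker, enumerate(arg_list)):
        obtained_results[idx] = result
        if time.monotonic() > deadline:
            pool.terminate()
            break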
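As a rough illustration, the pseudo-code above could be filled in for the func from the question along the following lines. This is only a sketch under assumptions: the names indexed_func and run_with_timeout are made up for this example, and the three worker processes and the 5 second timeout are taken from the question.

```python
import multiprocessing
import time


def func(x):
    time.sleep(1)  # stand-in for real work, as in the question
    return x * 2


def indexed_func(indexed_value):
    # Hypothetical wrapper: imap_unordered passes one item per task,
    # so the (index, value) pair is unpacked here and the index is returned.
    idx, x = indexed_value
    return idx, func(x)


def run_with_timeout(values, timeout, processes=3):
    # Hypothetical helper: returns one slot per input value,
    # with None for every task that did not finish in time.
    values = list(values)
    results = [None] * len(values)
    deadline = time.monotonic() + timeout
    with multiprocessing.Pool(processes=processes) as pool:
        for idx, result in pool.imap_unordered(indexed_func, enumerate(values)):
            results[idx] = result
            if time.monotonic() > deadline:
                pool.terminate()  # stop the workers and drop the queued tasks
                break
    return results


if __name__ == "__main__":
    print(run_with_timeout(range(1, 101), timeout=5))
```

How many leading entries are filled in depends on the number of worker processes and on timing, so the exact split between values and None entries will vary from run to run.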
The only small disadvantage is that the timeout is only checked when a result arrives, so the loop stops on the first result returned after the timeout has passed. For short tasks, this shouldn't be an issue. If you have longer-running tasks, you might need something else, but pool.terminate() is probably still the way to go for you.
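For longer-running tasks, one possible variation is to stop waiting exactly at the deadline instead of waiting for the next result. In CPython, the iterator returned by imap_unordered has a next(timeout) method that raises multiprocessing.TimeoutError when no result arrives in time. The sketch below relies on that; the names slow_double and run_until_deadline are made up for this example, and the 1 second sleep is just a placeholder for a long task.

```python
import multiprocessing
import time


def slow_double(indexed_value):
    # Hypothetical long-running task; it returns its index so that
    # the result can be stored in the right position.
    idx, x = indexed_value
    time.sleep(1)
    return idx, x * 2


def run_until_deadline(values, timeout, processes=3):
    # Hypothetical helper: collects results until the deadline, then terminates the pool.
    values = list(values)
    results = [None] * len(values)
    deadline = time.monotonic() + timeout
    with multiprocessing.Pool(processes=processes) as pool:
        it = pool.imap_unordered(slow_double, enumerate(values))
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break  # deadline already passed
            try:
                idx, result = it.next(timeout=remaining)
            except StopIteration:
                break  # every task finished before the deadline
            except multiprocessing.TimeoutError:
                break  # deadline reached while waiting for the next result
            results[idx] = result
        pool.terminate()  # stop the workers and drop the queued tasks
    return results


if __name__ == "__main__":
    print(run_until_deadline(range(1, 11), timeout=2.5))
```

With this variant the call returns shortly after the timeout even if no task finishes near the deadline, at the cost of a slightly more involved loop.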