I want to use multiprocessing.Pool, but it can't abort a task after a timeout. I found a solution and modified it slightly.
from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time

def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
        if time.time() - start >= y:
            break
        time.sleep(0.5)
        # show work progress
        print(y)
    return y

def collect_my_result(result):
    print("Got result {}".format(result))

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout seconds for func to complete.
        out = res.get(timeout)
    except TimeoutError:
        print("Aborting due to timeout {}".format(args[0]))
        # kill the worker process itself on TimeoutError
        sys.exit(1)
    else:
        # shut the inner thread pool down so idle threads don't pile up
        p.close()
        return out

def empty_func():
    pass

if __name__ == "__main__":
    TIMEOUT = 4
    util.log_to_stderr(util.DEBUG)
    pool = Pool(processes=4)
    # k - number of seconds each job sleeps
    featureClass = [(k,) for k in range(20, 0, -1)]  # list of arguments
    for f in featureClass:
        # block until a pool worker is available
        pool.apply(empty_func)
        # run the job with a timeout
        abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT)
        pool.apply_async(abortable_func, args=f, callback=collect_my_result)

    time.sleep(TIMEOUT)
    pool.terminate()
    print("exit")
The main modification is that the worker process exits with sys.exit(1). That kills the worker process together with its job thread, but I'm not sure this solution is good. What potential problems can I run into when a process terminates itself while a job is still running?
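For illustration, here is a minimal repro of what I observe when a worker kills itself (the suicidal function is just a hypothetical stand-in for abortable_worker hitting its timeout; this assumes CPython's Pool, which respawns dead workers but silently drops their in-flight results):

from multiprocessing import Pool
import sys
import time

def suicidal(x):
    # stand-in for abortable_worker reaching its timeout
    sys.exit(1)

if __name__ == "__main__":
    pool = Pool(2)
    res = pool.apply_async(suicidal, (1,))
    time.sleep(1)
    # The pool has already respawned a replacement worker, but the
    # dead worker's task result is lost: res never becomes ready,
    # res.get() would block forever, and no callback would fire.
    print(res.ready())  # False
    pool.terminate()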
There is no inherent risk in stopping a running job; the OS will take care of correctly terminating the process.
If your job is writing to files, you might end up with lots of truncated files on your disk.
Similar small issues might occur if you write to databases or hold connections to remote processes.
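If truncated output files are a concern, a common mitigation is to write to a temporary file and publish it with an atomic rename, so an aborted job leaves either the old file or no file. A minimal sketch (write_atomically is a hypothetical helper, assuming the temp file and the target live on the same filesystem):

import os
import tempfile

def write_atomically(path, data):
    # Write to a temp file in the target's directory, then rename.
    # os.replace is atomic within a single filesystem, so a killed
    # worker leaves either the old file or nothing, never a torn write.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise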
Nevertheless, Python's standard Pool does not support terminating a worker when its task times out, and terminating processes abruptly can lead to weird behaviour within your application.
The Pebble processing Pool does support timing-out tasks.
from pebble import ProcessPool
from concurrent.futures import TimeoutError

TIMEOUT_SECONDS = 5

def function(one, two):
    return one + two

with ProcessPool() as pool:
    future = pool.schedule(function, args=(1, 2), timeout=TIMEOUT_SECONDS)

    try:
        result = future.result()
    except TimeoutError:
        print("Future: %s took more than 5 seconds to complete" % future)