
How to use concurrent.futures with timeouts?


I am trying to get timeouts to work in Python 3.2 using the concurrent.futures module. However, when a timeout occurs, it doesn't actually stop the execution. I tried both the thread and process pool executors; neither stops the task, and the timeout is only raised once the task has finished. Does anyone know whether it is possible to get this working?

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
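            # as_completed() expects Future objects, so submit each task;
            # executor.map() yields results, not futures.
            futures = [executor.submit(run_loop, n) for n in max_numbers]
            for future in concurrent.futures.as_completed(futures, timeout=1):
                print(future.result())
        except concurrent.futures.TimeoutError:
            print("This took too long...")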

if __name__ == '__main__':
    main()

Solution

  • As far as I can tell, TimeoutError is actually raised when you would expect it, and not after the task is finished.

    However, your program itself will keep running until all running tasks have completed. This is because currently executing tasks (in your case, probably all of your submitted tasks, as your pool size equals the number of tasks) are not actually "killed".

    The TimeoutError is raised so that you can choose not to wait until the task is finished (and do something else instead), but the task will keep running until it completes. And Python will not exit as long as there are unfinished tasks in the threads/subprocesses of your Executor.
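
    To make that concrete, here is a minimal sketch (not the asker's code; slow_task and wait_briefly are hypothetical names, and time.sleep stands in for real work). It suggests that the TimeoutError arrives after roughly the timeout, while the future is still running and still delivers its result later:

```python
import concurrent.futures
import time


def slow_task(duration):
    # Hypothetical stand-in for a long computation.
    time.sleep(duration)
    return "finished"


def wait_briefly(duration=0.5, timeout=0.1):
    """Return (seconds until TimeoutError, still running afterwards, final result)."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_task, duration)
        started = time.monotonic()
        try:
            future.result(timeout=timeout)
            waited = None  # the task finished within the timeout
        except concurrent.futures.TimeoutError:
            waited = time.monotonic() - started
        still_running = not future.done()
        # The timeout did not stop the task: waiting again yields its result.
        final = future.result()
    return waited, still_running, final


if __name__ == "__main__":
    print(wait_briefly())
```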

    As far as I know, it is not possible to just "stop" currently executing Futures; you can only "cancel" scheduled tasks that have yet to be started. In your case, there won't be any, but imagine that you have a pool of 5 threads/processes and you want to process 100 items. At some point, there might be 20 completed tasks, 5 running tasks, and 75 scheduled tasks. In this case, you would be able to cancel those 75 scheduled tasks, but the 5 that are running will continue until completed, whether you wait for the result or not.
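
    A small sketch of that distinction, scaled down to 2 workers and 6 tasks (cancel_demo and its helper names are hypothetical, chosen only for illustration). Future.cancel() returns False for the tasks already running and True for those still waiting in the queue:

```python
import concurrent.futures
import threading


def cancel_demo(num_tasks=6, workers=2):
    """Submit blocking tasks, then try to cancel every one of them."""
    release = threading.Event()
    started = threading.Semaphore(0)

    def task():
        started.release()  # signal that a worker picked this task up
        release.wait()     # block until the demo lets the task finish
        return "done"

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(task) for _ in range(num_tasks)]
        # Wait until every worker is busy so the outcome does not depend on timing.
        for _ in range(workers):
            started.acquire()
        cancelled = [f.cancel() for f in futures]
        release.set()  # let the running tasks complete
    return cancelled


if __name__ == "__main__":
    print(cancel_demo())
```

    The cancelled tasks are skipped by the workers once the running ones finish, so the executor shuts down without ever starting them.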

    Even though it cannot be done that way, I guess there should be ways to achieve your desired end result. Maybe this version can help you on the way (not sure if it does exactly what you wanted, but it might be of some use):

    import concurrent.futures
    import time
    import datetime
    
    max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
    
    class Task:
        def __init__(self, max_number):
            self.max_number = max_number
            self.interrupt_requested = False
    
        def __call__(self):
            print("Started:", datetime.datetime.now(), self.max_number)
            last_number = 0
            for i in range(1, self.max_number + 1):  # range: xrange does not exist in Python 3
                if self.interrupt_requested:
                    print("Interrupted at", i)
                    break
                last_number = i * i
            print("Reached the end")
            return last_number
    
        def interrupt(self):
            self.interrupt_requested = True
    
    def main():
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
            tasks = [Task(num) for num in max_numbers]
            for task, future in [(i, executor.submit(i)) for i in tasks]:
                try:
                    print(future.result(timeout=1))
                except concurrent.futures.TimeoutError:
                    print("this took too long...")
                    task.interrupt()
    
    
    if __name__ == '__main__':
        main()
    

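    By creating a callable object for each "task", and giving those to the executor instead of just a plain function, you can provide a way to "interrupt" the task. Note that this relies on ThreadPoolExecutor: the threads share the Task object with the main thread, whereas a ProcessPoolExecutor would hand each worker a pickled copy, so setting the flag in the parent would have no effect there. Tip: remove the task.interrupt() line and see what happens; it may make it easier to understand my long explanation above ;-)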
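
    The same cooperative idea can also be sketched with a plain function and a threading.Event in place of the callable class. This is a variation on the code above, not a different mechanism; run_with_timeout and the returned tuples are hypothetical choices made for the example, and it still only works if the task checks the flag regularly:

```python
import concurrent.futures
import threading


def run_loop(max_number, stop_event):
    """Square numbers up to max_number, stopping early if stop_event is set."""
    last_number = 0
    for i in range(1, max_number + 1):
        if stop_event.is_set():
            return ("interrupted", i)
        last_number = i * i
    return ("completed", last_number)


def run_with_timeout(max_number, timeout):
    stop_event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(run_loop, max_number, stop_event)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            stop_event.set()        # ask the task to stop
            return future.result()  # returns once the loop notices the flag


if __name__ == "__main__":
    print(run_with_timeout(1000, timeout=5))
    print(run_with_timeout(10**9, timeout=0.05))
```

    Checking the event on every iteration keeps the example short; in a hot loop it may be cheaper to check only every few thousand iterations.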