Tags: python, multithreading, multiprocessing

How to run many threads/processes with a global timeout


I'm trying to run many Python threads (or separate processes) from the 'run' method of class GetDayMin. I want the threads or processes to run simultaneously; after 40 seconds the class instance should write its data (from each thread/proc) and exit. While I can start each thread without waiting on anything to complete, if I use the join method to wait on each thread in turn, the total wait could be very long, since successive threads may all be blocked. It seems the join method of both threading and multiprocessing will hang until its timeout expires.

For example, if in my class I start 5 threads and then wait 40 seconds on each in order of thread creation, the first thread could take 40 seconds to time out, then the second thread takes another 40 seconds to time out, and so on. We could end up waiting 200 seconds for 5 threads.

What I want is for no thread to take longer than 40 seconds, so that the whole class instance lasts a maximum of 40 seconds too. I'm willing to use multiprocessing instead of multithreading if that makes things easier. What I really anticipate is that most threads will complete within 10 seconds, but three or four may hang, and I don't want to wait for them. How can I accomplish this?

import multiprocessing
import pandas as pd
import random
import time

class GetDayMin:
    def __init__(self):
        self.results = pd.DataFrame()  # Shared DataFrame to store results

    def add_result(self, result):
        # DataFrame.append was removed in pandas 2.0; concat a one-row frame instead
        self.results = pd.concat([self.results, pd.DataFrame([result])], ignore_index=True)

    def process_function(self):
        sleep_time = random.randint(30, 50)  # Random sleep time between 30 to 50 seconds
        time.sleep(sleep_time)  # Pretend I'm calculating something
        
        # Return the time slept to store in the results to simulate thread communication
        return {'process_id': multiprocessing.current_process().pid, 'time_slept': sleep_time}

    def run(self):
        processes = []
        for _ in range(30):
            process = multiprocessing.Process(target=self.process_function)
            processes.append(process)

        # Start all processes
        for process in processes:
            process.start()

        # Wait for all processes to finish or timeout after 40 seconds (each--unfortunately)
        for process in processes:
            process.join(timeout=40)
            if process.is_alive():
                process.terminate()
                process.join()  # wait on process--want this to be a collective 40 seconds

Solution

  • It seems to me that you can compute an absolute expiration_time by which all submitted tasks should have completed; any that are still running after that time should be terminated. The timeout you specify on process.join() can then be computed from expiration_time and the current time:

    ...
        def run(self):
            processes = []
            for _ in range(30):
                process = multiprocessing.Process(target=self.process_function)
                processes.append(process)
    
            # Start all processes
            for process in processes:
                process.start()
    
            # Wait for all processes to finish or timeout after 40 seconds
            expiration_time = time.time() + 40
            time_expired = False
            for process in processes:
                if time_expired:
                    process.terminate()
                else:
                    wait_time = expiration_time - time.time()
                    if wait_time <= 0:
                        process.terminate()
                        time_expired = True
                    else:
                        process.join(timeout=wait_time)
                        if process.is_alive():
                            process.terminate()
                            time_expired = True
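
    Note that the question also mentions plain threads. The same absolute-deadline arithmetic works with threading, with one important difference: threads cannot be terminated, so the best you can do is stop waiting for stragglers. A minimal sketch (assuming it is acceptable to abandon hung threads, which should then be created with daemon=True so they die when the process exits):

    import threading
    import time

    def wait_with_deadline(threads, timeout=40):
        # Wait on all threads collectively, never longer than `timeout` seconds
        expiration_time = time.time() + timeout
        for thread in threads:
            wait_time = expiration_time - time.time()
            if wait_time <= 0:
                break  # deadline passed; stop waiting for the rest
            thread.join(timeout=wait_time)
        # Threads still alive here are simply abandoned; they cannot be killed
        return [t for t in threads if t.is_alive()]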
    

    But Consider This

    My answer above only addresses your timing issue, but I also wanted to mention that if your actual process_function calls the add_result method, the results will not be what you expect, because each process sees its own copy of the dataframe. Also, you have process_function returning a value, but you cannot return a value back to the caller from a Process instance that way. A short demonstration of the copy problem follows, and then a corrected version.
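
    Here is a minimal sketch (separate from your code) showing that a child process mutates only its own copy of an object, so updates made in the child never reach the parent:

    import multiprocessing

    def append_result(results):
        results.append(42)  # mutates the child process's copy only

    if __name__ == '__main__':
        results = []
        process = multiprocessing.Process(target=append_result, args=(results,))
        process.start()
        process.join()
        print(results)  # prints [] -- the parent's list is unchanged

    With that in mind, you might want the following, which shares results through a multiprocessing.Manager dictionary: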

    import multiprocessing
    import pandas as pd
    import random
    import time
    
    class GetDayMin:
        def __init__(self):
            self.results = pd.DataFrame()  # Shared DataFrame to store results
    
        def add_result(self, result):
            # DataFrame.append was removed in pandas 2.0; concat a one-row frame instead
            # (unused in this version -- the managed dict below replaces it)
            self.results = pd.concat([self.results, pd.DataFrame([result])], ignore_index=True)
    
        def process_function(self, results_dict):
            sleep_time = random.randint(30, 50)  # Random sleep time between 30 to 50 seconds
            time.sleep(sleep_time)  # Pretend I'm calculating something
    
            # Store the time slept in the shared dict to simulate inter-process communication
            results_dict[multiprocessing.current_process().pid] = sleep_time
    
        def run(self):
            with multiprocessing.Manager() as manager:
                results_dict = manager.dict()
                processes = [
                    multiprocessing.Process(target=self.process_function, args=(results_dict,))
                    for _ in range(30)
                ]
    
                # Start all processes
                for process in processes:
                    process.start()
    
                # Wait for all processes to finish or timeout after 40 seconds
                start_time = time.time()
                expiration_time = start_time + 40
                time_expired = False
                for process in processes:
                    if time_expired:
                        process.terminate()
                    else:
                        wait_time = expiration_time - time.time()
                        if wait_time <= 0:
                            process.terminate()
                            time_expired = True
                        else:
                            process.join(timeout=wait_time)
                            if process.is_alive():
                                process.terminate()
                                time_expired = True
    
                end_time = time.time()
                print(f'Total elapsed time: {end_time - start_time} seconds')
                for pid, sleep_time in results_dict.items():
                    print(f'Process {pid} slept for {sleep_time} seconds')
    
    if __name__ == '__main__':
        GetDayMin().run()
    

    Prints:

    Total elapsed time: 40.00496006011963 seconds
    Process 500 slept for 30 seconds
    Process 504 slept for 30 seconds
    Process 466 slept for 32 seconds
    Process 512 slept for 33 seconds
    Process 461 slept for 34 seconds
    Process 470 slept for 35 seconds
    Process 455 slept for 36 seconds
    Process 459 slept for 36 seconds
    Process 491 slept for 36 seconds
    Process 472 slept for 37 seconds
    Process 507 slept for 37 seconds
    Process 486 slept for 39 seconds
    Process 463 slept for 40 seconds
    Process 477 slept for 40 seconds
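
    A managed dictionary is convenient, but every access is proxied through the manager's server process. A plain multiprocessing.Queue is a lighter-weight way for workers to send results back to the parent; a minimal sketch (not a drop-in for the class above):

    import multiprocessing
    import os

    def worker(queue):
        queue.put((os.getpid(), 42))  # send this process's result to the parent

    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        processes = [
            multiprocessing.Process(target=worker, args=(queue,))
            for _ in range(5)
        ]
        for process in processes:
            process.start()
        # queue.get(timeout=...) could honor the same expiration_time arithmetic
        results = dict(queue.get() for _ in processes)
        for process in processes:
            process.join()
        print(results)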
    

    The same can be accomplished with a multiprocessing pool. Given your worker function process_function, if we have a pool of size N and submit N tasks to it, each pool process will handle a single task. The advantage of using a pool is that the worker function can directly return a result, so we don't need a managed dictionary.

    import multiprocessing
    import pandas as pd
    import random
    import time
    
    class GetDayMin:
        def process_function(self):
            sleep_time = random.randint(30, 50)  # Random sleep time between 30 to 50 seconds
            time.sleep(sleep_time)  # Pretend I'm calculating something
    
            # Return the time slept to store in the results to simulate thread communication
            return multiprocessing.current_process().pid, sleep_time
    
        def run(self):
            results = {}
    
            n_tasks = 30
            with multiprocessing.Pool(n_tasks) as pool:
                async_results = [
                    pool.apply_async(self.process_function)
                    for _ in range(n_tasks)
                ]
    
                # Wait for all processes to finish or timeout after 40 seconds
                start_time = time.time()
                expiration_time = start_time + 40
                for async_result in async_results:
                    wait_time = expiration_time - time.time()
                    if wait_time <= 0:
                        # Time has expired, so we cannot wait for this task
                        # to complete; just check whether it already finished:
                        if async_result.ready():
                            # The task has completed and so
                            # this should not block:
                            pid, sleep_time = async_result.get()
                            results[pid] = sleep_time
                    else:
                        # The time has not expired so we are willing to wait
                        # a certain amount of time for the result:
                        try:
                            pid, sleep_time = async_result.get(wait_time)
                        except multiprocessing.TimeoutError:
                            # We will not get a result from this task
                            pass
                        else:
                            results[pid] = sleep_time
            # Exiting the with-block calls pool.terminate(), which kills any still-running workers
    
            end_time = time.time()
            print(f'Total elapsed time: {end_time - start_time} seconds')
            for pid, sleep_time in results.items():
                print(f'Process {pid} slept for {sleep_time} seconds')
    
    if __name__ == '__main__':
        GetDayMin().run()
    

    Prints:

    Total elapsed time: 40.01216197013855 seconds
    Process 832 slept for 32 seconds
    Process 833 slept for 35 seconds
    Process 834 slept for 35 seconds
    Process 837 slept for 31 seconds
    Process 844 slept for 38 seconds
    Process 846 slept for 34 seconds
    Process 849 slept for 31 seconds
    Process 851 slept for 36 seconds
    Process 852 slept for 35 seconds
    Process 853 slept for 30 seconds
    Process 854 slept for 39 seconds
    Process 856 slept for 31 seconds
    Process 857 slept for 37 seconds
    Process 859 slept for 38 seconds
    Process 860 slept for 36 seconds
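
    Finally, the standard concurrent.futures API offers as_completed, which takes a single overall timeout covering all futures collectively. One caveat: a ProcessPoolExecutor cannot kill running workers the way Pool.terminate() does, so the interpreter may still wait for stragglers at exit. Treat this as a sketch of the collective-timeout idea (cancel_futures requires Python 3.9+) rather than a hard 40-second cap:

    import concurrent.futures
    import os
    import random
    import time

    def process_function():
        sleep_time = random.randint(30, 50)
        time.sleep(sleep_time)
        return os.getpid(), sleep_time

    if __name__ == '__main__':
        results = {}
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=30)
        futures = [executor.submit(process_function) for _ in range(30)]
        try:
            # as_completed raises TimeoutError once 40 seconds have elapsed
            # in total, no matter how many futures are still outstanding
            for future in concurrent.futures.as_completed(futures, timeout=40):
                pid, sleep_time = future.result()
                results[pid] = sleep_time
        except concurrent.futures.TimeoutError:
            pass  # deadline reached; stop collecting results
        # Drop queued tasks and return immediately; running workers are
        # not killed and may still delay interpreter shutdown
        executor.shutdown(wait=False, cancel_futures=True)
        print(f'Collected {len(results)} results within the deadline')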