
How to create a two-level hierarchical process pool in Python?


[Image: Task Topology]

I need to calculate a mathematical function f(x) for hundreds of thousands of values of x. Because of its exotic numerical properties, I need to use four numerical algorithms. My experiments show that the speeds of the four algorithms vary widely from point to point: the fastest algorithm at point A may be the slowest at point B. I therefore need to implement a hierarchical task pool like the one shown in the Task Topology image.

Or is it easier to work with multiple process pools?
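One common way to get the two levels without nesting pools (multiprocessing.Pool workers are daemonic, so they are not allowed to start child processes) is to flatten them into a single pool. You submit one task per (x, algorithm) pair, keep the first result that comes back for each x, and cancel that point's tasks that have not started yet. The sketch below assumes every algorithm returns an acceptable value of f(x). race_over_points, alg_a and alg_b are hypothetical names. Future.cancel() cannot stop a task that is already running, so slower algorithms that have started still finish; their results are simply ignored.

```python
from concurrent.futures import FIRST_COMPLETED, Executor, Future, ProcessPoolExecutor, wait
from typing import Callable, Dict, List, Sequence, Tuple


def race_over_points(
    xs: Sequence[float],
    algorithms: Sequence[Callable[[float], float]],
    executor: Executor,
) -> Dict[float, Tuple[str, float]]:
    """Map each x to (name of the first algorithm to succeed, its value).

    Points where every algorithm raised are left out of the result.
    """
    # Flatten both levels: one task per (x, algorithm) pair in a single pool.
    pending: Dict[Future, Tuple[float, str]] = {}
    by_point: Dict[float, List[Future]] = {x: [] for x in xs}
    for x in xs:
        for alg in algorithms:
            fut = executor.submit(alg, x)
            pending[fut] = (x, alg.__name__)
            by_point[x].append(fut)

    winners: Dict[float, Tuple[str, float]] = {}
    while pending:
        done, _ = wait(pending, return_when=FIRST_COMPLETED)
        for fut in done:
            x, name = pending.pop(fut)
            # Skip losers, cancelled tasks and algorithms that raised.
            if x in winners or fut.cancelled() or fut.exception() is not None:
                continue
            winners[x] = (name, fut.result())
            # Cancel this point's tasks that have not started; running ones cannot be stopped.
            for other in by_point[x]:
                other.cancel()
    return winners


# Hypothetical stand-ins for the four real algorithms (module level so they can be pickled).
def alg_a(x: float) -> float:
    return x * x


def alg_b(x: float) -> float:
    return x ** 2


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        print(race_over_points([0.5, 1.0, 2.0], [alg_a, alg_b], pool))
```

The executor is passed in, so the same function can be tried with a ThreadPoolExecutor first. For hundreds of thousands of points, submitting the x values in batches rather than all at once would probably keep the number of pending futures manageable.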


Solution

  • I would approach this as a multiprocessing problem, in three steps:

    1. Collect your algorithm functions (the example at the end keeps them in a list).
    2. Create a worker function (worker() in the code below) that runs one function in its own process and reports its result.
    3. Create a function that terminates the remaining processes once two of them have returned values (execute_functions() in the code below).

    My solution is below. It uses only three functions, but you can use as many as you like: put your algorithm code in the body of each function and you are all set.

    import multiprocessing
    import time
    from typing import Callable, List
    
    def worker(func: Callable[[int], int], x: int, result_queue: multiprocessing.Queue) -> None:
        # Run one algorithm and send (function name, result) back to the parent process.
        result: int = func(x)
        result_queue.put((func.__name__, result))
    
    def execute_functions(x: int, functions: List[Callable[[int], int]]) -> dict[str, int]:
        result_queue: multiprocessing.Queue = multiprocessing.Queue()
        processes: List[multiprocessing.Process] = []
    
        # Start one process per algorithm.
        for func in functions:
            process: multiprocessing.Process = multiprocessing.Process(target=worker, args=(func, x, result_queue))
            processes.append(process)
            process.start()
    
        # Wait for two processes to finish (or all of them, if fewer than two were started).
        needed: int = min(2, len(processes))
        finished_processes: List[multiprocessing.Process] = []
    
        while len(finished_processes) < needed:
            for process in processes:
                if process.is_alive():
                    process.join(timeout=0.1)
                elif process not in finished_processes:
                    finished_processes.append(process)
                    if len(finished_processes) == needed:
                        break
    
        # Terminate the slower algorithms, then reap every process.
        for process in processes:
            if process.is_alive():
                process.terminate()
            process.join()
    
        # Collect the results that the finished processes put on the queue.
        results: dict[str, int] = {}
        while not result_queue.empty():
            func_name, result = result_queue.get()  # annotated tuple unpacking is not valid syntax
            results[func_name] = result
    
        return results
    
    # Example functions (defined at module level so child processes can pickle them)
    def func1(x: int) -> int:
        time.sleep(5)  # sleep simulates a slow algorithm
        print("func1 has completed")  # only the two fastest functions reach this line
        return x * 2
    
    def func2(x: int) -> int:
        time.sleep(3)
        print("func2 has completed")
        return x + 5
    
    def func3(x: int) -> int:
        time.sleep(10)
        print("func3 has completed")  # never printed: func3 is terminated first
        return x - 3
    
    # The guard is required with the "spawn" start method (Windows, macOS),
    # where every child process re-imports this module.
    if __name__ == "__main__":
        x: int = 10
        functions: List[Callable[[int], int]] = [func1, func2, func3]
        print(execute_functions(x, functions))
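The polling loop in execute_functions() wakes up every 0.1 s and counts a worker that raised as "finished", so it can return fewer than two results. A variant that blocks on the queue instead might look like the sketch below. Every worker puts exactly one message (its result, or None if it raised), and the parent stops after the first k successful results and terminates the rest. The names run_and_report, race_first_k, fast_alg and slow_alg are hypothetical. The sketch assumes workers fail by raising a Python exception rather than being killed outright: a killed worker never reports, and get() would then block.

```python
import multiprocessing
import time
from typing import Callable, Dict, List


def run_and_report(func: Callable[[int], int], x: int, q: multiprocessing.Queue) -> None:
    # Always put exactly one message, even on failure, so the parent never waits forever.
    try:
        q.put((func.__name__, func(x)))
    except Exception:
        q.put((func.__name__, None))


def race_first_k(x: int, functions: List[Callable[[int], int]], k: int = 2) -> Dict[str, int]:
    q: multiprocessing.Queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=run_and_report, args=(f, x, q)) for f in functions]
    for p in procs:
        p.start()

    results: Dict[str, int] = {}
    reported = 0
    # Block on the queue instead of polling is_alive(): each get() returns the next finisher.
    while len(results) < k and reported < len(procs):
        name, value = q.get()
        reported += 1
        if value is not None:
            results[name] = value

    # Stop the slower algorithms and reap every process.
    for p in procs:
        if p.is_alive():
            p.terminate()
        p.join()
    return results


# Hypothetical placeholder algorithms for the demo.
def fast_alg(x: int) -> int:
    return x + 5


def slow_alg(x: int) -> int:
    time.sleep(3)  # simulates a slow algorithm
    return x * 2


if __name__ == "__main__":
    print(race_first_k(10, [slow_alg, fast_alg], k=1))
```

With k=1 the call returns as soon as the fastest algorithm for that x finishes, which is one way to read the question's "use whichever algorithm is fastest at this point" goal.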