I need to evaluate a mathematical function f(x) at hundreds of thousands of values of x. Because of its exotic numerical properties, I need to use four numerical algorithms. My experiments show that the relative speed of the four algorithms varies widely from point to point: the fastest algorithm at point A may be the slowest at point B. I therefore want to implement a hierarchical task pool like the one shown in the image.
Or is it easier to work with multiple process pools?
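I would approach this as a multiprocessing problem, in 3 steps:
1. Write each algorithm as its own function.
2. Create a worker (worker() in the code below) to run each function in its own process.
3. Create a function (execute_functions() in the code below) that starts the workers, waits for the fastest ones to finish, and terminates the rest.
My solution is attached. I only used 3 functions, but you can use as many as you'd like. Add your algorithm code to the body of each function and you are all set!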
import multiprocessing
import time
from typing import Callable, List


def worker(func: Callable[[int], int], x: int, result_queue: multiprocessing.Queue) -> None:
    # Run one algorithm and report its name and result to the parent process.
    result: int = func(x)
    result_queue.put((func.__name__, result))


def execute_functions(x: int, functions: List[Callable[[int], int]]) -> dict[str, int]:
    result_queue: multiprocessing.Queue = multiprocessing.Queue()
    processes: List[multiprocessing.Process] = []
    # Start one process per function.
    for func in functions:
        process: multiprocessing.Process = multiprocessing.Process(target=worker, args=(func, x, result_queue))
        processes.append(process)
        process.start()

    # Poll until the two fastest processes have finished
    # (or all of them, if fewer than two functions were given).
    needed: int = min(2, len(processes))
    finished_processes: List[multiprocessing.Process] = []
    while len(finished_processes) < needed:
        for process in processes:
            if process.is_alive():
                process.join(timeout=0.1)
            else:
                if process not in finished_processes:
                    finished_processes.append(process)
                if len(finished_processes) == needed:
                    break

    # Stop every process that is still running.
    for process in processes:
        if process.is_alive():
            process.terminate()

    # Collect whatever results made it into the queue.
    results: dict[str, int] = {}
    while not result_queue.empty():
        func_name, result = result_queue.get()
        results[func_name] = result
    return results


# Example functions
def func1(x: int) -> int:
    time.sleep(5)  # added sleep here to simulate complex algo
    print("func1 has completed")  # print statement to show that only 2 functions return
    return x * 2


def func2(x: int) -> int:
    time.sleep(3)
    print("func2 has completed")  # print statement to show that only 2 functions return
    return x + 5


def func3(x: int) -> int:
    time.sleep(10)
    print("func3 has completed")  # print statement to show that only 2 functions return
    return x - 3


# The guard is required on platforms that start child processes by re-importing this module.
if __name__ == "__main__":
    x: int = 10
    functions: List[Callable[[int], int]] = [func1, func2, func3]
    print(execute_functions(x, functions))
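If only the single fastest result is needed at each point, the polling loop can be replaced by a blocking read on the queue. The following is a minimal sketch of that variant, not a drop-in replacement: `first_result`, `_race_worker`, `slow_double`, `fast_double`, and the `timeout` parameter are hypothetical names introduced for illustration, and the two toy functions stand in for real algorithms.

```python
import multiprocessing
import time
from typing import Callable, List, Tuple


def slow_double(x: int) -> int:
    # Hypothetical stand-in for an algorithm that is slow at this point.
    time.sleep(2)
    return x * 2


def fast_double(x: int) -> int:
    # Hypothetical stand-in for an algorithm that is fast at this point.
    return x + x


def _race_worker(func: Callable[[int], int], x: int, result_queue) -> None:
    # Report which function finished and what it computed.
    result_queue.put((func.__name__, func(x)))


def first_result(
    x: int,
    functions: List[Callable[[int], int]],
    timeout: float = 30.0,
) -> Tuple[str, int]:
    """Run every function on x in its own process; return the first result."""
    result_queue = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=_race_worker, args=(func, x, result_queue))
        for func in functions
    ]
    for process in processes:
        process.start()
    try:
        # Blocks until any worker reports; raises queue.Empty after `timeout` seconds.
        return result_queue.get(timeout=timeout)
    finally:
        # The winner's result has already been read (or the wait timed out),
        # so the remaining workers can be stopped.
        for process in processes:
            if process.is_alive():
                process.terminate()
            process.join()


if __name__ == "__main__":
    print(first_result(21, [slow_double, fast_double]))
```

Starting a fresh set of processes for every x has a noticeable fixed cost, so for hundreds of thousands of points this only pays off when a single evaluation takes much longer than process startup.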