Tags: python, asynchronous, multiprocessing, subprocess

Outsourcing a Subtask to 2nd CPU and getting results when available


I would like to achieve the following. I have already researched this a lot, but I haven't been able to fully get my head around it, so I would appreciate any help:

Let's say I am in a while True loop in which I execute a bunch of tasks over and over. The exit point of the loop is not important for my problem. One task within that loop is more time-consuming than the rest and delays everything. However, I do not necessarily need the result of that task to continue executing the loop. I would like to take that specific subtask and push it to a 2nd CPU. In the main loop (on CPU 1), I would ask for the result of this subtask from CPU 2. If it is not there yet, I continue without it; if it is there, I grab it, use it, and push a new subtask to CPU 2. The computation of the subtask's result should, however, be performed on CPU 2.

while True:
    do stuff
    check for results on CPU 2
    if results there take them and push new subtask to CPU 2
    if results not there simply continue with other stuff

I read about multiprocessing, but I have the feeling that the examples always take the same (data-heavy) task and push it onto multiple CPUs. I also do that for, e.g., reinforcement learning. But I haven't seen this pattern of outsourcing a single subtask and occasionally asking for its result.

Any ideas? Thanks in advance!


Solution

  • This is one way of doing what you want: use a multiprocessing pool, to which you can submit long-running tasks that execute on another CPU while the main process continues. I have also added code that shows how to terminate gracefully if and when a condition arises that causes you to break out of your forever loop. Please read the comments carefully:

    from multiprocessing import Pool, cpu_count
    import time
    
    def long_running_task(some_argument):
        ... # process
        time.sleep(1) # simulate a long running task
        result = 'some result'
        print(result)
        return result
    
    def process_loop():
        # We create a pool whose size is the number of CPU
        # cores we have less 1 for the main process to use
        # (but at least 1, since Pool(0) raises ValueError).
        # If you want to only use one additional CPU, then specify Pool(1):
        pool = Pool(max(1, cpu_count() - 1))
        # Hold multiprocessing.pool.AsyncResult instances:
        unfinished_tasks = set()
    
        # Assume some condition exists that causes us to break out of our loop.
        # For this demo, we will just loop 4 times:
        for _ in range(4):
            ... #do stuff
            time.sleep(.5) # simulate short-running task
            # If we need to perform a long running task:
            unfinished_tasks.add(pool.apply_async(long_running_task, args=('some argument',)))
    
            # This will hold newly completed tasks:
            completed_tasks = set()
            # This will hold newly started tasks:
            new_tasks = set()
            # Test if any submitted tasks have completed
            for task in unfinished_tasks:
                if task.ready(): # Task has completed
                    # get return value from task:
                    return_value = task.get()
                    # Show completed:
                    completed_tasks.add(task)
                    # Use result to create another task.
                    new_tasks.add(pool.apply_async(long_running_task, args=(return_value,)))
            # Compute new unfinished_tasks:
            unfinished_tasks = (unfinished_tasks | new_tasks) - completed_tasks
    
        # Wait for unfinished tasks to complete:
        pool.close()
        pool.join()
    
    if __name__ == '__main__':
        process_loop()
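
    If you only ever want one subtask in flight on a single extra worker, which is closer to the loop in the question, the same polling idea can be sketched with concurrent.futures instead of multiprocessing.Pool. This is only an illustration under assumptions: BackgroundSlot, slow_square and main_loop are hypothetical names, and the sleeps stand in for real work. Future.done() is the non-blocking check, so the main loop never waits on the worker:

```python
import time
from concurrent.futures import Executor, Future, ProcessPoolExecutor
from typing import Any, Callable, List, Optional, Tuple


class BackgroundSlot:
    """Hypothetical helper: keeps at most one subtask in flight and lets the caller poll for it."""

    def __init__(self, executor: Executor, func: Callable[..., Any]) -> None:
        self._executor = executor
        self._func = func
        self._future: Optional[Future] = None

    def busy(self) -> bool:
        """True while a submitted subtask has not been collected yet."""
        return self._future is not None

    def submit(self, *args: Any) -> None:
        if self._future is not None:
            raise RuntimeError("a subtask is already in flight")
        self._future = self._executor.submit(self._func, *args)

    def poll(self) -> Tuple[bool, Any]:
        """Non-blocking: (True, result) if the subtask finished, else (False, None)."""
        if self._future is None or not self._future.done():
            return False, None
        future, self._future = self._future, None
        return True, future.result()  # re-raises an exception raised in the worker


def slow_square(x: int) -> int:
    time.sleep(0.2)  # stand-in for the expensive subtask
    return x * x


def main_loop(slot: BackgroundSlot, iterations: int, first_arg: int) -> List[int]:
    results = []
    arg = first_arg
    slot.submit(arg)  # push the first subtask to the worker
    for _ in range(iterations):
        time.sleep(0.05)  # "do stuff" in the main process
        finished, value = slot.poll()
        if finished:  # result is there: use it and push a new subtask
            results.append(value)
            arg += 1
            slot.submit(arg)
        # otherwise simply continue with other stuff
    return results


if __name__ == "__main__":
    # max_workers=1 means exactly one additional process does the heavy work.
    with ProcessPoolExecutor(max_workers=1) as executor:
        slot = BackgroundSlot(executor, slow_square)
        print(main_loop(slot, iterations=20, first_arg=2))
```

    Because BackgroundSlot only relies on submit() and done(), it can be handed a ThreadPoolExecutor instead when the subtask is I/O-bound rather than CPU-bound.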
    

    A second way, which you may find simpler, is to use a callback function that gets called whenever a previously submitted task completes. The callback runs in a helper thread of the main process, so it should return quickly:

    from multiprocessing import Pool, cpu_count
    import time
    
    def long_running_task(some_argument):
        ... # process
        time.sleep(1) # simulate a long running task
        return 'some result'
    
    def process_loop():
        stop_flag = False
    
        # Gets invoked when a submitted task completes:
        def my_callback(return_value):
            print(return_value)
            if not stop_flag:
                # Submit new task using return value from current completed task.
                pool.apply_async(long_running_task, args=(return_value,), callback=my_callback)
    
        # We create a pool whose size is the number of CPU
        # cores we have less 1 for the main process to use
        # (but at least 1, since Pool(0) raises ValueError).
        # If you want to only use one additional CPU, then specify Pool(1):
        pool = Pool(max(1, cpu_count() - 1))
        
        # Instead of a loop "forever", we have some condition that causes
        # us to break out of the loop. For this demo we just loop 4 times:
        for _ in range(4):
            ... #do stuff
            time.sleep(.5) # simulate a short-running task
            # If we need to perform a long running task:
            pool.apply_async(long_running_task, args=('some argument',), callback=my_callback)
        # Prevent new tasks being submitted by our callback:
        stop_flag = True
        # Wait for outstanding tasks to complete:
        pool.close()
        pool.join()
    
    if __name__ == '__main__':
        process_loop()
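
    One thing to watch in the callback version is that my_callback runs in a helper thread of the main process, and a task that raises an exception never reaches it at all. The sketch below is one possible way to handle both points, not the only one: it adds an error_callback and guards the stop flag with a lock so that nothing is submitted once stop() has returned. Resubmitter is a hypothetical helper name, and the ValueError in long_running_task is artificial, there only to exercise the error path:

```python
import threading
import time
from multiprocessing import Pool


def long_running_task(n):
    time.sleep(0.1)  # simulate a long running task
    if n >= 3:
        # Artificial failure, only here to show the error path.
        raise ValueError(f"cannot process {n}")
    return n + 1


class Resubmitter:
    """Hypothetical helper: chains tasks through callbacks until stopped or failed."""

    def __init__(self, pool):
        self._pool = pool
        # Callbacks run in a pool helper thread, so shared state needs a lock.
        self._lock = threading.Lock()
        self._stopped = False
        self.results = []
        self.errors = []

    def submit(self, arg):
        """Submit a task unless stop() was called; returns whether it was submitted."""
        with self._lock:
            if self._stopped:
                return False
            self._pool.apply_async(
                long_running_task,
                args=(arg,),
                callback=self._on_result,
                error_callback=self._on_error,
            )
            return True

    def _on_result(self, value):
        # Called only when the task returned normally.
        self.results.append(value)
        self.submit(value)  # use the result to create the next task

    def _on_error(self, exc):
        # Called instead of _on_result when the task raised.
        self.errors.append(exc)

    def stop(self):
        # After this returns, submit() will not hand anything to the pool.
        with self._lock:
            self._stopped = True


if __name__ == "__main__":
    pool = Pool(1)
    chain = Resubmitter(pool)
    chain.submit(0)
    time.sleep(1)  # the main loop would do its other work here
    chain.stop()   # prevent the callback from submitting new tasks
    pool.close()
    pool.join()    # wait for outstanding tasks to complete
    print(chain.results, chain.errors)
```

    Taking the lock in both submit() and stop() closes the small window in which a callback has already seen the flag as unset but has not yet called apply_async when the main thread moves on to pool.close().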