I would like to achieve the following. Obviously, I have already researched a lot, but I wasn't really able to get my head around it fully, so I would appreciate any help.
Let's say I am in a while True "do stuff" loop in which I execute a bunch of tasks. The exit point of the loop is not important for my problem. One task within that loop is more time-consuming than the rest and hence delays everything, but I do not necessarily need its result to keep the loop going. I would like to take that specific subtask and push it to a second CPU. In the main loop (on CPU 1) I would ask for the result of this subtask from CPU 2. If it is not there yet, I continue without it; if it is there, I grab it, use it, and push a new subtask to CPU 2. The computation of this subtask's result should, however, be performed on CPU 2.
while True:
    do stuff
    check for results from CPU 2
    if results are there, take them and push a new subtask to CPU 2
    if results are not there, simply continue with other stuff
I have read about multiprocessing, but the examples I found always seem to push the same (data-heavy) task onto multiple CPUs, which I also do for e.g. Reinforcement Learning. I haven't seen this outsourcing of a single subtask with occasional asking for its result.
Any ideas? Thanks in advance!
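To make the pattern concrete, here is a rough sketch of what I have in mind, written with `concurrent.futures` (the function names and numbers are just placeholders, not real code of mine):

```python
import time
from concurrent.futures import ProcessPoolExecutor

def slow_subtask(x):
    # Placeholder for the time-consuming subtask.
    time.sleep(1)
    return x * 2

def main():
    with ProcessPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_subtask, 1)        # push subtask to the 2nd CPU
        for _ in range(5):                           # stands in for "while True"
            time.sleep(0.3)                          # do other stuff
            if future.done():                        # poll without blocking
                result = future.result()             # grab the result...
                future = pool.submit(slow_subtask, result)  # ...and push a new subtask

if __name__ == '__main__':
    main()
```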
This is one way of doing what you want: use a multiprocessing pool to which you can submit long-running tasks that execute on another CPU while the main process continues execution. I have also added code that shows how to terminate gracefully if and when a condition arises that causes you to break out of your forever loop. Please read the comments carefully:
from multiprocessing import Pool, cpu_count
import time

def long_running_task(some_argument):
    ... # process
    time.sleep(1) # simulate a long-running task
    result = 'some result'
    print(result)
    return result

def process_loop():
    # We create a pool whose size is the number of CPU
    # cores we have less 1 for the main process to use.
    # If you only want to use one additional CPU, then specify Pool(1):
    pool = Pool(cpu_count() - 1)
    # Holds multiprocessing.pool.AsyncResult instances:
    unfinished_tasks = set()
    # Assume some condition exists that causes us to break out of our loop.
    # For this demo, we will just loop 4 times:
    for _ in range(4):
        ... # do stuff
        time.sleep(.5) # simulate a short-running task
        # If we need to perform a long-running task:
        unfinished_tasks.add(pool.apply_async(long_running_task, args=('some argument',)))
        # This will hold newly completed tasks:
        completed_tasks = set()
        # This will hold newly started tasks:
        new_tasks = set()
        # Test whether any submitted tasks have completed:
        for task in unfinished_tasks:
            if task.ready(): # Task has completed
                # Get the return value from the task:
                return_value = task.get()
                # Mark it completed:
                completed_tasks.add(task)
                # Use the result to create another task:
                new_tasks.add(pool.apply_async(long_running_task, args=(return_value,)))
        # Compute the new unfinished_tasks:
        unfinished_tasks = (unfinished_tasks | new_tasks) - completed_tasks
    # Wait for unfinished tasks to complete:
    pool.close()
    pool.join()

if __name__ == '__main__':
    process_loop()
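As a side note, instead of polling AsyncResult.ready() on every loop iteration, you can also call AsyncResult.get() with a short timeout, which waits at most that long before raising multiprocessing.TimeoutError. A small sketch (the function work and the numbers are just for illustration):

```python
from multiprocessing import Pool, TimeoutError
import time

def work(x):
    time.sleep(1)  # simulate a long-running task
    return x + 1

if __name__ == '__main__':
    with Pool(1) as pool:
        task = pool.apply_async(work, args=(41,))
        try:
            # Wait at most 0.1 seconds for the result:
            print(task.get(timeout=0.1))
        except TimeoutError:
            # The task needs about 1 second, so we land here
            # and could go on doing other stuff:
            print('not ready yet')
        # A later get() with no timeout blocks until the result arrives:
        print(task.get())
```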
A second way, which you may find simpler, is to use a callback function that gets called whenever a previously submitted task completes.
from multiprocessing import Pool, cpu_count
import time

def long_running_task(some_argument):
    ... # process
    time.sleep(1) # simulate a long-running task
    return 'some result'

def process_loop():
    stop_flag = False

    # Gets invoked whenever a submitted task completes:
    def my_callback(return_value):
        print(return_value)
        if not stop_flag:
            # Submit a new task using the return value from the completed task:
            pool.apply_async(long_running_task, args=(return_value,), callback=my_callback)

    # We create a pool whose size is the number of CPU
    # cores we have less 1 for the main process to use.
    # If you only want to use one additional CPU, then specify Pool(1):
    pool = Pool(cpu_count() - 1)
    # Instead of a loop "forever", we have some condition that causes
    # us to break out of the loop. For this demo we just loop 4 times:
    for _ in range(4):
        ... # do stuff
        time.sleep(.5) # simulate a short-running task
        # If we need to perform a long-running task:
        pool.apply_async(long_running_task, args=('some argument',), callback=my_callback)
    # Prevent new tasks from being submitted by our callback:
    stop_flag = True
    # Wait for outstanding tasks to complete:
    pool.close()
    pool.join()

if __name__ == '__main__':
    process_loop()
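One caveat with the callback approach: the callback runs in a helper thread of the main process, and if the worker function raises an exception, the regular callback is never invoked. Supplying an error_callback as well keeps such failures from passing silently. A minimal sketch (flaky_task and the handler names are made up for this example):

```python
from multiprocessing import Pool

def flaky_task(x):
    if x < 0:
        raise ValueError('negative input')
    return x * 2

def on_done(result):
    # Runs in a thread of the main process when the task succeeds:
    print('got', result)

def on_error(exc):
    # Runs in a thread of the main process when the worker raised:
    print('task failed:', exc)

if __name__ == '__main__':
    with Pool(1) as pool:
        pool.apply_async(flaky_task, args=(21,), callback=on_done, error_callback=on_error)
        pool.apply_async(flaky_task, args=(-1,), callback=on_done, error_callback=on_error)
        pool.close()
        pool.join()
```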