I'm setting up a function that asynchronously starts a new process to run a very CPU-heavy function. Most of the documentation doesn't cover this thoroughly, and what I've pieced together doesn't seem to run asynchronously.
I have a function procManager which takes a function, the arguments to pass to that function, and an object name for basic logging.
import os
import time
from multiprocessing import Process

async def procManager(f, a, o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p_parent = os.getppid()  # parent process
    p_curr = os.getpid()  # current process
    print("parent process:", p_parent)
    print("current process:", p_curr)
    p.start()
    p.join()
    print(f"{o} finished at {time.strftime('%X')}")
    print("=========")
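Because procManager calls the blocking p.join() directly inside an async def, the event loop cannot switch to any other task while it waits. A minimal sketch of that effect, assuming time.sleep() as a stand-in for join() and asyncio.sleep() as a stand-in for a call that properly yields to the loop (the worker names are hypothetical):

```python
import asyncio
import time


async def blocking_worker(seconds: float) -> None:
    # time.sleep() stands in for p.join(): it blocks the whole thread,
    # so the event loop cannot switch to another task in the meantime.
    time.sleep(seconds)


async def yielding_worker(seconds: float) -> None:
    # asyncio.sleep() suspends only this coroutine and hands control
    # back to the event loop, so the other task can run meanwhile.
    await asyncio.sleep(seconds)


async def timed(worker) -> float:
    """Run two copies of `worker` concurrently and return the wall time."""
    start = time.perf_counter()
    await asyncio.gather(worker(0.2), worker(0.2))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_worker))  # roughly 0.4 s: the calls run back to back
yielding_elapsed = asyncio.run(timed(yielding_worker))  # roughly 0.2 s: the waits overlap
print(f"blocking: {blocking_elapsed:.2f}s, yielding: {yielding_elapsed:.2f}s")
```

In a notebook, where an event loop is already running, replace asyncio.run(...) with await timed(...).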
I have this CPU-heavy function that runs Louvain community detection on a NetworkX graph, which I pass into procManager to spawn in a new process.
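from timeit import default_timer as timer  # assumed source of timer()

import community as c  # python-louvain package; assumed source of c.best_partition

def community(cg):
    start = timer()
    partition = c.best_partition(cg)  # default Louvain community detection
    v = {}  # create dict to group nodes by community
    for key, value in sorted(partition.items()):
        v.setdefault(value, []).append(key)
    stop = timer()
    print(stop - start)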
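The grouping step inverts the {node: community_id} dict that best_partition() returns into {community_id: [nodes]}. Note that community() builds v but never returns it, so the grouping never leaves the child process. A sketch of the same step as a standalone function that returns its result, using collections.defaultdict in place of setdefault (equivalent behaviour); group_by_community and sample_partition are hypothetical names:

```python
from collections import defaultdict


def group_by_community(partition: dict) -> dict:
    """Invert a {node: community_id} mapping into {community_id: [nodes]}."""
    groups = defaultdict(list)
    # Iterating over the sorted items keeps each community's node list in
    # ascending node order, like the sorted(partition.items()) loop above.
    for node, community_id in sorted(partition.items()):
        groups[community_id].append(node)
    return dict(groups)


# Hypothetical partition, shaped like the dict best_partition() returns.
sample_partition = {0: 1, 1: 0, 2: 1, 3: 0, 4: 2}
communities = group_by_community(sample_partition)
print(communities)  # {1: [0, 2], 0: [1, 3], 2: [4]}
```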
The main function looks like this. I'm initializing two graphs, A and B, with 3000 and 1000 nodes respectively, where each new node attaches with 5 edges (m=5 in barabasi_albert_graph, which gives an average degree of roughly 10). I'm running this in a Jupyter notebook, so I use await main() instead of asyncio.run.
import asyncio

import networkx as nx

A = nx.barabasi_albert_graph(3000, 5)
B = nx.barabasi_albert_graph(1000, 5)

async def main():
    task1 = asyncio.create_task(
        procManager(community, A, "A"))
    task2 = asyncio.create_task(
        procManager(community, B, "B"))
    print("async start")

await main()
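main() creates the two tasks but never awaits them. In Jupyter the notebook's running loop still executes them after main() returns, whereas asyncio.run() cancels any still-pending tasks when main() returns. A sketch that keeps main() suspended until both tasks finish, using asyncio.gather; fake_job is a hypothetical non-blocking stand-in for procManager:

```python
import asyncio
import time


async def fake_job(name: str, seconds: float) -> str:
    # Hypothetical stand-in for real work that yields to the event loop.
    print(f"{name} started at {time.strftime('%X')}")
    await asyncio.sleep(seconds)
    print(f"{name} finished at {time.strftime('%X')}")
    return name


async def main() -> list:
    task1 = asyncio.create_task(fake_job("A", 0.2))
    task2 = asyncio.create_task(fake_job("B", 0.1))
    print("async start")
    # gather() keeps main() suspended until both tasks have completed and
    # returns their results in the order the awaitables were passed in.
    return await asyncio.gather(task1, task2)


results = asyncio.run(main())  # in Jupyter: results = await main()
print(results)
```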
What I'm trying to do is get A and B processed concurrently (i.e. started at the same time), each in its own process. The current output looks like the following: A and B are each processed in a new process, but they block each other, so B only starts once A has finished. I need to compute the communities for A and B asynchronously because the computations will be triggered by a RabbitMQ stream and the responses need to be non-blocking.
async done
A started at 06:03:48
parent process: 5783
current process: 12121
11.424800566000158
A finished at 06:03:59
=========
B started at 06:03:59
parent process: 5783
current process: 12121
0.037437027999885686
B finished at 06:03:59
=========
Hope you guys can help!
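In your case the problem is the join() method: it blocks until the process has finished. Because procManager calls it directly inside a coroutine, it blocks the whole event loop, so task B cannot even start until A's process has exited. Also, you wouldn't even need asyncio for this. Have a look at this quick example: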
import time
from multiprocessing import Process

def procManager(f, a, o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p.start()
    # p.join()  # no join: return right away and let the child run on its own
    print(f"{o} finished at {time.strftime('%X')}")  # printed immediately, while the child is still running
    print("=========")

def community(cg):
    for i in range(10):
        print("%s - %s" % (cg, i))
        time.sleep(1)

if __name__ == "__main__":  # required where processes are spawned (e.g. Windows, macOS)
    procManager(community, "This is A", "A")
    procManager(community, "This is B", "B")
This should give you an idea on how to solve your problem. I hope it helps!
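If you do want to stay inside asyncio (for example, so a RabbitMQ consumer coroutine can await the result), one option, sketched here and not taken from the answer above, is to hand the CPU-bound call to a concurrent.futures.ProcessPoolExecutor through loop.run_in_executor() and await the resulting future instead of calling join(). cpu_heavy, run_in_process, and the input sizes are hypothetical stand-ins; with the real code you would pass community instead, and it would need to return v, since results only come back to the parent through the return value.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def cpu_heavy(n: int) -> int:
    # Hypothetical CPU-bound stand-in for community(); it must be a
    # module-level function so it can be pickled and sent to a worker.
    return sum(i * i for i in range(n))


async def run_in_process(pool, f, arg, name):
    loop = asyncio.get_running_loop()
    print(f"{name} started at {time.strftime('%X')}")
    # run_in_executor() returns an awaitable; awaiting it suspends only this
    # coroutine, so the event loop stays free while the worker computes.
    result = await loop.run_in_executor(pool, f, arg)
    print(f"{name} finished at {time.strftime('%X')}")
    return result


async def main():
    with ProcessPoolExecutor(max_workers=2) as pool:
        return await asyncio.gather(
            run_in_process(pool, cpu_heavy, 3_000_000, "A"),
            run_in_process(pool, cpu_heavy, 1_000_000, "B"),
        )


if __name__ == "__main__":
    results = asyncio.run(main())
    print(results)
```

Both "started" lines should print before either "finished" line. The pool reuses its worker processes, so each incoming message does not pay a process start-up cost, and max_workers caps how many computations run at once. On platforms that use the spawn start method (Windows, and macOS by default), the worker function generally has to live in an importable .py module rather than a notebook cell.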