Tags: python, asynchronous, multiprocessing, python-multiprocessing, python-asyncio

Using Asyncio to create new Python Processes


I'm setting up a function that asynchronously starts a new process to run a very CPU-heavy function. Most of the documentation doesn't cover this thoroughly, and what I've pieced together doesn't seem to run asynchronously.

I have a function procManager which takes in a function, the args to pass into the function, and an object name for basic logging.

import asyncio
import os
import time
from multiprocessing import Process

async def procManager(f,a,o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p_parent = os.getppid()   # parent process
    p_curr = os.getpid()     # current process
    print("parent process:", p_parent)
    print("current process:", p_curr)
    p.start()
    p.join()
    print(f"{o} finished at {time.strftime('%X')}")
    print("=========")

I have this CPU-heavy function that runs Louvain community detection on a NetworkX graph, which I pass into procManager to spawn on a new process.

import community as c  # python-louvain package
from timeit import default_timer as timer

def community(cg):
    start = timer()
    partition = c.best_partition(cg) #default louvain community detection
    v = {} #create dict to group nodes by community
    for key, value in sorted(partition.items()):
        v.setdefault(value, []).append(key)
    stop = timer()
    print(stop-start)

The main function looks like this. I'm initializing 2 graphs A and B of 3000 and 1000 nodes respectively, with an average degree of 5. I'm using a Jupyter notebook to run this, so I use await main() instead of asyncio.run().

import networkx as nx

A = nx.barabasi_albert_graph(3000,5)
B = nx.barabasi_albert_graph(1000,5)  

async def main():
    task1 = asyncio.create_task(
        procManager(community, A, "A"))

    task2 = asyncio.create_task(
        procManager(community, B, "B"))

    print("async start")

await main()

What I'm trying to do is get A and B processed concurrently (i.e. started at the same time) but on different processes. The current output looks like this: A and B each run on a new process, but they block each other. I'll need to compute the A and B communities asynchronously because they'll be triggered by a RabbitMQ stream and responses need to be non-blocking.

async done
A started at 06:03:48
parent process: 5783
current process: 12121
11.424800566000158
A finished at 06:03:59
=========
B started at 06:03:59
parent process: 5783
current process: 12121
0.037437027999885686
B finished at 06:03:59
=========

Hope you guys can help!


Solution

  • In your case the problem is the join() method: it blocks until the process has finished. You wouldn't even need asyncio for that. Have a look at this quick example:

    import time
    from multiprocessing import Process
    
    def procManager(f,a,o):
        print(f"{o} started at {time.strftime('%X')}")
        p = Process(target=f, args=(a,))
        p.start()
        # p.join()
        print(f"{o} finished at {time.strftime('%X')}") # This will occur immediately
        print("=========")
    
    def community(cg):
        for i in range(10):
            print("%s - %s" %(cg, i))
            time.sleep(1)
    
    procManager(community, "This is A", "A")
    procManager(community, "This is B", "B")
    

    This should give you an idea of how to solve your problem. I hope it helps!
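A minimal sketch (not the answer's code) of how the original procManager could be kept awaitable without blocking: on Python 3.9+, the blocking p.join() can be handed to a worker thread with asyncio.to_thread, so the event loop keeps running other tasks while each child process works. The work function below is a hypothetical stand-in for the CPU-heavy community().

```python
import asyncio
import time
from multiprocessing import Process

def work(n):
    # stand-in for the CPU-heavy community() function
    total = 0
    for i in range(n):
        total += i * i
    print(f"work({n}) done")

async def proc_manager(f, a, o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p.start()
    # join() still waits for the child, but in a worker thread,
    # so the event loop is free to run the other task meanwhile
    await asyncio.to_thread(p.join)
    print(f"{o} finished at {time.strftime('%X')}")

async def main():
    # both child processes start at (nearly) the same time
    await asyncio.gather(
        proc_manager(work, 2_000_000, "A"),
        proc_manager(work, 1_000_000, "B"),
    )

if __name__ == "__main__":
    asyncio.run(main())
```

Unlike the answer's example, this version still waits for the children, so each "finished" line is printed at the true completion time of its process.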
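For the RabbitMQ-driven use case in the question, where each result needs to be awaitable, a common pattern is to submit the work to a ProcessPoolExecutor through the event loop's run_in_executor and await the resulting futures. A sketch under that assumption, with cpu_heavy as a hypothetical stand-in for community():

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    # stand-in for community(); must be a top-level function so it pickles
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # both jobs are submitted immediately to separate worker processes
        fut_a = loop.run_in_executor(pool, cpu_heavy, 3_000_000)
        fut_b = loop.run_in_executor(pool, cpu_heavy, 1_000_000)
        # gather awaits both without blocking the loop
        result_a, result_b = await asyncio.gather(fut_a, fut_b)
        print(result_a, result_b)

if __name__ == "__main__":
    asyncio.run(main())
```

Because the loop stays free while the pool does the CPU work, new incoming messages can be handled (and new jobs submitted) while earlier computations are still running.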