
How do I call an async function in a new process using multiprocessing?


I created a server that waits for a webhook signal. When a signal arrives, it starts a new process that runs the loop() function. Inside loop(), I want to call printmessage() asynchronously, so that the loop moves on to the next line without waiting for printmessage() to finish. However, I get the errors shown below. How do I resolve them?

#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio


async def printmessage(fruit):
    print(fruit)
    time.sleep(5) 
    
async def loop(fruit):
    while True:     
        task = asyncio.create_task(printmessage(fruit))          
        time.sleep(1)

fruit="apple"
if __name__ == '__main__':

    print("PROGRAM LAUNCH...")
    print("WEBHOOK RECEIVE READY...")   

 
app = FastAPI()    
@app.post("/webhook")
async def webhook(request : Request):       

    print("WEBHOOK RECEIVED")    
    p = mp.Process(target=loop,args=[fruit])
    p.start() 
    print('done')   
    return 'WEBHOOK RECEIVED' 

The intended output is apple printed once every second.

ERRORS:

RuntimeWarning: coroutine 'loop' was never awaited
  self._target(*self._args, **self._kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I tried the following to avoid the errors, but now there is no output at all:

#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio


async def printmessage(fruit):
    print(fruit)
    time.sleep(5) 
    
async def loop(fruit):
    while True:     
        task = asyncio.create_task(printmessage(fruit))          
        time.sleep(1)

def preloop(fruit):
    asyncio.run(loop(fruit))

fruit="apple"
if __name__ == '__main__':

    print("PROGRAM LAUNCH...")
    print("WEBHOOK RECEIVE READY...")  

 
app = FastAPI()    
@app.post("/webhook")
async def webhook(request : Request):       

    print("WEBHOOK RECEIVED")    
    p = mp.Process(target=preloop,args=[fruit])
    p.start() 
    print('done')   
    return 'WEBHOOK RECEIVED' 

Solution

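  • A Process target must be a regular function. Passing the async function loop directly only creates a coroutine object that nothing awaits, which is what the RuntimeWarning reports. Wrapping it in asyncio.run() inside a regular function fixes that, but the second attempt still prints nothing because time.sleep() blocks the event loop: loop() never awaits anything, so the tasks created with asyncio.create_task() never get a chance to run. Use await asyncio.sleep() instead. In the code below, each request to /webhook creates a new process, which prints apple every 5 seconds (change the argument of asyncio.sleep() to 1 to print every second).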

    from __future__ import annotations
    import asyncio
    from multiprocessing import Process
    from fastapi import FastAPI
    
    app = FastAPI()
    
    process_pool: list[Process] = []
    
    
    async def print_message(fruit):
        print(fruit)
    
    
    async def loop(fruit):
        while True:
            await print_message(fruit)
            await asyncio.sleep(5)
    
    
    def run_loop(fruit):
        asyncio.run(loop(fruit))
    
    
    @app.get("/webhook")
    async def webhook():
        print("WEBHOOK RECEIVED")
        fruit = "apple"
        process = Process(target=run_loop, args=(fruit,))
        process_pool.append(process)
        process.start()
        print('done')
        return 'WEBHOOK RECEIVED'
    
    
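    @app.on_event("shutdown")
    async def shutdown_event():
        # Stop every worker process first.
        for process in process_pool:
            process.kill()
        # join() waits for each process to exit without a busy-wait loop;
        # close() then releases its resources.
        for process in process_pool:
            process.join()
            process.close()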
    
    
    if __name__ == '__main__':
        print("PROGRAM LAUNCH...")
        print("WEBHOOK RECEIVE READY...")
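
  • If the loop should start a new print_message() call every second without waiting for the previous one to finish, as the question describes, one possible variant is sketched below. It is a minimal sketch, not a definitive implementation: the interval, work_time and iterations parameters and the start_worker() helper are hypothetical names added for illustration, and the 5-second asyncio.sleep() only stands in for whatever slow work print_message() really does. FastAPI is left out so the block runs on its own.

```python
import asyncio
import multiprocessing as mp


async def print_message(fruit, work_time):
    print(fruit, flush=True)
    # Stand-in for slow work. asyncio.sleep() yields to the event loop,
    # whereas time.sleep() would block every other task in this process.
    await asyncio.sleep(work_time)


async def loop(fruit, interval=1.0, work_time=5.0, iterations=None):
    # iterations=None means "run forever"; a number is only useful for testing.
    pending = set()  # keep references so running tasks are not garbage collected
    started = 0
    while iterations is None or started < iterations:
        task = asyncio.create_task(print_message(fruit, work_time))
        pending.add(task)
        task.add_done_callback(pending.discard)
        started += 1
        # Awaiting here hands control back to the event loop,
        # which is what lets the tasks created above actually run.
        await asyncio.sleep(interval)
    # Only reached when iterations is set: let outstanding tasks finish.
    if pending:
        await asyncio.gather(*pending)
    return started


def run_loop(fruit):
    # Process targets must be regular functions, so the event loop starts here.
    asyncio.run(loop(fruit))


def start_worker(fruit):
    # Hypothetical helper: call this from the webhook handler.
    process = mp.Process(target=run_loop, args=(fruit,), daemon=True)
    process.start()
    return process
```

    Because each print_message() call runs as its own task, the 5 seconds of simulated work overlap, and a new apple line can appear roughly once per interval. In the webhook handler, start_worker(fruit) would take the place of creating and starting the Process inline.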