Search code examples
pythonpython-asyncioconcurrent.futuresprocess-pool

how to manage ProcessPoolExecutor with run_in_executor


I have a FastAPI program that receives like 30-100 req/sec request content are from 1Kb-1Mb in size and I'm running a regex matching on the content, the regex is whitespace tolerant that is I can split the content into multiple chunks and run the regex matching on each chunk in a different process, but as the code is running asynchronously, I need a way to keep the API running while the processes execute. so I tried using ProcessPoolExecutor with run_in_executor from asyncio
here is the MVP code of what I did:

import asyncio
import concurrent.futures
import functools


async def executor_task(fn,executor=None ):
  event_loop = asyncio.get_event_loop()
  return await event_loop.run_in_executor(executor, fn)


def split_on_whitespace(content:str, count):
  if not content: return ['' for i in range(count)]

  length = len(content)
  part = int(length / count )
  last_beg = 0
  last_end = part
  splitted = []
  for beg,end in zip(range(part, length, part), range(part*2, length, part)):
    new_content = content[beg:end]
    last_end = re.search("[\"'\s]", new_content, )

    if not last_end :
      splitted = splitted + ['' for i in range(count - len(splitted))]
      break 

    last_end = last_end.end() + beg
    content = content[last_beg: last_end]
    splitted.append(content)
    last_beg = last_end
    last_end = end

  return splitted

def run_regex_on_content_chunk(content):
    domains = []
    domain_patt = re.compile(r'([a-zA-Z0-9\-_]+\.){1,}[a-zA-Z0-9\-_]+') # extract domain name
    for df in re.findall(domain_patt, content):
      domains.append(content[df.start(): df.end()])

    return domains


@app.post("/addContent")
async def add_content(content:dict):

  all_content = content['data']
  nworkers = 6
  content_chunks = split_on_white_space(all_content)                  #split content
  async_tasks = []
  with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
    for chunk in content_chunks:
      regex_fn = functools.partial(run_regex_on_content_chunk, chunk) # make the function with args
      async_tasks.append(executor_task(regex_fn, executor))           # add to gather later 

    await asyncio.gather(*async_tasks)                                # gather 

running this code creates the processes but the API hangs altogether, when checking the processes they seem to be idle, but still the API is hangs and is not usable at all and the execution seems to never leave the with statement.

PS:the split_on_white_space runs function and run_regex_on_content_chunk don't contain loops or any blocking code of any kind


Solution

  • The problem is there is confusing the lifetime/costs of creating your resources.

    An external proccess is a "huge" thing when compared with, say, calling a function - and by calling ProcessPoolExecutor inside your view code you are creating several processes inside each view.

    So your view code expands from what would be a few milliseconds to execute to maybe several seconds. The whole idea of having a "process pool" is to have workers in several external processes pre-spawned and ready to process your data. Your code just turns it on its head, and adds a lot of boiler-plate heavy-lifting to the processing of each view, without adding anything to the processing of the data itself.

    The thing to do there is to have a single ProcessPoolExecutor instance for the lifetime of your server application. That in itself may be a bit hard to setup correctly - maybe you'd be better off working with something like "Celery" instead, which can handle failing workers, and even have workers in different machines allowing for horizontal scaling for free. But changing the code for that is another level.

    For now, focusing on your minimal example, something along this, using the lifespan parameter when starting your fastAPI instance might work. Bellow, I just changed the code in your view to reuse the same process-pool, and added the code to create a single executor pool for each server process.

    Just beware that there is another thing that may be causing your code to fail althogether (instead of just running 10000 times slower): the call to FastAPI itself must take place only on the main process - if it is not guarded, even indirectly, in a check for that (the if __name__ == "__main__": clause), when trying to create the multiprocessing pool your code may simply be setting up several FastAPI servers in a run-away chain.

    ...
    @app.post("/addContent")
    async def add_content(content:dict):
    
        all_content = content['data']
        nworkers = 6
        content_chunks = split_on_white_space(all_content)                  #split content
        async_tasks = []
        for chunk in content_chunks:
            regex_fn = functools.partial(run_regex_on_content_chunk, chunk) # make the function with args
            async_tasks.append(executor_task(regex_fn, executor))           # add to gather later 
        await asyncio.gather(*async_tasks)     
            
        
    ...
    
    
    from contextlib import asynccontextmanager
    
    process_pool = None
    
    @asynccontextmanager
    async def executor_pool(app):
        global process_pool
        nworkers = 18   # These workers are shared for all views - 
                        # even though, if CPU is 100% occupation you should get no
                        # gain above the number of hardware threads you have,
                        # in theory - I think that when we factor in the
                        # idle time for networking, and such  about 3X that 
                        # number will serve you well. Of course
                        # CPU usage should be monitored in production - 
                        # as long as you don't reach close to 100% under maximum load
                        # you can further increase the number of workers.
        process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=nworkers)
        try:
            yield # at this point, fastAPI will setup the server and run your application
        finally:
            process_pool.shutdown()  # and this runs when your server is stopping!
     
    if __name__ == "__main__":
        app = FastAPI(lifespan=executor_pool)   # use this line to call "FastAPI" - the line you do this is not present in your example code.