Tags: python, multithreading, http, subprocess, aiohttp

HTTP server kick-off background python script without blocking


I'd like to be able to trigger a long-running Python script via a web request, in a bare-bones fashion. I'd also like to be able to trigger additional copies of the script with different parameters while earlier copies are still running.

I've looked at Flask, aiohttp, and queueing possibilities. Flask and aiohttp seem to have the least setup overhead. I plan to execute the existing Python script via subprocess.run (though I did consider refactoring the script into libraries that could be called from the web response function).
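
For context, the sort of call I have in mind is roughly the following (ingest.py and --job-conf are just placeholder names for the existing script and its argument, not the real interface):

# rough sketch of the planned invocation; the script name and flag
# below are placeholders for the existing script and its parameters
import subprocess

subprocess.run(
    ["python", "ingest.py", "--job-conf", "jobs/example.yaml"],
    check=True,  # raise if the script exits with a non-zero status
)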

With aiohttp, I'm trying something like the following in ingestion_service.py:

import time

from aiohttp import web

routes = web.RouteTableDef()

@routes.get("/ingest_pipeline")
async def test_ingest_pipeline(request):
    '''
    Get the job_conf specified in the request and activate the script
    '''
    # subprocess.run the command with a lookup of the job conf file
    response = web.Response(text="Received data ingestion request")
    await response.prepare(request)
    await response.write_eof()

    # eventually this would be a subprocess.run call
    time.sleep(80)

    return response

def init_func(argv):
    app = web.Application()
    app.add_routes(routes)
    return app

Though the initial request returns immediately, subsequent requests block until the first one completes. I'm running the server via:

python -m aiohttp.web -H localhost -P 8080 ingestion_service:init_func

I know that multithreading or other concurrency approaches may provide better solutions than asyncio. In this case, I'm not looking for a robust solution, just something that lets me run multiple scripts at once via HTTP requests, ideally with minimal memory cost.


Solution

  • OK, there were a couple of issues with what I was doing. Namely, time.sleep() is blocking, so asyncio.sleep() should be used instead. However, since I'm really interested in spawning a subprocess, I can use asyncio.subprocess to do that in a non-blocking fashion. See the question "asyncio: run one function threaded with multiple requests from websocket clients" and https://docs.python.org/3/library/asyncio-subprocess.html.

    Using these helps, but there's still an issue with the web handler terminating the subprocess before it finishes. Luckily, there's a solution in the aiohttp docs: https://docs.aiohttp.org/en/stable/web_advanced.html

    aiojobs has an "atomic" decorator that protects the handler from cancellation until it is complete. So, code along these lines will work:

    from aiojobs.aiohttp import setup, atomic
    import asyncio

    from aiohttp import web

    @atomic
    async def ingest_pipeline(request):
        # be careful what you pass through to the shell, lest you
        # give away the keys to the kingdom
        shell_command = "[your command here]"
        response_text = f"running {shell_command}"
        response_code = 200
        response = web.Response(text=response_text, status=response_code)
        # send the response right away so the caller isn't left waiting
        await response.prepare(request)
        await response.write_eof()

        # spawn the script without blocking the event loop
        ingestion_process = await asyncio.create_subprocess_shell(
            shell_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

        # wait for the subprocess to finish and collect its output
        stdout, stderr = await ingestion_process.communicate()
        return response

    def init_func(argv):
        app = web.Application()
        setup(app)
        app.router.add_get('/ingest_pipeline', ingest_pipeline)
        return app
    

    This is very bare bones, but might help others looking for a quick skeleton for a temporary internal solution.
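
    To sanity-check that requests no longer block one another, a quick client sketch like the one below can fire two requests at once. It assumes the server above is running on localhost:8080 (as in the command earlier) and uses aiohttp's client API purely for convenience:

    import asyncio
    import time

    import aiohttp

    async def hit(session):
        # one request to the ingestion endpoint
        async with session.get("http://localhost:8080/ingest_pipeline") as resp:
            return resp.status, await resp.text()

    async def main():
        # fire two ingestion requests concurrently; with the atomic handler
        # above, both responses should come back almost immediately even
        # though the underlying scripts are still running
        start = time.monotonic()
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(hit(session), hit(session))
        print(results, f"elapsed: {time.monotonic() - start:.2f}s")

    asyncio.run(main())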