multithreading, python-2.7, falconframework

How to store a response when the HTTP client has closed the connection?


I'm using the Falcon framework in Python to build JSON responses for a web API.

For instance, I have a function called logic() that runs for 30-90 minutes. I want something like this:

  1. When an HTTP client requests /api/somepath.json, we call somepath_handle()
  2. somepath_handle() runs logic() in another thread/process
  3. When logic() finishes, the thread is closed
  4. somepath_handle() reads the result of logic() from its return value
  5. If somepath_handle() is killed before logic() finishes, the thread (or process) running logic() is not stopped until it finishes

The code:

def somepath_handle():
    run_async_logic()
    response=wait_for_async_logic_response() # read response of logic()
    return_response(response)

Solution

  • I use a simple worker that processes commands from a queue. If you add simple response storage to it, every request can still be processed, and its result is not lost when the connection drops.

    Example: this is the main module, which uses Falcon (falconframework.org) to respond to requests.

    main.py:

    from flow import Flow
    import falcon
    import threading
    import storage
    
    __version__ = 0.1
    __author__ = '[email protected]'
    
    
    app = falcon.API(
        media_type='application/json')
    
    app.add_route('/flow', Flow())
    
    THREADS_COUNT = 1
    # Start the worker threads that process the command queue.
    # Daemon threads do not keep the process alive at shutdown.
    worker = storage.worker
    for _ in xrange(THREADS_COUNT):
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()
    

    A simple storage module with the worker code, storage.py:

    from Queue import Queue
    import subprocess
    import logging

    # Shared queue of shell commands; flow.py puts commands here.
    main_queque = Queue()

    def worker():
        """Run queued shell commands one at a time, forever."""
        while True:
            # Blocks until a command is available.
            cmd = main_queque.get()
            try:
                handler = subprocess.Popen(
                    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                    shell=True)
                stdout, stderr = handler.communicate()
                logging.critical(
                    "[queue_worker]: stdout:%s, stderr:%s, cmd:%s"
                    % (stdout, stderr, cmd))
            except Exception as error:
                logging.critical("[queue_worker:error] %s" % (error))
            finally:
                # Mark the item done even if the command failed,
                # so that Queue.join() cannot hang.
                main_queque.task_done()
    

    The class that handles the requests (POST and GET), flow.py:

    import storage
    import json
    import falcon
    import random
    
    class Flow(object):
    
        def on_get(self, req, resp):
            # Report how many commands are still waiting in the queue.
            storage_value = storage.main_queque.qsize()
            msg = {"qsize": storage_value}
            resp.body = json.dumps(msg, sort_keys=True, indent=4)
            resp.status = falcon.HTTP_200
    
        # curl -H "Content-Type: application/json" -d '{}'  http://10.206.102.81:8888/flow
        def on_post(self, req, resp):
            # Queue a sample shell command and reply immediately; the worker
            # thread runs it whether or not the client stays connected.
            r = random.randint(1, 10000000000000)
            cmd = 'sleep 1;echo "ss %s"' % r
            storage.main_queque.put(cmd)
            msg = {"value": cmd}
            resp.body = json.dumps(msg, sort_keys=True, indent=4)
            resp.status = falcon.HTTP_200
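
The answer mentions adding "simple response storage" but does not show it. The following is a minimal sketch of one way it could look, not the author's actual code: each job gets an ID, and the worker writes the outcome into a dictionary guarded by a lock, so the result stays available even if the client that submitted the job has disconnected. All names here (`job_queue`, `results`, `submit`, `get_result`, `start_workers`) are hypothetical, and the sketch runs Python callables rather than shell commands to stay self-contained.

```python
import threading
import uuid

try:
    from Queue import Queue  # Python 2.7, as used in the answer
except ImportError:
    from queue import Queue  # Python 3

job_queue = Queue()              # hypothetical: pending (job_id, func, args)
results = {}                     # hypothetical store: job_id -> status dict
results_lock = threading.Lock()  # guards every access to `results`


def submit(func, *args):
    """Queue func(*args) and return a job ID the client can poll with."""
    job_id = uuid.uuid4().hex
    with results_lock:
        results[job_id] = {"status": "pending"}
    job_queue.put((job_id, func, args))
    return job_id


def get_result(job_id):
    """Return a copy of the stored status dict, or None for an unknown ID."""
    with results_lock:
        entry = results.get(job_id)
        return dict(entry) if entry is not None else None


def worker():
    """Run queued jobs and store each outcome, whether or not anyone waits."""
    while True:
        job_id, func, args = job_queue.get()
        try:
            outcome = {"status": "done", "value": func(*args)}
        except Exception as error:
            outcome = {"status": "failed", "error": str(error)}
        with results_lock:
            results[job_id] = outcome
        # Signal completion only after the outcome has been stored.
        job_queue.task_done()


def start_workers(count=1):
    """Start `count` daemon threads running worker()."""
    for _ in range(count):
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()
```

With something like this in place, `on_post` could call `submit(...)` and return the job ID right away, and `on_get` could look the ID up with `get_result(...)`, so the client can reconnect later and fetch the result. Note that the store lives in memory only: a job survives a dropped connection, but not a restart of the server process, and daemon threads are stopped abruptly when the process exits.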