
How to let long tasks be "cancellable" via HTTP on Tornado HTTP servers?


I've implemented an HTTP wrapper around a heavy task, and I chose Tornado as the front-end server framework (the heavy task is written in Python, and I'm used to Tornado).

Currently, I just call the heavy task directly from Tornado's process. I built a web interface with jQuery that sends an AJAX request with the parameters set in a form.

As you may imagine, a task started from my web browser isn't cancellable. The only way to cancel it is to send signal 9 (SIGKILL) or 15 (SIGTERM) to the Python process, which is not something users can usually do.

I want to let the currently running task be cancelled by sending some kind of "cancel" request via HTTP. How can this be done? How do most web services that handle heavy tasks (e.g. video encoding on YouTube) do it?


Solution

  • Tornado's Futures do not support cancellation (docs). Moreover, even with with_timeout, a job that has timed out keeps running; it is just that nothing waits for its result anymore.
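
The with_timeout behaviour described above can be seen in a small sketch. It assumes Tornado 5 or later (where Tornado coroutines run on the asyncio event loop); the names slow_job, demo, and finished are made up for illustration and are not part of Tornado.

```python
import asyncio
from datetime import timedelta

from tornado import gen

finished = []  # hypothetical record of jobs that ran to completion


@gen.coroutine
def slow_job():
    # Stand-in for a long task: it takes longer than the timeout below.
    yield gen.sleep(0.3)
    finished.append('slow_job')


async def demo():
    timed_out = False
    try:
        # Give up waiting after 0.1 s; slow_job itself is not stopped.
        await gen.with_timeout(timedelta(seconds=0.1), slow_job())
    except gen.TimeoutError:
        timed_out = True
    # Wait long enough for the abandoned job to finish on its own.
    await gen.sleep(0.4)
    return timed_out, list(finished)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If the description above holds, demo() should report that the wait timed out and that slow_job still appended its entry to finished afterwards, which is why a timeout alone does not amount to cancellation.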

    The only way, as also stated in How can I cancel a hanging asyncronous task in tornado, with a timeout?, is to implement the logic so that it can be cancelled (with a flag or something similar).

    Example:

    • the job is a simple async sleep
    • / lists jobs
    • /add/TIME adds a new job; TIME is how long to sleep, in seconds
    • /cancel/ID cancels the job with the given ID

    The code might look like:

    from tornado.ioloop import IOLoop
    from tornado import gen, web
    from time import time
    
    class Job():
    
        def __init__(self, run_sec):
            self.run_sec = int(run_sec)
            self.start_time = None
            self.end_time = None
            self._cancelled = False
    
        @gen.coroutine
        def run(self):
            """ Some job
    
            The job is simple: sleep for a given number of seconds.
            It could be implemented as:
                 yield gen.sleep(self.run_sec)
            but that would not be cancellable, so the sleep is
            split into 1-second steps, checking the flag between them.
            """
            self.start_time = time()
            deadline = self.start_time + self.run_sec
            while not self._cancelled:
                yield gen.sleep(1)
                if time() >= deadline:
                    break
            self.end_time = time()
    
        def cancel(self):
            """ Cancels job
    
            Returns None on success,
            raises Exception on error:
              if job is already cancelled or done
            """
            if self._cancelled:
                raise Exception('Job is already cancelled')
            if self.end_time is not None:
                raise Exception('Job is already done')
            self._cancelled = True
    
        def get_state(self):
            if self.start_time is None:
                # checked first so this state is reachable at all;
                # in practice it is never shown, as the job is
                # started immediately after creation
                return 'NOT STARTED'
            elif self._cancelled:
                if self.end_time is None:
                    # job might be running still
                    # and will be stopped on the next while check
                    return 'CANCELING...'
                else:
                    return 'CANCELLED'
            elif self.end_time is None:
                return 'RUNNING...'
            else:
                return 'DONE'
    
    
    class MainHandler(web.RequestHandler):
    
        def get(self, op=None, param=None):
            if op == 'add':
                # add new job
                new_job = Job(run_sec=param)
                self.application.jobs.append(new_job)
                new_job.run()
                self.write('Job added')
            elif op == 'cancel':
                # cancel job - stop running
                self.application.jobs[int(param)].cancel()
                self.write('Job cancelled')
            else:
                # list jobs
                self.write('<pre>') # this is so ugly... ;P
                self.write('ID\tRUNSEC\tSTART_TIME\tSTATE\tEND_TIME\n')
                for idx, job in enumerate(self.application.jobs):
                    self.write('%s\t%s\t%s\t%s\t%s\n' % (
                        idx, job.run_sec, job.start_time,
                        job.get_state(), job.end_time
                    ))
    
    
    class MyApplication(web.Application):
    
        def __init__(self):
    
            # to store tasks
            self.jobs = []
    
            super(MyApplication, self).__init__([
                (r"/", MainHandler),
                (r"/(add)/(\d*)", MainHandler),
                (r"/(cancel)/(\d*)", MainHandler),
            ])
    
    if __name__ == "__main__":
        MyApplication().listen(8888)
        IOLoop.current().start()
    

    Add a couple of jobs:

    for a in `seq 12 120`; do curl http://127.0.0.1:8888/add/$a; done
    

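    Then cancel some of them, e.g. with curl http://127.0.0.1:8888/cancel/0 (the ID is the job's index as listed at /). Note that this requires only Tornado.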

    This example is very simple; gen.sleep stands in for your heavy task. Of course, not all jobs are this easy to implement in a cancellable way.
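
For a heavy task that blocks (CPU-bound or synchronous I/O) rather than sleeping asynchronously, the same flag idea might be applied from a worker thread. The sketch below is only an illustration using the standard library: CancellableTask, total_chunks, and done_chunks are hypothetical names, and time.sleep stands in for one chunk of real work. It assumes the task can be split into small chunks with a flag check between them.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class CancellableTask:
    """Hypothetical wrapper: a blocking task split into small chunks."""

    def __init__(self, total_chunks):
        self.total_chunks = total_chunks
        self.done_chunks = 0
        # threading.Event is a thread-safe flag, set from another thread.
        self._cancel_event = threading.Event()

    def cancel(self):
        # Only requests cancellation; run() notices it before the next chunk.
        self._cancel_event.set()

    def run(self):
        """Process chunks until finished or cancelled; return the final state."""
        for _ in range(self.total_chunks):
            if self._cancel_event.is_set():
                return 'CANCELLED'
            time.sleep(0.01)  # stand-in for one chunk of real work
            self.done_chunks += 1
        return 'DONE'


if __name__ == "__main__":
    task = CancellableTask(total_chunks=1000)
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(task.run)
        time.sleep(0.05)   # let a few chunks run
        task.cancel()      # e.g. triggered by a /cancel/ID request
        print(future.result(), task.done_chunks)
```

In a Tornado handler, task.run could presumably be handed to IOLoop.run_in_executor so the event loop stays responsive while the work runs, with the cancel handler calling task.cancel(). The smaller the chunks, the sooner a cancel request takes effect.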