Python Tornado queue (toro): consuming tasks in parallel


Could someone tell me how to run long-running tasks in parallel? Here is my current solution, which runs them sequentially:

https://gist.github.com/dzizes/4c23ba4c2cd7bfbeff643c0cb85749c7


Solution

  • You need to start multiple workers. This line starts one worker coroutine:

    io_loop.run_sync(mongoQueue.worker)
    

    Instead, do something like:

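    @gen.coroutine
    def workers(self):
        # Start CONCURRENT worker coroutines; they all run on the same IOLoop.
        futures = [self.worker() for _ in range(CONCURRENT)]
        # Yielding a list of futures waits until every worker has finished.
        yield futures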
    

    Then:

    io_loop.run_sync(mongoQueue.workers)
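For illustration, here is a minimal, self-contained sketch of the multiple-workers idea. It swaps in the standard-library asyncio module for Tornado/toro so it runs without extra packages (Tornado 5+ runs on asyncio, and tornado.queues.Queue offers the same put/get/get_nowait/task_done/join methods). The job function long_task, the 0.1 s delay and the job count are hypothetical; asyncio.gather plays the role of yield futures.

```python
import asyncio
import time

CONCURRENT = 4  # number of worker coroutines, as in the answer


async def long_task(n):
    # Hypothetical stand-in for one long-running, I/O-bound job.
    await asyncio.sleep(0.1)
    return n * n


async def worker(queue, results):
    # Take jobs until the pre-filled queue is empty, then finish.
    while True:
        try:
            n = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        results.append(await long_task(n))


async def run(num_workers, jobs):
    queue = asyncio.Queue()
    for n in jobs:
        queue.put_nowait(n)
    results = []
    # Same idea as `yield futures`: wait until every worker has returned.
    await asyncio.gather(*(worker(queue, results) for _ in range(num_workers)))
    return sorted(results)


def timed(num_workers, jobs):
    start = time.perf_counter()
    results = asyncio.run(run(num_workers, jobs))
    return results, time.perf_counter() - start


seq, t_seq = timed(1, range(8))           # one worker: jobs run back to back
par, t_par = timed(CONCURRENT, range(8))  # four workers: jobs overlap
print(f"1 worker: {t_seq:.2f}s, {CONCURRENT} workers: {t_par:.2f}s")
```

With one worker the eight jobs run back to back (roughly 0.8 s); with four workers they overlap (roughly 0.2 s), even though everything runs on a single thread.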
    

    I'd also move the job of filling the queue out of the workers and into a separate producer coroutine:

    @gen.coroutine
    def producer(self):
        # Keep loading work into the queue until self.running is set to False.
        while self.running:
            yield self.load_work()
    

    This more closely resembles the standard producer-consumer example in the Tornado documentation. Then update workers so that it also spawns the producer:

    @gen.coroutine
    def workers(self):
        # Run the producer in the background; don't wait for it here.
        IOLoop.current().spawn_callback(self.producer)
        futures = [self.worker() for _ in range(CONCURRENT)]
        yield futures
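Putting the producer and the workers together, a runnable asyncio sketch of this structure might look like the following. JobQueue, the body of load_work, NUM_JOBS, and finishing with queue.join() plus task cancellation are assumptions for illustration, not part of the original code; asyncio.create_task stands in for spawn_callback.

```python
import asyncio

CONCURRENT = 3  # number of worker coroutines
NUM_JOBS = 10   # hypothetical amount of work to load


class JobQueue:
    """Hypothetical stand-in for the question's mongoQueue class."""

    def __init__(self):
        # Bounded queue: the producer waits whenever it is full.
        self.queue = asyncio.Queue(maxsize=CONCURRENT * 2)
        self.running = True
        self.next_job = 0
        self.done = []

    async def load_work(self):
        # Hypothetical loader; real code would fetch jobs from MongoDB.
        if self.next_job >= NUM_JOBS:
            self.running = False  # nothing left, so the producer loop ends
            return
        await self.queue.put(self.next_job)
        self.next_job += 1

    async def producer(self):
        while self.running:
            await self.load_work()

    async def worker(self):
        while True:
            job = await self.queue.get()
            try:
                await asyncio.sleep(0.05)  # simulated long-running I/O
                self.done.append(job)
            finally:
                self.queue.task_done()

    async def workers(self):
        # Start the producer in the background (the role spawn_callback plays
        # in Tornado) so the workers can begin consuming right away.
        producer_task = asyncio.create_task(self.producer())
        tasks = [asyncio.create_task(self.worker()) for _ in range(CONCURRENT)]
        await producer_task      # every job has been queued...
        await self.queue.join()  # ...and every queued job has been processed
        for task in tasks:
            task.cancel()        # these workers loop forever, so cancel them
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    jq = JobQueue()  # create the queue inside the running event loop
    await jq.workers()
    return sorted(jq.done)


result = asyncio.run(main())
print(result)
```

Cancellation as used here is an asyncio feature; if you stay with Tornado's gen.coroutine style, signalling the workers to stop (for example with a sentinel value) is the simpler route.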
    

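    spawn_callback starts the producer in the background without waiting for its result; the Tornado docs recommend it for "fire and forget" coroutines because the IOLoop then logs any exception the coroutine raises. You'll need to set self.running to True at the start, and decide when to stop the producer by setting self.running to False. You also need to decide how to end the worker coroutines.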
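One possible way to handle both shutdown decisions, sketched with asyncio rather than Tornado: self.running starts the producer loop, clearing it stops production, and the producer then puts one sentinel per worker so that every worker leaves its loop on its own. Pipeline, STOP and run_for are hypothetical names chosen for this sketch.

```python
import asyncio

CONCURRENT = 3
STOP = object()  # hypothetical sentinel: a worker that receives it exits


class Pipeline:
    """Hypothetical producer/worker pair showing one way to shut down."""

    def __init__(self):
        self.queue = asyncio.Queue(maxsize=5)
        self.running = False
        self.produced = 0
        self.processed = 0

    async def producer(self):
        while self.running:
            await self.queue.put(self.produced)  # waits while the queue is full
            self.produced += 1
        # The producer has stopped: send one sentinel per worker.
        for _ in range(CONCURRENT):
            await self.queue.put(STOP)

    async def worker(self):
        while True:
            item = await self.queue.get()
            if item is STOP:
                # FIFO order: every real item was taken before any sentinel.
                return
            await asyncio.sleep(0.01)  # simulated long-running job
            self.processed += 1

    async def workers(self, run_for):
        self.running = True  # set at the start
        tasks = [asyncio.create_task(self.producer())]
        tasks += [asyncio.create_task(self.worker()) for _ in range(CONCURRENT)]
        await asyncio.sleep(run_for)
        self.running = False  # the producer finishes its current put, then exits
        await asyncio.gather(*tasks)  # all coroutines end on their own


async def main():
    p = Pipeline()
    await p.workers(run_for=0.1)
    return p.produced, p.processed


produced, processed = asyncio.run(main())
print(produced, processed)
```

Because the sentinels are queued after the last real item, no work is dropped: every item the producer queued is processed before the workers exit.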

    A couple of other corrections to your code. First, don't confuse yourself by giving a coroutine a name with _thread in it: your code is single-threaded and achieves concurrency with coroutines. Second, you don't need gen.sleep: the queue's maximum size will throttle the producer, because putting into a full queue waits until a worker takes an item out.
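To see why the bounded queue makes gen.sleep unnecessary, this asyncio sketch (names and numbers are hypothetical) runs a producer with no sleep at all against a slow consumer. Because put() suspends while the queue is full (tornado.queues.Queue.put likewise waits until there is room), the producer never gets more than maxsize items ahead.

```python
import asyncio


async def main(maxsize=3, jobs=20):
    queue = asyncio.Queue(maxsize=maxsize)
    peak = 0  # largest queue size the producer ever reached

    async def producer():
        nonlocal peak
        for n in range(jobs):
            # No sleep needed: put() suspends whenever the queue is full.
            await queue.put(n)
            peak = max(peak, queue.qsize())
        await queue.put(None)  # sentinel: no more work

    async def consumer():
        handled = 0
        while True:
            item = await queue.get()
            if item is None:
                return handled
            await asyncio.sleep(0.005)  # slow, simulated I/O
            handled += 1

    _, handled = await asyncio.gather(producer(), consumer())
    return peak, handled


peak, handled = asyncio.run(main())
print(f"peak queue size: {peak}, jobs handled: {handled}")
```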