Could anyone be so kind as to let me know how to run long-running tasks in parallel? Here is my current, sequential solution:
https://gist.github.com/dzizes/4c23ba4c2cd7bfbeff643c0cb85749c7
You need to start multiple workers. This line starts one worker coroutine:
io_loop.run_sync(mongoQueue.worker)
Instead, do something like:
    @gen.coroutine
    def workers(self):
        futures = [self.worker() for _ in range(CONCURRENT)]
        yield futures
Then:
io_loop.run_sync(mongoQueue.workers)
I'd move the job of filling the queue out of the workers and put it in a producer instead:
    @gen.coroutine
    def producer(self):
        while self.running:
            yield self.load_work()
This more closely resembles the standard consumer-producer code in the Tornado docs. Then, update workers so it spawns a producer:
    @gen.coroutine
    def workers(self):
        IOLoop.current().spawn_callback(self.producer)
        futures = [self.worker() for _ in range(CONCURRENT)]
        yield futures
See the docs for why we use spawn_callback to spawn a coroutine. You'll need to set self.running to True at the start, and decide when to end your producer by setting self.running to False. You also need to decide how to end the worker coroutines.
A couple of other corrections to your code. First, don't confuse yourself by naming a coroutine with the value _thread: your code is single-threaded and it achieves concurrency using coroutines. Second, you don't need gen.sleep; the max queue size will throttle the producer.
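Putting the pieces together, here is a minimal, self-contained sketch of the producer plus several workers. It is not your gist's code: the class name WorkQueue, the helper run_demo, the doubling "work", and the use of a None sentinel to stop each worker are all assumptions made for illustration. The sentinel is just one possible answer to "how to end the worker coroutines"; a self.running flag would work too.

```python
from tornado import gen, queues
from tornado.ioloop import IOLoop

CONCURRENT = 3  # number of worker coroutines (illustrative value)


class WorkQueue(object):
    """Hypothetical stand-in for the mongoQueue object in the question."""

    def __init__(self, items):
        # A small maxsize makes queue.put() wait when the queue is full,
        # which is what throttles the producer.
        self.queue = queues.Queue(maxsize=2)
        self.items = list(items)
        self.results = []

    @gen.coroutine
    def producer(self):
        for item in self.items:
            yield self.queue.put(item)
        # Assumption: one None sentinel per worker signals "no more work".
        for _ in range(CONCURRENT):
            yield self.queue.put(None)

    @gen.coroutine
    def worker(self):
        while True:
            item = yield self.queue.get()
            if item is None:
                return
            # Stands in for the real long-running asynchronous task;
            # it is not there to throttle anything.
            yield gen.sleep(0.01)
            self.results.append(item * 2)

    @gen.coroutine
    def workers(self):
        IOLoop.current().spawn_callback(self.producer)
        futures = [self.worker() for _ in range(CONCURRENT)]
        yield futures  # resolves once every worker has returned


def run_demo(items):
    """Run the queue to completion and return the collected results."""
    work = WorkQueue(items)
    IOLoop.current().run_sync(work.workers)
    return work.results
```

Calling run_demo(range(5)) should process all five items across the three workers. The order of the results depends on scheduling, so sort them if you need a stable order.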