Tags: python, concurrency, tornado

How to make tornado execute concurrent code?


I'm trying to write a simple workload generator for a Tornado server. Here's a simplified version of it:

import json

from tornado import gen, queues
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.ioloop import IOLoop


class EventsLoader(object):

    generate_num_requests = 1000
    generate_concurrency = 32
    server_port = 8001

    def __init__(self, conf_file):
        self.parse_config(conf_file)
        self.client = AsyncHTTPClient()

    def generate(self):
        IOLoop.current().run_sync(self.generate_work)

    @gen.coroutine
    def generate_work(self):
        self.queue = queues.Queue()
        IOLoop.current().spawn_callback(self.fetch_requests)
        for i in range(self.generate_concurrency):
            yield self.generate_requests(i)
        print 'before join queue size: %s' % self.queue.qsize()
        yield self.queue.join()

    @gen.coroutine
    def generate_requests(self, i):
        load = self.generate_num_requests / self.generate_concurrency
        for j in range(load):
            request = self.generate_request(i * 1000 + j)
            self.queue.put(request)

    @gen.coroutine
    def fetch_requests(self):
        while True:
            try:
                request = yield self.queue.get()
                yield self.client.fetch(request)
            except Exception as e:
                print 'failed fetching: %s: %s' % (request.body, e)
            finally:
                print 'fetched: %s' % json.loads(request.body)['seq']
                self.queue.task_done()

    def generate_request(self, seq):
        event = {
            'seq': seq,
            # ... more fields here ...
        }
        return HTTPRequest(
            'http://localhost:%s/events' % self.server_port,
            method='POST',
            body=json.dumps(event),
        )

What I see is that all the fetched: xxxx messages appear in sequential order, which would be highly improbable if the generator were actually working concurrently.

How do I make it run concurrently? There must be something big missing in my understanding of what the I/O loop is for and what @gen.coroutine does: regardless of my generate_concurrency setting, the performance is unchanged.


Solution

  • No matter how you're generating the requests, you've only got one task fetching them. It's the fetching, not the generating, that you need to parallelize:

    for i in range(self.fetch_concurrency):
        IOLoop.current().spawn_callback(self.fetch_requests)
    

    This will give you multiple fetch_requests workers that can pull work from the shared queue.
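
    For illustration, here is a minimal, self-contained sketch of that worker-pool pattern outside the EventsLoader class; the URL, the request count, and the fetch_concurrency value are made up for this sketch, not taken from the original code:

    from tornado import gen, queues
    from tornado.httpclient import AsyncHTTPClient
    from tornado.ioloop import IOLoop

    @gen.coroutine
    def worker(queue, client):
        # Each worker loops forever, pulling one URL at a time off the
        # shared queue and fetching it.
        while True:
            url = yield queue.get()
            try:
                yield client.fetch(url)
            except Exception as e:
                print 'failed fetching %s: %s' % (url, e)
            finally:
                queue.task_done()

    @gen.coroutine
    def main():
        queue = queues.Queue()
        client = AsyncHTTPClient()
        fetch_concurrency = 4  # assumed value for this sketch
        # Spawn several workers; they all block on queue.get() until items
        # arrive, so up to fetch_concurrency fetches run at the same time.
        for _ in range(fetch_concurrency):
            IOLoop.current().spawn_callback(worker, queue, client)
        for seq in range(20):
            yield queue.put('http://localhost:8001/events?seq=%d' % seq)
        # join() resolves once every queued item has been task_done()'d.
        yield queue.join()

    IOLoop.current().run_sync(main)

    Because the workers are spawned with spawn_callback rather than yielded, main() doesn't wait for them directly; it's queue.join() that keeps the coroutine alive until every queued item has been processed.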

    Also, the generation part of this code isn't running in parallel either. Instead of

    for i in range(self.generate_concurrency):
        yield self.generate_requests(i)
    

    which waits for one generate_requests call to finish before starting the next, you can run them in parallel with

    yield [self.generate_requests(i) for i in range(self.generate_concurrency)]
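
    Putting both changes together, a revised generate_work might look like the sketch below; fetch_concurrency is an assumed new setting alongside generate_concurrency, not part of the original class:

    @gen.coroutine
    def generate_work(self):
        self.queue = queues.Queue()
        # Several consumers drain the queue concurrently.
        for _ in range(self.fetch_concurrency):
            IOLoop.current().spawn_callback(self.fetch_requests)
        # Yielding a list of futures runs all the producers in parallel
        # instead of one after another.
        yield [self.generate_requests(i)
               for i in range(self.generate_concurrency)]
        print 'before join queue size: %s' % self.queue.qsize()
        yield self.queue.join()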