I'm trying to write a simple workload generator for a Tornado server; here's a simplified version of it:
    import json

    from tornado import gen, queues
    from tornado.httpclient import AsyncHTTPClient, HTTPRequest
    from tornado.ioloop import IOLoop


    class EventsLoader(object):
        generate_num_requests = 1000
        generate_concurrency = 32
        server_port = 8001

        def __init__(self, conf_file):
            self.parse_config(conf_file)
            self.client = AsyncHTTPClient()

        def generate(self):
            IOLoop.current().run_sync(self.generate_work)

        @gen.coroutine
        def generate_work(self):
            self.queue = queues.Queue()
            IOLoop.current().spawn_callback(self.fetch_requests)
            for i in range(self.generate_concurrency):
                yield self.generate_requests(i)
            print 'before join queue size: %s' % self.queue.qsize()
            yield self.queue.join()

        @gen.coroutine
        def generate_requests(self, i):
            load = self.generate_num_requests / self.generate_concurrency
            for j in range(load):
                request = self.generate_request(i * 1000 + j)
                self.queue.put(request)

        @gen.coroutine
        def fetch_requests(self):
            while True:
                try:
                    request = yield self.queue.get()
                    yield self.client.fetch(request)
                except Exception as e:
                    print 'failed fetching: %s: %s' % (request.body, e)
                finally:
                    print 'fetched: %s' % json.loads(request.body)['seq']
                    self.queue.task_done()

        def generate_request(self, seq):
            event = {
                'seq': seq,
                # ... more fields here ...
            }
            return HTTPRequest(
                'http://localhost:%s/events' % self.server_port,
                method='POST',
                body=json.dumps(event),
            )
What I see happening is that all the "fetched: xxxx" messages appear in strictly sequential order, which would be absolutely improbable if the generator were actually working concurrently.
How do I make it run concurrently? There must be something huge missing in my understanding of what the I/O loop is for and what @gen.coroutine does: regardless of my generate_concurrency setting, the performance is unchanged.
No matter how you're generating the requests, you've only got one task fetching them. It's the fetching, not the generating, that you need to parallelize:
    for i in range(self.fetch_concurrency):
        IOLoop.current().spawn_callback(self.fetch_requests)
This will give you multiple fetch_requests
workers that can pull work from the shared queue.
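For the class in the question, that means adding a fetch_concurrency attribute next to the existing class attributes; the name and value below are assumptions for illustration, not something from the original code:

    class EventsLoader(object):
        generate_num_requests = 1000
        generate_concurrency = 32
        fetch_concurrency = 8  # assumed new knob: number of fetch workers
        server_port = 8001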
Also, the generation part of this code isn't running in parallel either. Instead of
    for i in range(self.generate_concurrency):
        yield self.generate_requests(i)
which waits for one generate_requests
call to finish before starting the next, you can run them in parallel with
    yield [self.generate_requests(i) for i in range(self.generate_concurrency)]
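Putting both changes together, a sketch of the revised generate_work (still using the assumed fetch_concurrency attribute from above):

    @gen.coroutine
    def generate_work(self):
        self.queue = queues.Queue()
        # Spawn a pool of consumers; each one loops on queue.get() and
        # issues its own HTTP fetch, so up to fetch_concurrency requests
        # can be in flight at once.
        for i in range(self.fetch_concurrency):
            IOLoop.current().spawn_callback(self.fetch_requests)
        # Yielding a list of futures makes Tornado wait for all of the
        # producers in parallel rather than one at a time.
        yield [self.generate_requests(i)
               for i in range(self.generate_concurrency)]
        print 'before join queue size: %s' % self.queue.qsize()
        yield self.queue.join()

Once queue.join() resolves, run_sync stops the loop; the workers are still parked on queue.get() at that point, which is fine for a one-shot load generator like this.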