When I receive a message from a client, I want to run multiple RethinkDB queries in parallel and send each result to the client as soon as it is ready.
The blocking version is below. The count can take minutes, and I don't want the faster queries to be held up by it.
```python
self.write_message({'count': r.db('public').table(message['table']).count().run(conn)})
self.write_message({'rows': r.db('public').table(message['table']).limit(10).run(conn)})
```
I suspect I need a combination of the RethinkDB async drivers (https://rethinkdb.com/blog/async-drivers/) and Tornado's asynchronous programming guide (http://www.tornadoweb.org/en/stable/guide/async.html).
I'm thinking the answer may be to change those two lines to something like:
```python
ioloop.IOLoop.current().add_callback(run_query, r.db('public').table(message['table']).count(), 'count', self)
ioloop.IOLoop.current().add_callback(run_query, r.db('public').table(message['table']).limit(10), 'rows', self)
```
My run_query would be:
```python
@gen.coroutine
def run_query(query, key, ws):
    conn = yield r.connect(host="localhost", port=28015)
    results = yield query.run(conn)
    ws.write_message({key: results})
```
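The idea behind the add_callback version is that each query becomes its own independent task that sends its result when it finishes, so a slow count cannot delay the rows. A minimal sketch of that pattern in plain asyncio (not Tornado, though Tornado 5+ runs on the asyncio event loop), assuming a hypothetical fake_query coroutine standing in for query.run(conn) and a list standing in for ws.write_message:

```python
import asyncio

async def fake_query(result, delay):
    # Hypothetical stand-in for `yield query.run(conn)`: sleeps, then returns a result.
    await asyncio.sleep(delay)
    return result

async def run_query(coro, key, sent):
    # Each query runs in its own task and "sends" its result as soon as it is ready.
    results = await coro
    sent.append({key: results})

async def on_message(sent):
    # Schedule both queries independently, roughly what add_callback is meant to achieve.
    tasks = [
        asyncio.create_task(run_query(fake_query(123456, 0.2), 'count', sent)),
        asyncio.create_task(run_query(fake_query(['row1', 'row2'], 0.01), 'rows', sent)),
    ]
    # Only awaited so this demo does not exit early; a real handler would return at once.
    await asyncio.gather(*tasks)

sent = []
asyncio.run(on_message(sent))
print(sent)  # the fast 'rows' message is appended before the slow 'count'
```

In Tornado itself, IOLoop.spawn_callback is the documented way to start a coroutine in the background without waiting for it.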
The tornado.gen documentation points to a solution:
You can also yield a list or dict of Futures, which will be started at the same time and run in parallel; a list or dict of results will be returned when they are all finished.
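To see what the quoted behaviour means in practice without a RethinkDB server, here is a rough analogue in plain asyncio (not Tornado's API), where asyncio.gather plays the role of yielding a dict of Futures. fake_query is a hypothetical stand-in for query.run(conn):

```python
import asyncio
import time

async def fake_query(result, delay):
    # Hypothetical stand-in for query.run(conn).
    await asyncio.sleep(delay)
    return result

async def run_parallel():
    # Start every awaitable at once, then collect the results into a dict,
    # similar in spirit to `ret = yield {'count': ..., 'rows': ...}` in Tornado.
    queries = {
        'count': fake_query(123456, 0.3),
        'rows': fake_query(['row1', 'row2'], 0.3),
    }
    values = await asyncio.gather(*queries.values())
    return dict(zip(queries.keys(), values))

start = time.perf_counter()
ret = asyncio.run(run_parallel())
elapsed = time.perf_counter() - start
print(ret, round(elapsed, 1))  # about 0.3 s in total, not 0.6 s
```

Both simulated queries sleep for 0.3 s, so the whole wait takes about 0.3 s rather than 0.6 s. Note that the combined result is only available once the slowest query has finished, which is the trade-off compared with starting each query separately.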
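```python
import rethinkdb as r
from tornado import gen

# do not forget about this: make the driver return Tornado Futures
r.set_loop_type("tornado")

@gen.coroutine
def run_parallel(table, ws):
    # call from the handler as e.g. run_parallel(message['table'], self)
    conn = yield r.connect(host="localhost", port=28015)
    # both queries start at once; the yield resumes when both are done
    ret = yield {
        'count': r.db('public').table(table).count().run(conn),
        # coerce the stream to a list so the result is JSON-serializable
        'rows': r.db('public').table(table).limit(10).coerce_to('array').run(conn),
    }
    ws.write_message(ret)
```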
Yielding a list or dict directly has an important behaviour: if any of the Futures fails, yield returns immediately and re-raises the exception, regardless of whether the other Futures have completed. To avoid this, you can use Multi or multi_future instead.
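For comparison, asyncio.gather exposes the same trade-off through its return_exceptions flag. A sketch in plain asyncio (not Tornado's Multi or multi_future), with a hypothetical fake_query whose 'count' deliberately fails:

```python
import asyncio

async def fake_query(result, delay, fail=False):
    # Hypothetical stand-in for query.run(conn); optionally fails.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError('query failed')
    return result

async def strict():
    # Default gather: the first exception propagates as soon as it happens,
    # even though the slower 'rows' query has not finished yet.
    try:
        await asyncio.gather(fake_query(None, 0.05, fail=True),
                             fake_query(['row1', 'row2'], 0.1))
    except RuntimeError as exc:
        return 'raised: %s' % exc

async def tolerant():
    # return_exceptions=True: a failed query yields its exception object
    # instead of aborting the wait, so the successful results are kept.
    queries = {
        'count': fake_query(None, 0.05, fail=True),
        'rows': fake_query(['row1', 'row2'], 0.1),
    }
    values = await asyncio.gather(*queries.values(), return_exceptions=True)
    return dict(zip(queries.keys(), values))

strict_result = asyncio.run(strict())
ret = asyncio.run(tolerant())
ok = {k: v for k, v in ret.items() if not isinstance(v, Exception)}
errors = {k: repr(v) for k, v in ret.items() if isinstance(v, Exception)}
print(strict_result)  # raised: query failed
print(ok)             # {'rows': ['row1', 'row2']}
print(errors)         # {'count': "RuntimeError('query failed')"}
```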
Note: I don't know whether RethinkDB requires a separate connection for each concurrent query (the code above shares one); the point is to show the concept.