Tags: python, tornado, rethinkdb, rethinkdb-python

Nonblocking fetching of data with Tornado websockets and RethinkDB


When I receive a message from a client, I want to run multiple RethinkDB queries in parallel and send each result to the client immediately.

The blocking version is below. A count can take minutes, and I would like the other, faster queries not to be held up by it.

self.write_message({'count': r.db('public').table(message['table']).count().run(conn)})
self.write_message({'rows': r.db('public').table(message['table']).limit(10).run(conn)})

I suspect I need a combination of https://rethinkdb.com/blog/async-drivers/ and http://www.tornadoweb.org/en/stable/guide/async.html

I'm thinking maybe the answer is to make those two lines something like:

ioloop.IOLoop.current().add_callback(run_query, r.db('public').table(message['table']).count(), 'count', self)
ioloop.IOLoop.current().add_callback(run_query, r.db('public').table(message['table']).limit(10), 'rows', self)

My run_query would be:

@gen.coroutine
def run_query(query, key, ws):
    conn = yield r.connect(host="localhost", port=28015)
    results = yield query.run(conn)
    ws.write_message({key: results})
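Because each add_callback call starts its own coroutine, a fast query is not held up by a slow one: each result is written as soon as its own query finishes. The sketch below checks that ordering without a database. It assumes Tornado 5 or later and replaces RethinkDB with hypothetical stand-ins: `fake_query`, which only sleeps and returns a value, and `FakeWebSocket`, which records messages instead of sending them. Its `run_query` takes a delay and a value in place of a real query.

```python
from tornado import gen, ioloop


class FakeWebSocket(object):
    """Hypothetical stand-in for a WebSocketHandler: records messages."""

    def __init__(self):
        self.messages = []

    def write_message(self, message):
        self.messages.append(message)


@gen.coroutine
def fake_query(delay, value):
    # Hypothetical stand-in for query.run(conn): finishes after `delay` seconds.
    yield gen.sleep(delay)
    return value


@gen.coroutine
def run_query(delay, value, key, ws):
    results = yield fake_query(delay, value)
    ws.write_message({key: results})


@gen.coroutine
def main(ws):
    loop = ioloop.IOLoop.current()
    # Schedule both "queries"; neither waits for the other.
    loop.add_callback(run_query, 0.2, 1000000, 'count', ws)
    loop.add_callback(run_query, 0.01, ['row1', 'row2'], 'rows', ws)
    # Keep the loop running until both messages have been written.
    while len(ws.messages) < 2:
        yield gen.sleep(0.01)


ws = FakeWebSocket()
ioloop.IOLoop.current().run_sync(lambda: main(ws))
print(ws.messages)
```

The `rows` message is recorded first even though it was scheduled second, because its stand-in query finishes sooner.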

Solution

  • The tornado.gen documentation reveals the solution:

    You can also yield a list or dict of Futures, which will be started at the same time and run in parallel; a list or dict of results will be returned when they are all finished.

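    from tornado import gen
    import rethinkdb as r  # rethinkdb>=2.4: from rethinkdb import r

    # do not forget about this: it makes run() return Tornado Futures
    r.set_loop_type("tornado")

    @gen.coroutine
    def run_parallel(table, ws):  # e.g. run_parallel(message['table'], self)
        conn = yield r.connect(host="localhost", port=28015)
        try:
            # Both queries are sent before either result is awaited.
            ret = yield {
                'count': r.db('public').table(table).count().run(conn),
                # coerce_to('array') returns a list instead of a cursor,
                # so write_message can JSON-encode the result.
                'rows': r.db('public').table(table).limit(10).coerce_to('array').run(conn),
            }
        finally:
            yield conn.close()
        ws.write_message(ret)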
    

    Yielding a list or dict directly has an important behavior: if any of the Futures fails, the yield returns immediately and re-raises the exception, regardless of whether the other Futures have completed. To work around this, you can use Multi or multi_future instead.

    Note: I am not sure whether RethinkDB requires a separate connection per query; the example is only meant to show the concept.
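Yielding a dict of Futures, as in run_parallel, starts both operations before waiting on either, so the total time is roughly that of the slowest one rather than the sum. The trade-off is that the results arrive together: the fast `rows` result waits for `count`. A minimal sketch, assuming Tornado 5 or later, with a hypothetical `fake_query` coroutine standing in for `query.run(conn)`:

```python
import time

from tornado import gen, ioloop


@gen.coroutine
def fake_query(delay, value):
    # Hypothetical stand-in for query.run(conn): resolves after `delay` seconds.
    yield gen.sleep(delay)
    return value


@gen.coroutine
def run_parallel():
    # Both coroutines start as soon as they are called, before the yield.
    ret = yield {
        'count': fake_query(0.4, 1000000),
        'rows': fake_query(0.2, ['row1', 'row2']),
    }
    return ret


start = time.monotonic()
ret = ioloop.IOLoop.current().run_sync(run_parallel)
elapsed = time.monotonic() - start
print(ret)
# Roughly 0.4 s (the slowest query), not 0.6 s (the sum of both).
print('elapsed: %.2fs' % elapsed)
```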
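If each result should reach the client as soon as it is ready, and one failing query should not discard the others, `tornado.gen.WaitIterator` (added in Tornado 4.1) yields Futures in the order they complete and exposes the key of the one that just finished as `current_index`. This is a swapped-in alternative, not the technique the answer uses. The sketch assumes Tornado 5 or later and uses hypothetical `fake_query` and `FakeWebSocket` stand-ins, one of which deliberately raises.

```python
from tornado import gen, ioloop


class FakeWebSocket(object):
    """Hypothetical stand-in for a WebSocketHandler: records messages."""

    def __init__(self):
        self.messages = []

    def write_message(self, message):
        self.messages.append(message)


@gen.coroutine
def fake_query(delay, value=None, error=None):
    # Hypothetical stand-in for query.run(conn): sleeps, then returns or raises.
    yield gen.sleep(delay)
    if error is not None:
        raise error
    return value


@gen.coroutine
def run_each(ws):
    # Keyword arguments make current_index the key of the finished Future.
    wait_iterator = gen.WaitIterator(
        count=fake_query(0.2, 1000000),
        rows=fake_query(0.05, ['row1', 'row2']),
        broken=fake_query(0.1, error=ValueError('bad table')),
    )
    while not wait_iterator.done():
        try:
            result = yield wait_iterator.next()
        except ValueError as exc:
            # Only this query failed; the others keep running.
            ws.write_message({wait_iterator.current_index: 'error: %s' % exc})
        else:
            # Send each result as soon as its query finishes.
            ws.write_message({wait_iterator.current_index: result})


ws = FakeWebSocket()
ioloop.IOLoop.current().run_sync(lambda: run_each(ws))
print(ws.messages)
```

In a real handler, the `write_message` calls inside the loop would push each result to the client as it arrives, so the slow count no longer delays the rows.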