Search code examples
python-3.xredisasync-awaittornadopython-asyncio

Tornado + aioredis: why are my redis calls blocking?


I try to build on Tornado and with Redis a simple system with two API endpoints:

  1. an API reading a value from Redis, or waiting until this value exists (with BRPOP : value = yield from redis.brpop("test"))
  2. an API writing this value (with LPUSH : redis.lpush("test", "the value")).

So I expect to be able to call those API in any order. Indeed, If I call 2. then 1., it works as expected, the call to 1. returns immediately with the value.

The problem is, if I call 1. then 2., both requests block to never return.

Concurrently, while the requests block, I can still LPUSH/BRPOP directly in Redis, even to the same key. Similarly, I can call other Handlers in Tornado. So I guess the block is situated neither in Redis nor in Tornado, but in my use of aioredis? Maybe the asyncio loop? But I can't understand where I'm mistaken. Any tip?

Thanks for any help.

Here is my code:

import tornado.ioloop
import tornado.web
from tornado import web, gen
from tornado.options import options, define
import aioredis
import asyncio


class WaitValueHandler(tornado.web.RequestHandler):
    @asyncio.coroutine
    def get(self):
        redis = self.application.redis
        value = yield from redis.brpop("test")
        self.write("I received a value: %s" % value)


class WriteValueHandler(tornado.web.RequestHandler):
    @asyncio.coroutine
    def get(self):
        redis = self.application.redis
        res = yield from redis.lpush("test", "here is the value")
        self.write("Ok ")


class Application(tornado.web.Application):
    def __init__(self):
        tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOMainLoop')

        handlers = [
            (r"/get", WaitValueHandler),
            (r"/put", WriteValueHandler)
        ]

        super().__init__(handlers, debug=True)

    def init_with_loop(self, loop):
        self.redis = loop.run_until_complete(
            aioredis.create_redis(('localhost', 6379), loop=loop)
        )

if __name__ == "__main__":
    application = Application()
    application.listen(8888)

    loop = asyncio.get_event_loop()
    application.init_with_loop(loop)
    loop.run_forever()

Solution

  • Ok I saw why, as the doc states :

    Blocking operations (like blpop, brpop or long-running LUA scripts) in shared mode mode will block connection and thus may lead to whole program malfunction.

    This blocking issue can be easily solved by using exclusive connection for such operations:

    redis = await aioredis.create_redis_pool(
        ('localhost', 6379),
        minsize=1,
        maxsize=1)
    
    async def task():
       # Exclusive mode
       with await redis as r:
           await r.set('key', 'val')
    asyncio.ensure_future(task())
    asyncio.ensure_future(task())
    # Both tasks will first acquire connection.