
tornado-redis: RPOP works but BRPOP doesn't?


I'm new to Tornado and Redis, and I'm implementing the beginnings of a listener/worker setup.

I want to LPUSH tasks onto a queue and RPOP them off. BRPOP seems like the best way to pop them, since it waits for an item to be added if none are currently there. The problem is that whenever I use it, it never returns; when I use RPOP, however, I get the next item in the queue as expected.
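To make the intended semantics concrete, here is a toy, single-process model of a Redis list using only the Python 3 standard library. It is not Redis and not tornado-redis, and the class name `InMemoryListQueue` is hypothetical. It shows the distinction the question relies on: RPOP returns immediately (None when the list is empty), while BRPOP waits until an item arrives or a timeout expires. One deliberate difference: real Redis uses a timeout of 0 to mean "block forever", whereas this sketch uses None for that.

```python
import collections
import threading


class InMemoryListQueue(object):
    """Hypothetical toy model of one Redis list (not Redis itself).

    lpush() adds at the head, rpop() takes from the tail without waiting,
    brpop() waits for an item or until `timeout` seconds pass
    (None means wait forever; real Redis uses 0 for that).
    """

    def __init__(self):
        self._items = collections.deque()
        self._cond = threading.Condition()

    def lpush(self, value):
        with self._cond:
            self._items.appendleft(value)
            self._cond.notify()  # wake one waiting brpop() caller
            return len(self._items)  # like LPUSH, report the new length

    def rpop(self):
        # Non-blocking: returns None immediately if the list is empty.
        with self._cond:
            return self._items.pop() if self._items else None

    def brpop(self, timeout=None):
        # Blocking: sleeps on the condition until lpush() adds an item.
        with self._cond:
            if not self._cond.wait_for(lambda: self._items, timeout):
                return None  # timed out with the list still empty
            return self._items.pop()


q = InMemoryListQueue()
q.lpush("job-1")
q.lpush("job-2")
print(q.rpop())  # job-1: LPUSH + RPOP gives first-in, first-out order
print(q.rpop())  # job-2
print(q.rpop())  # None: RPOP does not wait
threading.Timer(0.1, q.lpush, args=("job-3",)).start()  # push later, from another thread
print(q.brpop(timeout=2))  # job-3: BRPOP waited for the push
```

In other words, BRPOP is the right tool for a worker: it only blocks while the list is empty, and returns as soon as an item is present.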

```python
import json
from uuid import uuid4

import tornado.gen
import tornado.web
import tornado.websocket


class ListenHandler(tornado.websocket.WebSocketHandler):
    # self.application.rdb is assumed to be a tornado-redis client
    # created when the Application is set up (not shown here).
    uid = ''
    CHANNEL_TPL = "client_%s"
    RESPONSE_TPL = '{"command":"%s","rid":"%s","status":"%s","result":%s}'
    def open(self):
        # new websocket connection is established from a client
        print "open socket"
        self.uid = uuid4()

    def on_message(self, message):
        print "on_message called [%s]" % message
        try:
            m = json.loads(message)
        except ValueError:
            self.write_message('BAD')
            return

        # check for RID (request id)
        if not 'rid' in m:
            self.write_message('error: unspecified rid')
            return

        # confirm receipt of data
        confirm_string = '%s OK' % (m['rid'])
        self.write_message(confirm_string)

        # check for command
        if not 'command' in m:
            response = '%s error: unspecified command' % (m['rid'])
            self.write_message(response)
            return

        # process commands
        if m['command'] == 'register':
            self._register(m['rid'])
        elif m['command'] == 'get_canvas':
            self._queue_command('read', m)
        elif m['command'] == 'save_canvas':
            if 'data' in m:
                self._queue_command('write', m)
            else:
                response = '%s unspecified data' % (m['rid'])
                self.write_message(response)
                return
        elif m['command'] == 'list_command_queue':
            self._list_command_queue(m['rid'])
        elif m['command'] == 'get_read_job':
            self._get_read_job(m['rid'])
        elif m['command'] == 'get_write_job':
            self._get_write_job(m['rid'])
        else:
            # no commands recognized
            response = '%s error: unknown command' % (m['rid'])
            self.write_message(response)
            return
        print "end of on_message()\n"

    def callback(self, data):
        print "- callback()"
        self.write_message(data)

    def on_close(self):
        # websocket connection is closed by client
        pass

    def _register(self, rid):
        data = '{"uid":"%s"}' % (self.uid)
        response = self.RESPONSE_TPL % ('register', rid, 'completed', data)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine
    def _queue_command(self, type, m):
        channel = self.CHANNEL_TPL % (type)
        print "pushing job to %s ... data[%s]" % (channel, m)
        yield tornado.gen.Task(self.application.rdb.lpush, channel, m)
        return

    def _list_command_queue(self, rid):
        channel_r = self.CHANNEL_TPL % ('read')
        channel_w = self.CHANNEL_TPL % ('write')
        data = '{"client_read":"%s","client_write":"%s"}' % (self.application.rdb.llen(channel_r), self.application.rdb.llen(channel_w))
        response = self.RESPONSE_TPL % ('list_command_queue', rid, 'completed', data)
        print "list_command_queue [%s]" % (response)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine
    def _get_read_job(self, rid):
        channel = self.CHANNEL_TPL % ('read')
        # note: (channel) is just the string channel, not a one-element tuple
        data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
        response = self.RESPONSE_TPL % ('get_read_job', rid, 'completed', data)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine
    def _get_write_job(self, rid):
        channel = self.CHANNEL_TPL % ('write')
        data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
        response = self.RESPONSE_TPL % ('get_write_job', rid, 'completed', data)
        self.callback(response)
```

The class above takes recognized commands and LPUSHes them onto one of two queues: a 'write' queue for jobs that write to an SQL database, and a 'read' queue for read-only jobs. Eventually, a number of worker threads on a different machine will use BRPOP to fetch these commands and execute them. For now, though, I'm using the same listener to test what's in the queue.
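For the worker side described above, a loop along these lines could run on the worker machine. This is a sketch that swaps in the synchronous redis-py client instead of tornado-redis, and it assumes redis-py's `Redis.brpop(keys, timeout)`, which returns None on timeout or a (key, value) pair otherwise. It also assumes each job was pushed as a JSON string (for example with `json.dumps(m)`). `run_worker`, `handle_job`, and `max_idle_polls` are hypothetical names; `max_idle_polls` exists only so the loop can be stopped in a test.

```python
import json


def run_worker(client, queues, handle_job, timeout=5, max_idle_polls=None):
    """Hypothetical worker loop: pop jobs with BRPOP and pass them on.

    Assumes `client` behaves like redis-py's redis.Redis, where
    brpop(keys, timeout) returns None on timeout or a (key, value) pair.
    Keys are always passed as a list, so a single queue name is never
    treated as a sequence of one-character key names.
    """
    keys = [queues] if isinstance(queues, str) else list(queues)
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        reply = client.brpop(keys, timeout=timeout)
        if reply is None:
            idle += 1  # timed out with nothing queued; poll again
            continue
        idle = 0
        key, raw = reply  # key/raw are bytes unless decode_responses=True
        handle_job(key, json.loads(raw))
```

With redis-py, a call might look like `run_worker(redis.Redis(), ["client_write", "client_read"], my_handler)`, where `my_handler` is whatever executes a job. When several keys are given, BRPOP checks them in the order listed, so putting the write queue first would favor write jobs.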

The methods _get_write_job and _get_read_job return the next entry in the queue without any problems. However, if I add a 'b' to either call (brpop instead of rpop), the callback is never called. It seems to just block, waiting for the next available entry, even though there are entries in the queue.

Any idea what's going on here? Am I misunderstanding the purpose of BRPOP?

Thanks, Nick


Solution

  • Maybe it's a bit late but I think I can help.

    I had this same problem today and it almost drove me crazy. In the end, after lots of debugging, I solved it by passing the key (channel) to blpop/brpop inside a list (e.g. [channel]).

    Internally, tornado-redis's brpop/blpop convert the keys argument to a list, but when they receive a single key as a plain string, that conversion turns the string into a list of its characters (simply amazing...). That's why the call blocks: it is waiting for new items on several lists whose names are the individual characters of the original key, and nothing is ever pushed to those lists.
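The mechanism described in this answer is easy to reproduce in plain Python. The two functions below are a hypothetical reconstruction for illustration, not tornado-redis's actual source: `naive_brpop_tokens` builds the command arguments with `list()`, which splits a single string into its characters, while `safe_brpop_tokens` wraps a lone string in a list first. The workaround from the answer, applied to the question's handler, is shown as a comment.

```python
def naive_brpop_tokens(keys, timeout=0):
    # Hypothetical: build BRPOP arguments by calling list() on `keys`.
    # For a plain string this yields one "key" per character.
    return list(keys) + [timeout]


def safe_brpop_tokens(keys, timeout=0):
    # Hypothetical: treat a lone string as a single key name.
    if isinstance(keys, str):
        keys = [keys]
    return list(keys) + [timeout]


print(naive_brpop_tokens("client_read"))
# ['c', 'l', 'i', 'e', 'n', 't', '_', 'r', 'e', 'a', 'd', 0]
print(naive_brpop_tokens(["client_read"]))  # ['client_read', 0]
print(safe_brpop_tokens("client_read"))     # ['client_read', 0]

# Workaround from the answer, applied to the question's _get_read_job:
#     data = yield tornado.gen.Task(self.application.rdb.brpop, [channel])
```

Keep in mind that Redis replies to BRPOP with both the key and the value (or nil on timeout), whereas RPOP replies with the value alone, so `data` may need unpacking before it is formatted into `RESPONSE_TPL`; check how your tornado-redis version shapes that reply.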