python redis lua queue

Store local intermediate result of 'rpop' in redis lua script


        # Check the processing queue for an item left over from a previous run.
        # If there is one:
        #   - if its item key has not expired, push it back onto the processing queue;
        #   - otherwise push it back onto the work queue.
        # (May want to use rpush instead if the item needs immediate processing.)
        lua_script = f"""
            local saved_item = redis.call('rpop', 'processing_list')

            if (saved_item ~= nil) then
                if (redis.call('exists', 'processing_list' .. '_' .. saved_item) == 1) then
                    redis.call('lpush', 'processing_list', saved_item)
                else
                    redis.call('lpush', 'work_list', saved_item)
                end
            end
        """
        self._queue_client.get_redis_client().eval(lua_script, 0)

Above, I am attempting to implement a durable queue using Redis. I need this portion of the logic to be atomic, but since Redis transactions don't allow intermediate reads and writes, I had to resort to Lua. The problem is that the initial 'rpop' line does not work: I verified in redis-cli that it returns (nil), so the 'saved_item' variable is never set correctly. Is there a better way to do that intermediate read and then apply conditional logic to the value?


Solution

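  • As a newcomer to Lua, I had to hack at this until it worked, and I still haven't worked out exactly why without combing through the Lua docs. What worked for me was wrapping the 'rpop' call in a local function and storing its return value in a local variable rather than calling it directly, together with an 'exists' check on the list first. One likely factor is that Redis hands an empty (nil) reply to Lua as the boolean false rather than nil, so a '~= nil' test passes even when nothing was popped.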

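        lua_script = f"""
            -- Only proceed if the processing list exists (i.e. is non-empty).
            if (redis.call('exists', '{self._processing_queue_name}') == 1) then
                local rpop = function (list) return redis.call('rpop', list) end
                local saved_item = rpop('{self._processing_queue_name}')

                -- Redis passes an empty reply to Lua as false, not nil.
                if saved_item then
                    if (redis.call('exists', '{self._processing_queue_name}' .. '_' .. saved_item) == 1) then
                        -- Item key still present: keep the item in the processing list.
                        redis.call('lpush', '{self._processing_queue_name}', saved_item)
                    else
                        -- Item key expired: return the item to the work list.
                        redis.call('lpush', '{self._queue_name}', saved_item)
                    end
                end
            end
        """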
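
For comparison, here is a minimal sketch of a variant that may not need the wrapper function or the 'exists' guard. It relies on Redis converting a nil reply (such as 'rpop' on an empty or missing list) into the Lua boolean false, so the script tests 'saved_item' for truthiness instead of comparing it with nil. It also passes the list names through KEYS rather than interpolating them into the script text. The helper name 'requeue_stale_item' and its 'client' parameter (assumed to be a redis-py style client exposing eval(script, numkeys, *keys_and_args)) are illustrative and not from the original post.

```python
# Hypothetical helper; the names here are illustrative, not from the original post.
REQUEUE_SCRIPT = """
local saved_item = redis.call('rpop', KEYS[1])

-- A nil reply from Redis arrives in Lua as false, so test truthiness, not nil.
if saved_item then
    if redis.call('exists', KEYS[1] .. '_' .. saved_item) == 1 then
        -- Item key still present: keep the item in the processing list.
        redis.call('lpush', KEYS[1], saved_item)
    else
        -- Item key expired: hand the item back to the work list.
        redis.call('lpush', KEYS[2], saved_item)
    end
end

return saved_item
"""


def requeue_stale_item(client, processing_key, work_key):
    """Run the script atomically.

    Returns the popped item, or None when the processing list was empty.
    """
    # eval(script, numkeys, *keys_and_args): two keys, no extra arguments.
    return client.eval(REQUEUE_SCRIPT, 2, processing_key, work_key)
```

With the class from the question, this would presumably be called as requeue_stale_item(self._queue_client.get_redis_client(), self._processing_queue_name, self._queue_name). Note that the per-item key is still built inside the script, which is fine on a single Redis instance but would not satisfy the key-declaration rules of a clustered setup.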