Tags: python, redis, stream, queue, publish-subscribe

How do I perform an atomic XREAD and XDEL on a Redis Stream?


I am trying to design a system on Redis, where:

  • There is a queue of messages
  • There is a single writer to the queue
  • There are multiple consumers of "messages" in the queue, each continuously reading and consuming messages as they arrive
  • Each message MUST be received by one and only one consumer.

I was not able to find a way to XREAD and XDEL the message atomically. The best solution I see is:

import redis

client = redis.StrictRedis(host='localhost', port=6379, db=0)

group_name = 'my_group'
stream_name = 'my_queue'
consumer_name = 'consumer_1'  # Change this for each consumer

# Create the consumer group if it doesn't exist yet
try:
    client.xgroup_create(stream_name, group_name, id='0', mkstream=True)
except redis.ResponseError as e:
    if 'BUSYGROUP' not in str(e):
        raise

def read_and_process_messages():
    while True:
        # Read one new message from the stream, blocking up to 5 seconds
        messages = client.xreadgroup(group_name, consumer_name, {stream_name: '>'}, count=1, block=5000)

        for stream, entries in messages:
            for message_id, message in entries:
                print(f"Consumer {consumer_name} processing message ID: {message_id}, data: {message[b'message'].decode()}")

                # Acknowledge the message
                client.xack(stream_name, group_name, message_id)

                # ANOTHER CONSUMER CAN RECEIVE THE MESSAGE HERE!

                # Delete the message after acknowledging
                client.xdel(stream_name, message_id)

if __name__ == "__main__":
    read_and_process_messages()

But this has a problem: between acknowledging a message and deleting it, another parallel consumer can receive the same message.

How can I read the first message in the stream and automatically delete it, ensuring that it is only received once? Is there an option which allows to do that, like:

XREAD COUNT 1 STREAMS my_queue 0 DEL


Solution

  • Your assertion that "ANOTHER CONSUMER CAN RECEIVE THE MESSAGE HERE!" is not correct, at least not for the code you have written. You're using consumer groups and solving this problem is exactly what they are for. Another consumer group could read that message, but not another consumer within your group. And since you're only using one consumer group, this should work.
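For reference, the reply from `xreadgroup` in redis-py is a list of `[stream_name, entries]` pairs, where each entry is an `(entry_id, fields)` tuple; that is exactly what the nested loop in the question unpacks. A small sketch over a hard-coded sample reply (the entry ID and payload below are made up):

```python
def flatten_reply(reply):
    """Turn an XREADGROUP reply into a flat list of (stream, id, fields)."""
    out = []
    for stream_name, entries in reply:
        for entry_id, fields in entries:
            out.append((stream_name, entry_id, fields))
    return out

# Sample reply in the shape redis-py returns (bytes keys and values)
sample = [[b'my_queue', [(b'1700000000000-0', {b'message': b'hello'})]]]
print(flatten_reply(sample))
# [(b'my_queue', b'1700000000000-0', {b'message': b'hello'})]
```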

    That said, I think there's a more fundamental problem here. And that is that you are trying to use an event stream as a queue.

    The whole idea of an event stream, at least in Redis, is that it is a log of events over time that can be consumed and reconsumed as needed. If it gets too big, you trim it, but normally you don't delete from it. It's a history that multiple services might be interested in.
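In redis-py that trimming is a single call. A sketch (the helper name and the cap of 1000 are my own; `client` stands for any redis-py connection):

```python
def trim_stream(client, stream_name, max_entries=1000):
    # XTRIM with approximate trimming (MAXLEN ~) keeps roughly the newest
    # max_entries entries; it is cheaper for Redis than an exact cut
    client.xtrim(stream_name, maxlen=max_entries, approximate=True)
```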

    A queue, on the other hand, is a list of items that need to be processed once and only once. You push things into the queue on one side and pop them out on the other. You can use an event stream for this, with some effort, but Redis has a simpler data structure for that: the List.

    For what you are doing, a List is probably more appropriate. Just have whatever is producing the events push to one side of the list with either LPUSH or RPUSH and then you can pop from the other side with either LPOP or RPOP. LPOP and RPOP are atomic and remove what they return. Neat and tidy.
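A sketch of that push/pop pairing (the helper names are made up; `client` stands for any redis-py connection). Pushing on the left and popping on the right gives first-in, first-out delivery:

```python
def produce(client, list_name, payload):
    # The single writer pushes onto the left end of the list
    client.lpush(list_name, payload)

def consume_one(client, list_name):
    # RPOP atomically removes and returns the rightmost item, or None when
    # the list is empty, so each message goes to exactly one consumer
    return client.rpop(list_name)
```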

    There are also blocking versions of these commands, which I use in the code below. I am assuming that pushes are on the left and pops are on the right.

    import redis
    
    client = redis.StrictRedis(host='localhost', port=6379, db=0)
    
    list_name = 'my_list'
    
    def read_and_process_messages():
        while True:
            # Pop a message from the right end of the list, blocking for up
            # to 5 seconds (BRPOP takes a timeout in seconds, not milliseconds)
            message = client.brpop([list_name], timeout=5)
            if message is None:
                continue  # Timed out with nothing to do; wait again
            _key, value = message
            # Do stuff with the value
            print(value)
    

    I didn't try running this code but I'm reasonably sure that it is correct. More importantly, I hope it helps!