Search code examples
pythonredisstreamaioredis

redis streams ordered processing within consumer groups


I am using python (aioredis) with redis streams.

I have a one producer - many (grouped) consumer scenario and would like to ensure that the consumers are processing the (bulk) messages sent to a stream in an ordered fashion, meaning: when first message is done, process next message in the stream and so on. This would also mean that on of the consumer in the consumer group is processing at a time whereas the other consumers would wait.

I also want to rely on an ordered processing in a second, third and so on consumer group - all relying on the same messages sent to one stream. meaning:

message 1 ... n -> stream1 
ordered processing within group 1 ... n  
whereas consumer 1 ... n per group 1 ... n

What would be a good approach to get this done when I also want to ensure that there is not much overload with a potential order checking logic per group?


Solution

  • Let me get back to old school to synchronous processing, if you want to process stream messages sequentially it's not easy, the reasons are failure/retry.

    Consider you want to process each message at max once, stream message execution is a critical section, and consumer group members as threads/processes.

    To synchronize this, you need to have some sort of locking mechanism, given consumer group could be running on different machines. You can use a global locking mechanism to prevent multiple consumers from consuming messages from the same stream.

    You can use Redis lock (RedLock) to acquire/release the lock.

    Psuedo Code

    Procedure SequentialProcessor
    
    Input: StreamName
    Input: ConsumerName
    Input: ConsumerGroup
    Input: LockTime 
    
    
    BEGIN
        redLock = RedLock()
        WHILE True DO
         IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
           message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
           TRY
             processMessage( message )
           FINALLY
              redLock.releaseLock( StreamName#ConsumerGroup )
         ENDIF
        END
    END