
How to use Redis Streams to distribute messages to consumers by key


Let's say we have 3 consumers (C1, C2, C3) listening to a Redis stream, and all 3 belong to a single consumer group.

Besides its unique ID, each message also holds a custom key value, and the same key can appear in different messages. The possible key values are not known in advance.

I want to make sure that:

  1. Once message M1 hits the stream, it is picked up by exactly one (arbitrary) consumer, not fanned out to all consumers.
  2. However, multiple consumers can consume messages with different keys simultaneously.
  3. Once consumer C1 gets message M1 with key K1, no other consumer can consume new messages with key K1; only C1 processes the related K1 messages (to avoid overriding values in the processed data).
     STREAM TIMELINE                        CONSUMERS
     T1--------T2--------T3--------T4    /===> [C1]
[ (M1:K1) | (M2:K2) | (M3:K1) | (M4:K3) ] ===> [C2]
                                         \===> [C3]
CONSUMPTION TIMELINE
T1| @(M1:K1)=>[C1]==================|done|
T2|  @(M2:K2)=>[C2]===|done|
T3|   @       (waiting)             (M3:K1)>[C1]=|done|
T4|    @(M4:K3)=====>[C3]===|done|

Also, processing order is not important here; the only important thing is that no two consumers process messages with the same key at the same time!

Is it possible to achieve the above goals with Redis Streams (or maybe by other means)?


Solution

  • Doing this requires some glue, because a message's payload is opaque to Redis (i.e. messages have no "key" that Redis could route on).

    The following is a rough back-of-the-napkin idea for a given message M and a key K.

    Assume HK is a Redis Hash (e.g. 'consumers_keys'), SK is a Stream (e.g. 'keys_stream'), and SCn is a per-consumer Stream (e.g. 'C1' for consumer C1). Also assume that the following workflows are implemented as server-side scripts/functions so that each one runs atomically.

    Publisher workflow:

    1. Check whether K is in HK (HGET HK K)
    2. If it doesn't exist, publish M to SK (XADD SK * M)
    3. If it does, the reply is SCn, so publish M to SCn (XADD SCn * M)
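The publisher routing can be modeled in plain Python to show the decision it makes. This is an in-memory sketch, not Redis code: a dict stands in for the HK hash, a dict of lists stands in for the SK and SCn streams, and the names `publish`, `hk`, `streams` and the default `"keys_stream"` are hypothetical. In Redis the lookup and the append would live in one server-side script/function so that HGET and XADD run atomically.

```python
from collections import defaultdict


def publish(hk: dict[str, str], streams: dict[str, list], message: str,
            key: str, sk: str = "keys_stream") -> str:
    """Publisher workflow: route `message` by `key` and return the stream used.

    hk      -- stand-in for the HK hash (key -> owning consumer stream SCn)
    streams -- stand-in for all streams (stream name -> list of entries)
    """
    owner = hk.get(key)                      # 1. HGET HK K
    target = sk if owner is None else owner  # 2. unclaimed -> SK, 3. claimed -> SCn
    streams[target].append((message, key))   # XADD <target> * ...
    return target


if __name__ == "__main__":
    hk = {"K1": "C1"}          # K1 is already owned by consumer C1
    streams = defaultdict(list)
    print(publish(hk, streams, "M2", "K2"))  # unclaimed key -> shared SK
    print(publish(hk, streams, "M3", "K1"))  # claimed key -> C1's own stream
```

A message whose key is unclaimed goes to the shared SK stream, while a message whose key is already claimed bypasses the consumer group and lands directly on the owner's stream.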

    Consumer workflow:

    1. Consume SK as Cn in the consumer group. For each new message:
      1. Set K to SCn in HK (HSET HK K SCn)
      2. Process M
    2. Consume SCn normally
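The consumer side can be sketched with the same kind of in-memory model. Assumptions: `deque.popleft()` on a shared `sk` stands in for a consumer-group read (each entry is delivered to exactly one member), each consumer's SCn stream is named after the consumer, and `consume_step` / `run_round_robin` are hypothetical helpers. One deliberate deviation from the steps above: the claim uses `dict.setdefault`, which behaves like HSETNX rather than HSET. This covers the case where two messages with the same key were both already in SK before either was claimed; the consumer that loses the claim forwards the message to the owner's SCn instead of processing it. Acknowledgements and reclaiming are not modeled, and the driver runs consumers sequentially, so this only illustrates routing, not real concurrency.

```python
from collections import deque
from collections.abc import Callable

Handler = Callable[[str, str, str], None]  # (consumer, message, key)


def consume_step(consumer: str, hk: dict[str, str], sk: deque,
                 scn: dict[str, deque], handle: Handler) -> bool:
    """Run one iteration of the consumer workflow; return True if work was done."""
    # 1. Consume SK as `consumer` in the group: popleft models that each
    #    entry goes to exactly one group member.
    if sk:
        message, key = sk.popleft()
        # 1.1 Claim the key. setdefault acts like HSETNX: an existing claim
        #     by another consumer is kept, not overwritten.
        owner = hk.setdefault(key, consumer)
        if owner == consumer:
            handle(consumer, message, key)     # 1.2 Process M
        else:
            scn[owner].append((message, key))  # key owned elsewhere: forward to its SCn
        return True
    # 2. Consume this consumer's own SCn stream normally.
    if scn[consumer]:
        message, key = scn[consumer].popleft()
        handle(consumer, message, key)
        return True
    return False


def run_round_robin(consumers, hk, sk, scn, handle: Handler) -> None:
    """Give each consumer one step per round until nobody has work left."""
    progress = True
    while progress:
        progress = False
        for c in consumers:
            if consume_step(c, hk, sk, scn, handle):
                progress = True


if __name__ == "__main__":
    hk: dict[str, str] = {}
    sk = deque([("M1", "K1"), ("M2", "K2"), ("M3", "K1"), ("M4", "K3")])
    scn = {c: deque() for c in ("C1", "C2", "C3")}
    run_round_robin(("C1", "C2", "C3"), hk, sk, scn,
                    lambda who, m, k: print(who, m, k))
```

With the question's four messages, C3 is the one that reads M3 from SK, but K1 is already claimed by C1, so M3 is forwarded to C1's stream and both K1 messages end up being processed by C1.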

    Because the messages are now partitioned between SK and SCn, reclaiming them is more challenging and is left as an exercise to the reader.