apache-kafka, apache-kafka-streams, race-condition

Does this particular Kafka Streams topology introduce a race condition?


There are two topics involved: commands, which is a KStream of (you won't believe it!) commands, and state, which is a KTable (just a regular one, not a GlobalKTable).

The topology looks like this: commands.leftJoin(state, computeNewState).to(state), i.e. a command acts on the current state and produces a new one to the same topic. In functional programming terms it's kind of command X state -> state, where the final state is produced to the same place the initial state was taken from.

It seems to me that a classical race condition is hidden there, since two (almost) simultaneous commands might produce the following unfortunate sequence:

  1. command_1 arrives and consumes state_1;
  2. after recomputation, state_2 is produced by applying command_1;
  3. state_2 reaches the to() node and, effectively, asynchronous I/O to Kafka happens...
  4. ...yet it is not fast enough to be applied; meanwhile command_2 arrives with the same key, so leftJoin acts on state_1 rather than state_2, simply because state_2 has not yet been delivered to Kafka and not yet been seen by the Kafka Streams instance;
  5. QED.
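
The sequence above can be reproduced in a small plain-Java model. This is only a sketch: it does not use the Kafka Streams API, the class StaleJoinModel and its methods are hypothetical names, and computeNewState is assumed to simply add the command's amount to the current state. It also forces the worst-case timing, in which both commands are joined before any produced state record is read back.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StaleJoinModel {
    // Local view of the "state" KTable: it only changes when a record
    // from the state topic is consumed back by the application.
    private final Map<String, Integer> tableView = new HashMap<>();
    // Records already produced to the "state" topic but not yet consumed.
    private final List<Map.Entry<String, Integer>> inFlight = new ArrayList<>();

    // Assumed computeNewState: add the command's amount to the current state.
    static int computeNewState(int command, Integer state) {
        return (state == null ? 0 : state) + command;
    }

    // Models commands.leftJoin(state, computeNewState).to(state) for one command.
    public void onCommand(String key, int command) {
        Integer current = tableView.get(key); // may be stale
        inFlight.add(new AbstractMap.SimpleEntry<>(key, computeNewState(command, current)));
    }

    // Models the round trip: produced state records finally reach the table view.
    public void deliverStateRecords() {
        for (Map.Entry<String, Integer> record : inFlight) {
            tableView.put(record.getKey(), record.getValue());
        }
        inFlight.clear();
    }

    public Integer state(String key) {
        return tableView.get(key);
    }

    public static void main(String[] args) {
        StaleJoinModel model = new StaleJoinModel();
        model.onCommand("k", 1); // joins against the empty state, produces 1
        model.onCommand("k", 2); // still sees the empty state, produces 2
        model.deliverStateRecords();
        System.out.println(model.state("k")); // prints 2: the first command's effect is lost
    }
}
```

Had the second command seen the result of the first, the final state would be 3 rather than 2.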

Am I right?


Solution

  • What you describe is correct.

    Maybe you can use a single input topic (commands) and an aggregation to modify the state instead? In that case, the update to the state would be synchronous.

    If that is not possible, I would recommend falling back to the Processor API. You read the state topic into a manually added state store. You also connect the state store to the processor that processes the command topic. This way, the processor can read and modify the state directly when processing a command, and writing anything back into the state input topic would not be required.
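
What "synchronous" means for both suggestions can be shown with a plain-Java model. Again this is a sketch under assumptions, not Kafka Streams code: LocalStoreModel is a hypothetical name, the HashMap stands in for a state store attached to the processor (or the store behind an aggregation), and computeNewState is assumed to add the command's amount to the current state.

```java
import java.util.HashMap;
import java.util.Map;

class LocalStoreModel {
    // Stands in for the local state store that the processor reads and writes.
    private final Map<String, Integer> store = new HashMap<>();

    // Assumed computeNewState: add the command's amount to the current state.
    static int computeNewState(int command, Integer state) {
        return (state == null ? 0 : state) + command;
    }

    // Read, compute, and write happen in one processing step, so the next
    // command for the same key always sees the previous command's result.
    public void onCommand(String key, int command) {
        Integer current = store.get(key);
        store.put(key, computeNewState(command, current));
    }

    public Integer state(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        LocalStoreModel model = new LocalStoreModel();
        model.onCommand("k", 1); // state becomes 1
        model.onCommand("k", 2); // sees 1, state becomes 3
        System.out.println(model.state("k")); // prints 3: no update is lost
    }
}
```

Because there is no round trip through a topic between the write and the next read, there is no window in which a command can observe stale state for its key.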