There are two topics involved: commands, which is a KStream of - you won't believe it! - commands, and state, which is a KTable (just a regular one, not a GlobalKTable).
The topology looks like this:
commands.leftJoin(state, computeNewState).to(state)
i.e. a command acts on the current state and produces a new one to the same topic. It is a kind of command X state -> state in terms of functional programming, where the final state is produced to the same place the initial state was taken from.
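A minimal plain-Java sketch of that command X state -> state shape. The domain is an assumption made only for illustration (the state is an integer balance, a command is a signed delta), and `NewStateDemo` and `COMPUTE_NEW_STATE` are hypothetical names. The null check reflects that with a left join the table side can be missing for a key that has no state yet.

```java
import java.util.function.BiFunction;

final class NewStateDemo {
    // Hypothetical domain: state is a running balance, a command is a signed delta.
    // With a left join the state may be null when nothing exists yet for the key.
    static final BiFunction<Integer, Integer, Integer> COMPUTE_NEW_STATE =
        (command, state) -> (state == null ? 0 : state) + command;

    public static void main(String[] args) {
        Integer state = null;                        // no state yet for this key
        state = COMPUTE_NEW_STATE.apply(5, state);   // first command creates the state
        state = COMPUTE_NEW_STATE.apply(-2, state);  // second command acts on the new state
        System.out.println(state);                   // prints 3
    }
}
```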
It seems to me that a classical race condition is hidden there, since two (almost) simultaneous commands might produce the following unfortunate sequence:
1. command_1 arrives and consumes state_1;
2. state_2 is produced by applying command_1;
3. state_2 reaches the to() sink node and, effectively, async IO to Kafka happens...
4. command_2 comes with the same key, such that leftJoin acts on state_1 rather than state_2, simply because state_2 is not yet delivered to Kafka and not yet seen by the Kafka Streams instance.
Am I right?
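The sequence can be reproduced with a toy model in plain Java. This is not Kafka Streams code: the table side of the join is modelled as a map, and the round trip through the state topic as a queue of pending updates. All names (`StaleJoinDemo`, `process`, `deliverPending`) and the integer state are assumptions for illustration, and the model captures only the ordering of events, not real broker timing.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class StaleJoinDemo {
    // What the join currently sees for each key (the table side).
    private final Map<String, Integer> tableView = new HashMap<>();
    // Updates already sent to the state topic but not yet read back into the table.
    private final Queue<Map.Entry<String, Integer>> inFlight = new ArrayDeque<>();

    StaleJoinDemo(String key, int initialState) {
        tableView.put(key, initialState);
    }

    // Models leftJoin(...).to(state): read the visible state, compute, send asynchronously.
    void process(String key, int command) {
        Integer current = tableView.get(key);            // possibly stale
        int next = (current == null ? 0 : current) + command;
        inFlight.add(Map.entry(key, next));              // not visible to later joins yet
    }

    // Models the table catching up with its own topic.
    void deliverPending() {
        Map.Entry<String, Integer> update;
        while ((update = inFlight.poll()) != null) {
            tableView.put(update.getKey(), update.getValue());
        }
    }

    int visibleState(String key) {
        return tableView.get(key);
    }

    // command_1 and command_2 arrive before state_2 has been read back.
    static int runBackToBack() {
        StaleJoinDemo demo = new StaleJoinDemo("k", 10); // state_1 = 10
        demo.process("k", 1);                            // command_1 sees 10, sends 11
        demo.process("k", 1);                            // command_2 also sees 10, sends 11
        demo.deliverPending();
        return demo.visibleState("k");
    }

    // Same commands, but state_2 is read back before command_2 is processed.
    static int runWithDeliveryBetween() {
        StaleJoinDemo demo = new StaleJoinDemo("k", 10);
        demo.process("k", 1);
        demo.deliverPending();
        demo.process("k", 1);
        demo.deliverPending();
        return demo.visibleState("k");
    }

    public static void main(String[] args) {
        System.out.println(runBackToBack());          // prints 11: one update is lost
        System.out.println(runWithDeliveryBetween()); // prints 12
    }
}
```

In the back-to-back run both commands are applied to state_1, so the effect of command_1 is overwritten.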
What you describe is correct.
Maybe you can just use a single input topic and use an aggregation instead to modify the state? In that case, the update to the state would be synchronous.
If that is not possible, I would recommend falling back to the Processor API. You read the state topic into a manually added state store. You also connect the state store to the processor that processes the command topic -- this way, the processor can read and modify the state directly when processing a command, and writing anything back into the state input topic would no longer be required.
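Both suggestions come down to the same idea: the read and the write of the state happen in one step against a store local to the task, instead of going through a topic round trip. A minimal plain-Java sketch of that idea follows. It is a toy model and not the Kafka Streams API: the map stands in for the state store, and `LocalStoreDemo`, `process` and the integer state are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class LocalStoreDemo {
    // Stand-in for a state store attached to the command processor.
    private final Map<String, Integer> store = new HashMap<>();

    LocalStoreDemo(String key, int initialState) {
        store.put(key, initialState);
    }

    // Read the current state and write the new one synchronously;
    // the next command for the same key always sees the updated value.
    int process(String key, int command) {
        return store.merge(key, command, Integer::sum);
    }

    static int runBackToBack() {
        LocalStoreDemo demo = new LocalStoreDemo("k", 10); // state_1 = 10
        demo.process("k", 1);                              // command_1 -> 11
        return demo.process("k", 1);                       // command_2 sees 11 -> 12
    }

    public static void main(String[] args) {
        System.out.println(runBackToBack()); // prints 12
    }
}
```

Since records with the same key are processed one at a time by a single task, no update is lost even when two commands arrive back to back.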