apache-kafka-streams, stream-processing

Which guarantees does Kafka Streams provide when using a RocksDB state store with a changelog?


I'm building a Kafka Streams application that generates change events by comparing every new calculated object with the last known object.

For every message on the input topic, I update an object in a state store. Periodically (using punctuate), I apply a calculation to this object and compare the result with the previous calculation result (stored in another state store).

To make sure this operation is consistent, I do the following after the punctuator fires:

  1. write a tuple of the previous and new calculation results to the state store
  2. compare the two values, create change events, and context.forward() them, so the events go to the results topic
  3. replace the tuple with the new value only and write it back to the state store

I use this tuple for scenarios where the application crashes or rebalances, so I can always send out the correct set of events before continuing. A sketch of the pattern is below.
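
A minimal sketch of this pattern, assuming hypothetical store names ("objects-store", "results-store"), String-typed objects and results, a placeholder calculate() method, and an arbitrary 30-second punctuation interval; a real application would use its own types and serdes:

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class ChangeEventProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> objects;     // latest object per key
    private KeyValueStore<String, String[]> results;   // tuple {previous, new}; needs a custom serde

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.objects = context.getStateStore("objects-store");
        this.results = context.getStateStore("results-store");
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    @Override
    public void process(Record<String, String> record) {
        objects.put(record.key(), record.value());     // keep only the latest object per key
    }

    private void punctuate(long timestamp) {
        try (var iter = objects.all()) {
            while (iter.hasNext()) {
                KeyValue<String, String> entry = iter.next();
                String newResult = calculate(entry.value);
                String[] tuple = results.get(entry.key);
                String oldResult = (tuple == null) ? null : tuple[1];

                // 1. write the tuple so a restart can re-derive the events
                results.put(entry.key, new String[] { oldResult, newResult });

                // 2. compare, create a change event, and forward it to the results topic
                if (oldResult == null || !oldResult.equals(newResult)) {
                    context.forward(new Record<>(entry.key, newResult, timestamp));
                }

                // 3. replace the tuple with the new value only
                results.put(entry.key, new String[] { newResult, newResult });
            }
        }
    }

    private String calculate(String object) {
        return object;  // placeholder for the real calculation
    }
}
```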

Now, I noticed the resulting events are not always consistent, especially if the application rebalances frequently. In rare cases the Kafka Streams application emits events to the results topic while the changelog topic is not yet up to date; in other words, the results topic reflects a state that the changelog topic has not reached.

So, when a stateStore.put() call returns successfully, are there any guarantees about when the write will appear on the changelog topic?

Can I enforce a changelog flush? And when I call context.commit(), when exactly does that flush and commit happen?

[Figure: process flow]


Solution

  • To get complete consistency, you will need to enable processing.guarantee="exactly_once" -- otherwise, in case of a failure, you might get inconsistent results (see the configuration sketch below).

    If you want to stay with "at_least_once", you might want to use a single store and update it after processing is done (i.e., after calling forward()). This minimizes the time window in which inconsistencies can occur (see the second sketch below).

    And yes, if you call context.commit(), all stores will be flushed to disk and all pending producer writes will be flushed before the input topic offsets are committed. Note that context.commit() only requests a commit; Kafka Streams performs it as soon as possible afterwards, not necessarily immediately (see the last sketch below).
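
A sketch of the exactly-once configuration mentioned above; the application id and broker address are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {

    public static Properties streamsProps() {
        Properties props = new Properties();
        // placeholder application id and broker address
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "change-event-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // "exactly_once_v2" (requires brokers 2.5+) supersedes the original
        // "exactly_once" value named in the answer
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```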
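For the at-least-once variant, a sketch of the single-store pattern: forward first, then update the store, so that a crash in between at worst re-emits an event rather than losing it. This reuses the context, objects store, and calculate() from the question's sketch, plus a hypothetical single lastResults store in place of the tuple store:

```java
// Inside the same processor as above, a punctuator using a single store:
private void punctuate(long timestamp) {
    try (var iter = objects.all()) {
        while (iter.hasNext()) {
            KeyValue<String, String> entry = iter.next();
            String newResult = calculate(entry.value);
            String oldResult = lastResults.get(entry.key);   // single store, no tuple

            // forward the change event first ...
            if (oldResult == null || !oldResult.equals(newResult)) {
                context.forward(new Record<>(entry.key, newResult, timestamp));
            }
            // ... then update the store, so a crash in between re-emits the
            // event on restart instead of silently dropping it
            lastResults.put(entry.key, newResult);
        }
    }
}
```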
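Finally, a small sketch of requesting a commit from inside process(), again within the processor class from above; remember that commit() is only a request that Kafka Streams carries out as soon as possible:

```java
@Override
public void process(Record<String, String> record) {
    objects.put(record.key(), record.value());
    // commit() only *requests* a commit; Kafka Streams performs it as soon
    // as possible, flushing all stores (and thus their changelog writes)
    // and all pending producer records before committing the input offsets.
    context.commit();
}
```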