apache-kafka, apache-kafka-streams

How can a key/value pair stored in app instance A's state store be deleted by app instance B?


My understanding from the docs is that state stores are local to each app instance (more precisely, to the partitions that instance processes):

Because Kafka Streams partitions the data for processing it, an application’s entire state is spread across the local state stores of the application’s running instances.

I have a use case where only one key at a time may be associated with a given value (let's call it value123). If a keyB/value123 message arrives and value123 was previously associated with a different key (keyD), I need to delete the old keyD/value123 entry.

Here is the problem: I only receive new key/value associations. I don't receive tombstone messages for old keys, so I have to infer the tombstone when a new key with the same value arrives on the topic. Because state stores are local to each instance, there is no way to access (and delete) the old key/value if it lives in another app instance's state store. I need to evict old data. How can I achieve this?

To look at it another way:

Suppose a message with key A comes into a transformer whose job is to clean up the state so that no other key has the same value. Say key A's value is currently associated with key B. I need to delete key B from the KTable/state store so that key A is the only key associated with that value. I can't guarantee that key B is assigned to the same partition as key A. How do I delete key B from a partition other than key A's?
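To make the partitioning problem concrete, here is a plain-Java simulation (not Kafka Streams code). The helper `partitionFor` is hypothetical: it uses `String.hashCode()` modulo the partition count as a stand-in for Kafka's default partitioner, which actually hashes the serialized key bytes with murmur2. Each partition gets its own map, mimicking the local state store a task owns. Keyed by the original key, keyD and keyB can land in different stores; re-keyed by the value, every record carrying value123 lands in the same store, so the newest key simply overwrites the old one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RekeyByValueDemo {
    static final int NUM_PARTITIONS = 4;

    // Hypothetical stand-in for the default partitioner: Kafka really uses murmur2
    // over the serialized key bytes, but any deterministic hash shows the same idea.
    static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    // One map per partition, mimicking the local state store each task owns.
    static List<Map<String, String>> newStores() {
        List<Map<String, String>> stores = new ArrayList<>();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            stores.add(new HashMap<>());
        }
        return stores;
    }

    // Keyed by the original key: each store only sees keys hashed to it, so an
    // older key with the same value may sit in another partition's store.
    static List<Map<String, String>> storeByKey(String[][] records) {
        List<Map<String, String>> stores = newStores();
        for (String[] r : records) {
            String key = r[0], value = r[1];
            stores.get(partitionFor(key)).put(key, value);
        }
        return stores;
    }

    // Re-keyed by the value: all records with the same value hash to one partition,
    // and put() replaces the previous key, which acts as the implied tombstone.
    static List<Map<String, String>> storeByValue(String[][] records) {
        List<Map<String, String>> stores = newStores();
        for (String[] r : records) {
            String key = r[0], value = r[1];
            stores.get(partitionFor(value)).put(value, key);
        }
        return stores;
    }

    public static void main(String[] args) {
        String[][] records = { {"keyD", "value123"}, {"keyB", "value123"} };
        System.out.println("by key:   " + storeByKey(records));
        System.out.println("by value: " + storeByValue(records));
    }
}
```

In Kafka Streams, the rough analogue is calling `selectKey((k, v) -> v)` before a stateful operation such as `groupByKey().reduce(...)`; because the key changed, Streams routes the records through a repartition topic so they reach the partition that owns the new key.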


Solution

  • I was able to solve my problem by switching my key to the other data point and using the foreign-key join between two KTables (available in Kafka Streams 2.5.0). This controlled the output: once a new record arrived with the same key but a different foreign key, it no longer joined with my other KTable, because the foreign key had changed.

    I used these two as resources:

    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableForeign-KeyJoin

    https://kafka-tutorials.confluent.io/foreign-key-joins/kstreams.html
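The join behaviour this solution relies on can be modeled in plain Java. This is not the Kafka Streams API; the class and method names (`ForeignKeyJoinModel`, `upsertLeft`, `upsertRight`) are hypothetical. The sketch assumes inner-join semantics: a left row appears in the output only while its current foreign key matches a right row, and when the left row's foreign key changes, its previous join result is replaced or deleted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ForeignKeyJoinModel {
    // Left table: primary key -> foreign key (the "other data point").
    final Map<String, String> left = new HashMap<>();
    // Right table: foreign key -> value.
    final Map<String, String> right = new HashMap<>();
    // Materialized inner-join result, keyed by the left table's primary key.
    final Map<String, String> result = new HashMap<>();

    void upsertLeft(String key, String foreignKey) {
        left.put(key, foreignKey);
        recompute(key);
    }

    void upsertRight(String foreignKey, String value) {
        right.put(foreignKey, value);
        // Every left row that references this foreign key must be re-joined.
        for (Map.Entry<String, String> e : left.entrySet()) {
            if (Objects.equals(e.getValue(), foreignKey)) {
                recompute(e.getKey());
            }
        }
    }

    // Inner-join semantics: output exists only while the left row's current
    // foreign key matches a right row; otherwise the result is deleted (tombstone).
    private void recompute(String key) {
        String rightValue = right.get(left.get(key));
        if (rightValue == null) {
            result.remove(key);
        } else {
            result.put(key, key + "->" + rightValue);
        }
    }

    public static void main(String[] args) {
        ForeignKeyJoinModel m = new ForeignKeyJoinModel();
        m.upsertRight("fk1", "A");
        m.upsertLeft("k1", "fk1");
        System.out.println(m.result); // {k1=k1->A}
        m.upsertLeft("k1", "fk2");    // same key, new foreign key with no match
        System.out.println(m.result); // {}
    }
}
```

In the real DSL, this corresponds to the `KTable#join` overload that takes a foreign-key extractor function along with a `ValueJoiner`, as described in the resources listed above.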