apache-kafka apache-kafka-streams

Kafka Streams Processor API clear state store


I am using the Kafka Streams Processor API to do some custom calculations. Because the processing is complex, the DSL was not the best fit. The topology code looks like the one below.

// Persistent key-value store that the processor reads and writes
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("storeName");
StoreBuilder<KeyValueStore<String, StoreObject>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), storeObjectSerde);

// Parent names must match the names given to the upstream nodes,
// and the processor must look up the store under the name it was created with.
topology.addSource("SourceReadername", stringDeserializer, sourceSerde.deserializer(), "sourceTopic")
        .addProcessor("processor", () -> new CustomProcessor("storeName"), "SourceReadername")
        .addStateStore(storeBuilder, "processor") // define store for processor
        .addSink("sinkName", "outputTopic", stringSerializer, resultSerde.serializer(), "processor");

I need to clear some items from the state store based on an event arriving on a separate topic. I cannot find the right way to do this: either join with another stream using the Processor API, or listen to events on another topic in some other way, so that I can trigger the cleanup code in the CustomProcessor class. Is there a way to receive events from another topic in the Processor API? Or can I mix the DSL with the Processor API to join the two, so that events from either topic reach the process method and I can run the cleanup code when an event arrives on the cleanup topic?

Thanks


Solution

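  • You just need to add another input topic (addSource) and add a processor that handles messages from that topic and, based on them, removes entries from the state store. That processor has to be connected to the same state store. One note: both topics should use the same keys (because of partitioning), so that a cleanup event lands on the same task that holds the entry it should remove.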
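
A minimal sketch of that wiring, assuming a recent Kafka Streams version that ships the org.apache.kafka.streams.processor.api package. The names cleanupTopic, cleanupSource, cleanupProcessor and the CleanupProcessor class are hypothetical and not from the question, and String values stand in for StoreObject to keep the example self-contained. It also assumes a cleanup event carries the key of the entry to delete.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final class CleanupTopologySketch {

    static final String STORE = "storeName";

    /** Main processor: keeps the latest value per key and forwards the record. */
    static class CustomProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            this.store = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            store.put(record.key(), record.value());
            context.forward(record);
        }
    }

    /** Hypothetical cleanup processor: deletes the entry whose key matches the event key. */
    static class CleanupProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            this.store = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            store.delete(record.key());
        }
    }

    static Topology build() {
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.String());

        Topology topology = new Topology();
        topology.addSource("source",
                        Serdes.String().deserializer(), Serdes.String().deserializer(), "sourceTopic")
                // second source for the (assumed) cleanup topic
                .addSource("cleanupSource",
                        Serdes.String().deserializer(), Serdes.String().deserializer(), "cleanupTopic")
                .addProcessor("processor", CustomProcessor::new, "source")
                .addProcessor("cleanupProcessor", CleanupProcessor::new, "cleanupSource")
                // connect the one store to both processors
                .addStateStore(storeBuilder, "processor", "cleanupProcessor")
                .addSink("sink", "outputTopic",
                        Serdes.String().serializer(), Serdes.String().serializer(), "processor");
        return topology;
    }
}
```

Because both processors share the store, they end up in the same sub-topology, and each task sees the same partition number from both topics. That is why the two topics should have the same keys, and ideally the same partition count and partitioner; otherwise a cleanup event may be processed by a task whose local store does not contain the key.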