
Kafka KTable also streams duplicate updates



I want to process the changelog stream of a KTable (created with KStream.reduce()), i.e. any change in the value of a key in the KTable. But it seems that even when the same key-value pair is sent to the KTable multiple times, it is forwarded downstream every time. I need to send an update for a key only when its value actually changes.

```java
// chained on the input KStream<Long, Long>
.groupByKey(Grouped.with(new Serdes.LongSerde(), new Serdes.LongSerde()))
        .reduce(new Reducer<Long>() {
            @Override
            public Long apply(Long oldValue, Long newValue) {
                return newValue; // keep only the latest value for each key
            }
        })
        .toStream()
        // for each update to an ID, send the update downstream
        .foreach((key, value) -> sendUpdate(key));
```
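To make the behavior concrete, here is a plain-Java model with no Kafka dependency (the class and method names are hypothetical). It mimics `groupByKey().reduce((oldValue, newValue) -> newValue).toStream()` by keeping the latest value per key in a `HashMap` and forwarding every input record, which is why repeated identical pairs show up downstream. It is only a sketch: in a real application, Kafka Streams record caching can coalesce consecutive updates to the same key before they are forwarded, so fewer records may reach `foreach`; this model ignores caching, and the reducer itself never compares the new value with the old one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of groupByKey().reduce((a, b) -> b).toStream().
// No Kafka classes are used; record caching is deliberately not modeled.
public class ReduceForwardsEveryUpdate {

    // Returns the records that toStream() would forward for the given input, in order.
    static List<Map.Entry<Long, Long>> reduceToStream(List<Map.Entry<Long, Long>> input) {
        Map<Long, Long> table = new HashMap<>(); // stands in for the KTable's state store
        List<Map.Entry<Long, Long>> forwarded = new ArrayList<>();
        for (Map.Entry<Long, Long> record : input) {
            // reducer: the newest value simply replaces the stored one
            table.put(record.getKey(), record.getValue());
            // no old/new comparison happens, so every input is forwarded
            forwarded.add(Map.entry(record.getKey(), record.getValue()));
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Long>> input = List.of(
                Map.entry(1L, 10L), Map.entry(1L, 10L), Map.entry(1L, 20L));
        System.out.println(reduceToStream(input)); // prints [1=10, 1=10, 1=20]
    }
}
```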


Solution

  • This is the default behavior of KTable#toStream(): it converts the table's changelog into a KStream, so the operator downstream of reduce receives an update every time the upstream reduce operator receives a message.

    You can achieve the desired behavior with the Processor API; in this case we use KStream#transformValues().

    First, register a KeyValueStore that holds the latest value for each key:

    // You can skip adding number_store if your KTable is already materialized as number_store
    streamsBuilder
            .addStateStore(Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));
    
    numberKStream
            .transformValues(ExtractIfValueChangedTransformer::new, "number_store")
            .filter((key, value) -> value != null)
            .foreach((key, value) -> sendUpdate(key));
    

    Then create ExtractIfValueChangedTransformer, which returns the new value only if it differs from the value stored for that key, and null otherwise:

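    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Forwards a value only when it differs from the last value seen for the same key
    public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {
    
        private KeyValueStore<Long, Long> kvStore;
    
        @Override
        public void init(ProcessorContext context) {
            // look up the store whose name was passed to transformValues()
            kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
        }
    
        @Override
        public Long transform(Long key, Long newValue) {
            Long oldValue = kvStore.get(key);
            kvStore.put(key, newValue);            // always remember the latest value
            if (oldValue == null) return newValue; // first value for this key: forward it
            // unchanged value -> null, which the downstream filter drops
            return oldValue.equals(newValue) ? null : newValue;
        }
    
        @Override
        public void close() {}
    }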
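The deduplication logic of this answer can be checked without a running cluster. The sketch below is a plain-Java model (hypothetical class `ChangeOnlyModel`, with a `HashMap` standing in for the `number_store` KeyValueStore) of the `transformValues`, `filter` and `foreach` chain; it returns the keys for which `sendUpdate(key)` would be called. It is an approximation under those assumptions, not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of transformValues -> filter -> foreach.
// A HashMap replaces the "number_store" KeyValueStore; no Kafka classes are used.
public class ChangeOnlyModel {

    private final Map<Long, Long> store = new HashMap<>();

    // Mirrors ExtractIfValueChangedTransformer#transform. HashMap.put returns the
    // previous value (or null), which is equivalent to the get-then-put in the transformer.
    Long transform(Long key, Long newValue) {
        Long oldValue = store.put(key, newValue);
        if (oldValue == null) return newValue;
        return oldValue.equals(newValue) ? null : newValue;
    }

    // Returns the keys that would reach sendUpdate(key), in order.
    List<Long> keysToUpdate(List<Map.Entry<Long, Long>> input) {
        List<Long> updated = new ArrayList<>();
        for (Map.Entry<Long, Long> record : input) {
            Long value = transform(record.getKey(), record.getValue());
            if (value != null) {              // .filter((key, value) -> value != null)
                updated.add(record.getKey()); // .foreach((key, value) -> sendUpdate(key))
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Long>> input = List.of(
                Map.entry(1L, 10L), Map.entry(1L, 10L), Map.entry(2L, 5L), Map.entry(1L, 20L));
        System.out.println(new ChangeOnlyModel().keysToUpdate(input)); // prints [1, 2, 1]
    }
}
```

Because the store starts empty, the first value seen for each key is always forwarded, and a value that changes back (10, then 20, then 10) produces an update each time, since only the immediately preceding value is compared.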