Tags: apache-kafka, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

KTable not deduplicating incoming records with the same key


I am trying to deduplicate records by reading the input topic as a KTable and sinking it to an output topic. However, the KTable still sinks the duplicate records to the output topic. I am not sure where I am going wrong.

Here is my application.yml

spring:
  cloud:
    stream:
      function:
        bindings:
          process-in-0: input.topic
          process-out-0: output.topic
        definition: process
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                materializedAs: incoming-store
          binder:
            application-id: spring-cloud-uppercase-app
            brokers: localhost:9092
            configuration:
              commit:
                interval:
                  ms: 1000
              state.dir: state-store
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde

As per the Spring Cloud Stream Kafka Streams documentation on state stores, I have materialized the input table above as incoming-store.

The process() bean function takes the input topic as a KTable and sinks it to the output topic:


    @Bean
    public Function<KTable<String, String>, KStream<String, String>> process(){
        return table -> table
                .toStream()
                .peek((k, v) -> log.info("Received key={}, value={}", k, v));
    }

Given an input of 4 records:

key=111, value="a"
key=111, value="a"
key=222, value="b"
key=111, value="a"

I expect to get only 2 records:

key=111, value="a"
key=222, value="b"

But I am getting all 4 records. Any help would be really appreciated!
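
The expected behaviour can be pinned down with a small plain-Java sketch. It does not use Kafka at all; `ExpectedDedup` and `dedup` are made-up names for illustration, and "duplicate" is assumed to mean "same key with the same value as last seen for that key".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the expected result; no Kafka APIs are involved.
class ExpectedDedup {

    // Hypothetical helper: forwards a record only when its value differs from
    // the last value seen for the same key.
    static List<Map.Entry<String, String>> dedup(List<Map.Entry<String, String>> records) {
        Map<String, String> lastSeen = new HashMap<>();
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            // put() returns the previous value for the key, or null if there was none
            String previous = lastSeen.put(record.getKey(), record.getValue());
            if (!record.getValue().equals(previous)) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("111", "a"),
                Map.entry("111", "a"),
                Map.entry("222", "b"),
                Map.entry("111", "a"));
        System.out.println(dedup(input)); // prints [111=a, 222=b]
    }
}
```

Under that assumption, the four input records collapse to the two expected ones.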


Solution

  • You can group by key and aggregate the events. No strings are concatenated here; the aggregation only keeps one value per key (111 or 222), so your use case is effectively a distinct aggregation. On every call the aggregator receives (key, value, aggregate), and you return just the value, so the latest value is what is kept.

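    import java.util.function.Function;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Slf4j
    @Configuration
    @EnableAutoConfiguration
    public class KafkaAggFunctionalService {

        // The bean name sets the binding names: aggregate-in-0 and aggregate-out-0.
        @Bean
        public Function<KTable<String, String>, KStream<String, String>> aggregate() {
            return table -> table
                    .toStream()
                    .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
                    // Keep only the latest value per key; the previous aggregate is discarded.
                    .aggregate(() -> "", (key, value, aggregate) -> value,
                            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
                                    .withKeySerde(Serdes.String())
                                    .withValueSerde(Serdes.String()))
                    .toStream()
                    .peek((k, v) -> log.info("Received key={}, value={}", k, v));
        }
    }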
    

    This Git repo has a lot of examples, and one of them looks very similar to your case.
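
    To see what the aggregator above keeps per key, here is a minimal plain-Java model of the same fold. It is only a sketch: it mimics the contents of the state store and does not use the Kafka Streams API, and `LatestValueAggregation` and its nested `Aggregator` interface are hypothetical stand-ins invented for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the aggregation; it imitates only the store contents.
class LatestValueAggregation {

    // Hypothetical stand-in for an aggregator taking (key, value, aggregate).
    interface Aggregator {
        String apply(String key, String value, String aggregate);
    }

    static Map<String, String> aggregate(List<Map.Entry<String, String>> records,
                                         String initial,
                                         Aggregator aggregator) {
        // LinkedHashMap keeps keys in first-seen order, which makes the output stable
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            // Start from the initial value the first time a key is seen
            String current = store.getOrDefault(record.getKey(), initial);
            store.put(record.getKey(),
                    aggregator.apply(record.getKey(), record.getValue(), current));
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, String> store = aggregate(
                List.of(Map.entry("111", "a"), Map.entry("111", "a"),
                        Map.entry("222", "b"), Map.entry("111", "a")),
                "",
                (key, value, aggregate) -> value); // same aggregator as in the answer
        System.out.println(store); // prints {111=a, 222=b}
    }
}
```

    The model shows the final table state only. In a real Kafka Streams application, how many intermediate updates per key reach the output topic also depends on the record cache and the commit interval.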