Search code examples
apache-kafkaapache-kafka-streamsspring-kafka

How to compare two kafka tables?


Kafka Stream Config

@Configuration
@EnableKafkaStreams
@EnableKafka
public class KafkaConfig {


  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  KafkaStreamsConfiguration kstreamConfig(){
    Map<String,Object> props=new HashMap<>();
    props.put(APPLICATION_ID_CONFIG,"MY_GROUP_ID");
    props.put(BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
    props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return new KafkaStreamsConfiguration(props);
  }
}

//My logic

    KTable<String,String> table1=streamsBuilder.table("TOPIC1");
    KTable<String,String> table2=streamsBuilder.table("TOPIC2");

Now I've two tables, I need to compare these two tables and the data which are not present in the table1 will be send to another kafka topic.


Solution

  • Call StreamsBuilderFactoryBean#getKafkaStreams() to get a KafkaStreams instance.

    Then call streams.store(StoreQueryParameters.fromNameAndType("<store-name>", QueryableStoreTypes.keyValueStore())) to get both stores, which you can iterate and compare using all()

    For any differences you want to send to another topic, use KafkaTemplate producer