Kafka Stream Config
@Configuration
@EnableKafkaStreams
@EnableKafka
public class KafkaConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kstreamConfig(){
Map<String,Object> props=new HashMap<>();
props.put(APPLICATION_ID_CONFIG,"MY_GROUP_ID");
props.put(BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
}
//My logic
KTable<String,String> table1=streamsBuilder.table("TOPIC1");
KTable<String,String> table2=streamsBuilder.table("TOPIC2");
Now I've two tables, I need to compare these two tables and the data which are not present in the table1 will be send to another kafka topic.
Call StreamsBuilderFactoryBean#getKafkaStreams()
to get a KafkaStreams
instance.
Then call streams.store(StoreQueryParameters.fromNameAndType("<store-name>", QueryableStoreTypes.keyValueStore()))
to get both stores, which you can iterate and compare using all()
For any differences you want to send to another topic, use KafkaTemplate
producer