I have an application that is based on spring boot, spring-kafka and kafka-streams. When application starts up, it creates kafka streams topology with default list of topics. What I need to do is edit/recreate topology in runtime. For example, when application already running, there is new topic name comes and I want to add this topic to my topology. Currently I'm thinking about somehow to delete existing topology, close and clean up KafkaStreams, run logic where I create topology but with new topic name and start KafkaStreams again. I don`t want to restart my application. Can someone suggest me how to do this in runtime?
I found 1 solution. I extend StreamsBuilderFactoryBean:
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}
public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
private StreamsBuilder instance;
public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
super(streamsConfig);
}
@Override
public boolean isSingleton() {
return false;
}
@Override
protected synchronized StreamsBuilder createInstance() {
if (instance == null) {
instance = new StreamsBuilder();
}
return instance;
}
@Override
public synchronized void stop() {
instance = null;
super.stop();
}
}
And when I build topology, I, instead of using StreamsBuilder, use StreamsBuilderFactoryBean#getObject():
@Component
public class DynamicStream {
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
public void init() {
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
//build topology
}
//call this method when stream reconfiguration is needed
public void reinitialize() {
streamsBuilderFactoryBean.stop();
init();
streamsBuilderFactoryBean.start();
}
}