Search code examples
javaspring-bootapache-kafka-streams

Spring kafka re-create Kafka Stream Topology in runtime


I have an application that is based on spring boot, spring-kafka and kafka-streams. When application starts up, it creates kafka streams topology with default list of topics. What I need to do is edit/recreate topology in runtime. For example, when application already running, there is new topic name comes and I want to add this topic to my topology. Currently I'm thinking about somehow to delete existing topology, close and clean up KafkaStreams, run logic where I create topology but with new topic name and start KafkaStreams again. I don`t want to restart my application. Can someone suggest me how to do this in runtime?


Solution

  • I found 1 solution. I extend StreamsBuilderFactoryBean:

    @Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    @Primary
    public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
        return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
    }
    
    public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
    
        private StreamsBuilder instance;
    
        public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
            super(streamsConfig);
        }
    
        @Override
        public boolean isSingleton() {
            return false;
        }
    
        @Override
        protected synchronized StreamsBuilder createInstance() {
            if (instance == null) {
                instance = new StreamsBuilder();
            }
            return instance;
        }
    
        @Override
        public synchronized void stop() {
            instance = null;
            super.stop();
        }
    }
    

    And when I build topology, I, instead of using StreamsBuilder, use StreamsBuilderFactoryBean#getObject():

    @Component
    

    public class DynamicStream {

    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
    
    public void init() {
        StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
            //build topology
    }
    
    //call this method when stream reconfiguration is needed
    public void reinitialize() {
        streamsBuilderFactoryBean.stop();
        init();
        streamsBuilderFactoryBean.start();
    }
    

    }