Search code examples
javaapache-kafkaapache-kafka-streamsspring-cloud-stream

Unable to customize kafkaStreams globalstaterestorelistener


This is my configuration class but when I delete the state and restart so it builds from the changelog topic I am unable see any messages in the logs.

Steps to produce setup kafkastream:

  1. Write to a kafka topic and kstream reads and stores state locally.
  2. I stop application and kafkastream builds from the changetopic
  3. It is still displaying the default logs.
[-StreamThread-2] o.a.k.s.p.i.StoreChangelogReade stream-thread [foobar-91eae487-939e-439a-bd5f-c918c1f13145-StreamThread-2] Finished restoring changelog foobar-test-avro-leg-changelog-1 to store test-avro-leg with a total number of 66718 records
@EnableAutoConfiguration
@Slf4j
public class SpringKafkaStreamConfig {

    @Bean
    public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(){
        return factoryBean -> {

            List< StreamsBuilderFactoryBean.Listener
                > out = factoryBean.getListeners();
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
                        java.util.Date start = null;
                        java.util.Date stop = null;
                        @Override
                        public void onRestoreStart(TopicPartition topicPartition,String storeName,long startingOffset,long endingOffset) {
                            start = Time.from(Instant.now());
                            log.info("Restarting the building of the following " +
                                         "state store: {} " +
                                         "starting " +
                                         "at offset: {} at the this time: {}",
                                     storeName,
                                     startingOffset,Time.from(Instant.now()));
                        }

                        @Override
                        public void onBatchRestored(TopicPartition topicPartition,String storeName,long batchEndOffset,long numRestored) {

                        }

                        @Override
                        public void onRestoreEnd(TopicPartition topicPartition,
                                                 String storeName,long totalRestored) {
                            stop = Time.from(Instant.now());
                            log.info("State has completed building at this " +
                                         "time: {} and restored for the " +
                                         "following records: {}",
                                     stop,totalRestored);
                        }
                    });
                }
            });
        };
    }
}

Solution

  • The StreamsBuilderFactoryBeanCustomizer is a Boot class, used to customize the single SBFB configured by Spring Boot.

    That factory bean is not used by the binder because a new one is needed for each binding.

    Spring Cloud Stream uses the StreamsBuilderFactoryBeanConfigurer provided by spring-kafka instead (if there is exactly one in the application context).