spring-boot apache-kafka apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka

Equivalent to ChannelInterceptor for Spring Cloud Stream Kafka Streams Binder


We are developing an internal company framework on top of Spring Boot, and we'd like to support Kafka Streams with Spring Cloud Stream. We need to automatically inject some headers into all outbound messages. We achieved this with the standard Spring Cloud Stream Kafka binder by registering a custom ChannelInterceptor, but that does not work for Kafka Streams, which seems to follow a different path.

Is there any equivalent to ChannelInterceptor for Spring Cloud Stream Kafka Streams binder?

I found this customizer/configurer:

  @Bean
  public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {

      factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
        @Override
        public void configureBuilder(final StreamsBuilder builder) {
          
        }

        @Override
        public void configureTopology(final Topology topology) {
          
        }
      });

      factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
        @Override
        public void customize(final KafkaStreams kafkaStreams) {
          
        }
      });

    };
  }

My last idea was to use the configureTopology method to modify the Topology automatically and insert a Transformer just before the last sink node. To add this new node, however, I have to specify its parent node, so I would need to know the name of the last sink node, and the node names may all have been generated automatically by Kafka Streams. The only way I can see is to call topology.describe() and perhaps parse its String output. This sounds too complicated compared to a simple ChannelInterceptor.

Any ideas?


Solution

  • You could add a ProducerInterceptor to the streams config.

    /**
     * A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
     * they are published to the Kafka cluster.
     * <p>
     * This class will get producer config properties via <code>configure()</code> method, including clientId assigned
     * by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be
     * sharing producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
     * <p>
     * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but not propagated further. As a result, if
     * the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception,
     * just log the errors.
     * <p>
     * ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
     * <p>
     * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
     */
    public interface ProducerInterceptor<K, V> extends Configurable {
    

    You can modify the record's headers there.

    (You can use this technique for the message channel binder too, instead of a ChannelInterceptor).
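
To illustrate the suggestion above, here is a minimal sketch of such an interceptor. It assumes the kafka-clients library is on the classpath. The class name HeaderInjectingInterceptor and the header name and value are hypothetical placeholders, not something taken from the question or from any framework.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that stamps one fixed header on every outbound record.
public class HeaderInjectingInterceptor implements ProducerInterceptor<Object, Object> {

    // Assumed header name and value, chosen only for illustration.
    static final String HEADER_NAME = "x-framework-origin";
    static final byte[] HEADER_VALUE = "my-framework".getBytes(StandardCharsets.UTF_8);

    @Override
    public void configure(Map<String, ?> configs) {
        // Producer config properties arrive here; this sketch does not need any.
    }

    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        // Drop any existing header with the same name, then add ours,
        // so the record ends up with exactly one value for it.
        record.headers().remove(HEADER_NAME);
        record.headers().add(HEADER_NAME, HEADER_VALUE);
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Nothing to do on acknowledgement in this sketch.
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```

With the Kafka Streams binder, registering it would probably be a matter of passing the producer-prefixed Kafka property through the binder configuration, for example spring.cloud.stream.kafka.streams.binder.configuration.producer.interceptor.classes=com.example.HeaderInjectingInterceptor (the package name is again an assumption). Because the interceptor is instantiated by Kafka rather than by Spring, it cannot rely on dependency injection; any settings it needs would have to arrive through configure().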