apache-kafka  spring-cloud-stream  spring-cloud-stream-binder-kafka

Spring-cloud-stream functional model with apache-kafka-binder


This is a sequel of sorts to this question. Can I use the "plain" Apache Kafka binder together with the functional model? So far, using annotation-based configuration, I have mixed both binders in one application: spring-cloud-stream-binder-kafka for simple consuming and producing, and spring-cloud-stream-binder-kafka-streams for advanced stream processing.

The functional model seems to be supported only by the streams binder. If I try to mix both approaches, annotation-based for simple usage and functional for streams, the stream binding is not registered.

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            simple-binding-in:
              destination: another-topic

public interface SimpleBinding {

    String INPUT = "simple-binding-in";

    @Input(INPUT)
    SubscribableChannel simpleIn();

}

@Component
public class SimpleListener {

    @StreamListener(SimpleBinding.INPUT)
    public void listen(@Payload SomeDto payload) {
    }
}

@Configuration
public class FunctionalStream {

    @Bean
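    // KStream takes a key type and a value type; the value type is not given
    // in the original, so String is assumed here.
    public Consumer<KStream<String, String>> processStream() {
        return eventStream -> eventStream.map(/* mapper elided in the original */);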
    }
}

@EnableBinding(SimpleBinding.class) is present on a configuration class. Is it preferred, or even supported, to mix both as described, or should I use the streams binder even for simple message consumption?


Solution

  • For the Kafka binder you can, and absolutely should, use the functional model and forget about StreamListener altogether. That way it is aligned with your KStream functional model.

    spring.cloud:
            stream:
              function:
                definition: processStream;listen  # list every function bean, separated by ';'
              bindings:
                processStream-in-0:
                  destination:  my-topic
                listen-in-0:
                  destination: another-topic
    
    @Component
    public class SimpleListener {
    
        @Bean
        public Consumer<SomeDto> listen() {
            return payload -> ...
        }
    }
    
    @Configuration
    public class FunctionalStream {
    
        @Bean
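        // KStream takes a key type and a value type; the value type is not given
        // in the original, so String is assumed here.
        public Consumer<KStream<String, String>> processStream() {
            return eventStream -> eventStream.map(/* mapper elided in the original */);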
        }
    }
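
The binding names in the configuration above follow the functional naming convention <functionName>-in-<index>, which is why the listen bean is configured through listen-in-0 and the processStream bean through processStream-in-0. The sketch below only illustrates that convention for single-input functions. The class BindingNames and the helper inputBindings are hypothetical names made up for this example, not part of Spring Cloud Stream, and the definition string processStream;listen is an assumed sample value.

```java
import java.util.ArrayList;
import java.util.List;

class BindingNames {

    // Hypothetical helper: derives the input binding name for every function
    // listed in a ';'-separated definition string, using the
    // <functionName>-in-<index> convention with index 0 (single input).
    static List<String> inputBindings(String definition) {
        List<String> names = new ArrayList<>();
        for (String function : definition.split(";")) {
            String trimmed = function.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed + "-in-0");
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [processStream-in-0, listen-in-0]
        System.out.println(inputBindings("processStream;listen"));
    }
}
```

Each printed name is the key you would expect under spring.cloud.stream.bindings, so a mismatch between a bean method name and its binding key is an easy thing to check when a destination does not appear to be picked up.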