Search code examples
functional-programmingspring-kafkaspring-cloud-stream

spring cloud stream kafka binder can not publish and consume message by functional programming


I try to use functional programming way to produce and consume message

the spring-cloud-stream version is 3.2.7

this is my application,yml

spring:
  cloud:
    function:
      definition: numberProducer,numberConsumer
    stream:
      bindings:
        numberProducer-out-0:
          destination: first-topic
        numberConsumer-in-0:
          group: group
          destination: first-topic
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          input:
            consumer:
              enableDlq: false
              dlqName: dlq
              dlq-partitions: 1

this is producer and consumer bean

@Configuration
@Slf4j
public class KafkaConsumer {

  @Bean
  public Supplier<Integer> numberProducer() {
    return () -> new SecureRandom().nextInt(1, 100);
  }

  @Bean
  public Consumer<Integer> numberConsumer() {
    return incomingNumber -> log.info("Incoming Number : {}", incomingNumber);
  }
}

then I start application, but can not see message produced to topic is there anything wrong in my code?

after I debug, can see the bean has been created enter image description here

by the way, I also try to use streambridge to send message, but if I don't add @EnableBinding annotation, I got the error that can not find the bean of StreamBridge


Solution

  • I find the root cause is that we need use the latest spring boot version

    spring boot version to 3.0.1 spring cloud version to 2022.0.1