Search code examples
spring-bootapache-kafka-streamsspring-cloud-streamspring-cloud-stream-binder-kafka

How to bind the same topic to multiple functions with the Kafka Streams binder in Spring Cloud Stream?


With Spring Cloud Stream and the Kafka Streams binder, I would like to process the output of a function in another one, as in:

@Bean
public Function<KStream<String, Double>, KStream<String, Double>> sqrt() {
    return numbers -> numbers.mapValues(Math::sqrt);
}

@Bean
public Consumer<KStream<String, Double>> log() {
    return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value));
}

where sqrt() outputs the square root of a number, which is then logged with log(). application.yaml therefore looks like this:

spring:
  cloud:
    stream:
      function:
        bindings:
          sqrt-in-0: numbers
          sqrt-out-0: sqrt-numbers
          log-in-0: sqrt-numbers
      kafka:
        streams:
          bindings:
            sqrt:
              consumer:
                application-id: sqrtApplicationId
            log:
              consumer:
                application-id: logApplicationId

When starting the application, I get the following error:

The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

Now of course setting definition-overriding to true is not a proper fix, and it will fail with an IllegalStateException.

How do I solve this?

A reproduction of the problem can be found here: https://github.com/cedric-schaller/dltawareprocessor-type-error


Solution

  • Assuming that you have two Kafka topics called, numbers and sqrt-numbers, the following configuration should work.

    spring:
      cloud:
        stream:
          bindings:
            sqrt-in-0:
              destination: numbers
            sqrt-out-0: 
              destination: sqrt-numbers
            log-in-0: 
              destination: sqrt-numbers
          kafka:
            streams:
              bindings:
                sqrt-in-0:
                  consumer:
                    application-id: sqrtApplicationId
                log-in-0:
                  consumer:
                    application-id: logApplicationId
    

    You can use spring.cloud.stream.function.bindings.. to override a default binding name. For example, if you want to change the binding name from sqrt-in-0 to input, you can do it like spring.cloud.stream.function.bindings.sqrt-in-0: input. You still need to set destination on the overridden binding though (via spring.cloud.stream.bindings.input.destination).

    The particular exception you are getting is because you are trying to reuse an already created binding name - sqrt-numbers.