With Spring Cloud Stream and the Kafka Streams binder, I would like to process the output of a function in another one, as in:
@Bean
public Function<KStream<String, Double>, KStream<String, Double>> sqrt() {
return numbers -> numbers.mapValues(Math::sqrt);
}
@Bean
public Consumer<KStream<String, Double>> log() {
return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value));
}
where sqrt()
outputs the square root of a number, which is then logged with log()
. application.yaml
therefore looks like this:
spring:
cloud:
stream:
function:
bindings:
sqrt-in-0: numbers
sqrt-out-0: sqrt-numbers
log-in-0: sqrt-numbers
kafka:
streams:
bindings:
sqrt:
consumer:
application-id: sqrtApplicationId
log:
consumer:
application-id: logApplicationId
When starting the application, I get the following error:
The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
Now of course setting definition-overriding
to true
is not a proper fix, and it will fail with an IllegalStateException
.
How do I solve this?
A reproduction of the problem can be found here: https://github.com/cedric-schaller/dltawareprocessor-type-error
Assuming that you have two Kafka topics called, numbers
and sqrt-numbers
, the following configuration should work.
spring:
cloud:
stream:
bindings:
sqrt-in-0:
destination: numbers
sqrt-out-0:
destination: sqrt-numbers
log-in-0:
destination: sqrt-numbers
kafka:
streams:
bindings:
sqrt-in-0:
consumer:
application-id: sqrtApplicationId
log-in-0:
consumer:
application-id: logApplicationId
You can use spring.cloud.stream.function.bindings..
to override a default binding name. For example, if you want to change the binding name from sqrt-in-0
to input
, you can do it like spring.cloud.stream.function.bindings.sqrt-in-0: input
. You still need to set destination
on the overridden binding though (via spring.cloud.stream.bindings.input.destination
).
The particular exception you are getting is because you are trying to reuse an already created binding name - sqrt-numbers
.