I have a Kafka stream processing application written using Spring Boot, using spring-cloud-function
and spring-cloud-stream-binder-kafka-streams
. The method which processes a couple of streams is annotated with @Bean
, so that it should be picked up by spring-cloud-function
(rather than using @StreamListener
). When this method returns a BiFunction
it works. But when I try it as a plain Kotlin lambda it's not picked up by Spring Boot: the app starts then immediately ends as it finds no functions to run.
From what I can see in the documentation, this should work.
Here's the declaration that does work:
@Bean
fun process():
BiFunction<KStream<String, Foo>, GlobalKTable<String, Bar>, KStream<String, Baz>> =
BiFunction { foo, bar ->
...
And here's the declaration that does not work:
@Bean
fun process():
(foo: KStream<String, Foo>, bar: GlobalKTable<String, Bar>) -> KStream<String, Baz> =
{ foo, bar ->
...
(The content of the method is the same in both cases.)
As per the documentation, I have added the spring-cloud-function-kotlin
module to the classpath by adding this to build.gradle.kts
:
implementation("org.springframework.cloud:spring-cloud-function-kotlin")
The version of Spring Cloud Stream is Hoxton.RC1
.
Is there anything else I need to do to get the function picked up? Or do I need to use BiFunction
in this case?
Currently (in 3.0), the Kafka Streams binding functional support requires java.util.function
types. It does not work with standard functions in Kotlin yet (or we haven't validated anything in that regard). We are planning to look into this as a 3.1 feature. If you don't mind, could you create a new issue here?