I have successfully set up spring-cloud-function with kafka-streams in Kotlin and have a working prototype.
I have one function that receives events from one topic and produces events to another topic.
Here's my application.yml
spring:
  cloud:
    stream:
      bindings:
        consumeAndProduce-in-0:
          destination: topicToConsumeFrom
        consumeAndProduce-out-0:
          destination: topicToProduceTo
      kafka:
        binder:
          brokers: ${kafka.broker.prod}
      default:
        group: ${spring.application.name}
    function:
      definition: consumeAndProduce
My Kotlin application looks like this:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            runApplication<Application>(*args)
        }
    }

    @Bean
    fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent {
        return { message ->
            doSomethingAndReturnProducerEvent(message)
        }
    }
}
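(ConsumerEvent and ProducerEvent are not spelled out here; for the examples to compile, think of them as simple data classes along these lines, with purely illustrative fields:)

// Hypothetical event types -- the field names are illustrative only
data class ConsumerEvent(val id: String, val payload: String)

data class ProducerEvent(val id: String, val result: String)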
I followed numerous examples on the internet and it works like a charm: as soon as a message is put into topicToConsumeFrom, the function is called and the result is written to topicToProduceTo. (The binding names consumeAndProduce-in-0 and consumeAndProduce-out-0 follow Spring Cloud Stream's <functionName>-in-<index> / <functionName>-out-<index> convention, which is what ties the function to the two topics.)
Now my question: what is the proper way to handle the case where my function does not always produce something? It is a very common use case that a service listens on a topic and ignores most messages: it should only react to specific messages, and only then produce an output, otherwise do nothing. I tried that by creating a separate producer function, which I call from the consumer only when applicable.
Here's my adapted application.yml that now defines two separate functions, consume and produce:
spring:
  cloud:
    stream:
      bindings:
        consume-in-0:
          destination: topicToConsumeFrom
        produce-out-0:
          destination: topicToProduceTo
      kafka:
        binder:
          brokers: ${kafka.broker.prod}
      default:
        group: ${spring.application.name}
    function:
      definition: consume;produce
In my application, I renamed consumeAndProduce() to consume() and changed its return type to Unit. I also created a second bean, produce(), that does nothing but return a function which hands back the payload it is given:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            runApplication<Application>(*args)
        }
    }

    @Bean
    fun consume(produce: (ProducerEvent) -> ProducerEvent): (ConsumerEvent) -> Unit {
        return { message ->
            if (isSomethingIWantToReactTo(message)) {
                val result = doSomethingAndReturnProducerEvent(message)
                produce(result)
            }
        }
    }

    @Bean
    fun produce(): (ProducerEvent) -> ProducerEvent {
        return { it }
    }
}
Now the consume() function is called whenever a message arrives in the topic; if the message is for me, I do something and call the produce() function with my result, otherwise I do nothing.
However, I was not able to see any message in the outgoing topic. I know through debugging that execution goes into the if-branch and calls produce() with my result, but it seems the Kafka binding doesn't kick in and no message is ever actually sent. (My guess: calling the injected lambda directly just executes it in-process, while the binder only publishes what a bound function returns when the framework itself invokes it.)
I'm aware my approach is somewhat naïve, but after extensive research I couldn't find anything that describes this use case; all the examples have a single function that consumes and produces in every case.
Is there a proper way to do it?
There are two possible solutions based on the first code example, using only one function consumeAndProduce() for consuming and producing:

1. Make the return type nullable. Whenever the function returns null, no event will be produced:

@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent? {
    return { message ->
        if (isSomethingIWantToReactTo(message)) {
            doSomethingAndReturnProducerEvent(message)
        } else {
            null
        }
    }
}
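The same check can also be written as a single Kotlin expression; this is purely a style choice, sketched here with the same hypothetical helper functions as above:

@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent? = { message ->
    // takeIf yields null when the predicate fails, so nothing is produced in that case
    message.takeIf(::isSomethingIWantToReactTo)?.let(::doSomethingAndReturnProducerEvent)
}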
2. Use KStreams to consume a stream of events and produce a stream of events. That way, any incoming event that does not apply can simply be filtered out:

import org.apache.kafka.streams.kstream.KStream

@Bean
fun consumeAndProduce(): (KStream<String, ConsumerEvent>) -> KStream<String, ProducerEvent> {
    // KStream takes a key type as well as a value type; String keys are assumed here
    return { messages ->
        messages
            .filter { _, message -> isSomethingIWantToReactTo(message) }
            .mapValues { message -> doSomethingAndReturnProducerEvent(message) }
    }
}
Note: I did not follow this approach and have not tried it out, so further research into KStreams is advised.
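One practical caveat: KStream-based functions are served by Spring Cloud Stream's Kafka Streams binder rather than the regular Kafka binder, so that dependency has to be on the classpath. A minimal Gradle sketch, assuming versions are managed by the Spring Cloud BOM:

// build.gradle.kts -- version assumed to come from the Spring Cloud dependency BOM
dependencies {
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
}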