Search code examples
kotlinspring-kafkaspring-cloud-streamspring-cloud-function

How to consume Kafka-events with spring-cloud-function, but only sometimes produce


I have successfully set up spring-cloud-function with kafka-streams in Kotlin and have a working prototype.

I have one function that receives the events from one topic and produces another event to another topic.

Here's my application.yml

spring:
  cloud:
    stream:
      bindings:
        consumeAndProduce-in-0:
          destination: topicToConsumeFrom
        consumeAndProduce-out-0:
          destination: topicToProduceTo
      kafka:
        binder:
          brokers: ${kafka.broker.prod}
      default:
        group: ${spring.application.name}
    function:
      definition: consumeAndProduce

My kotlin application looks like this:

@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            runApplication<Application>(*args)
        }
    }

    @Bean
    fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent {
        return { message ->
            doSomethingAndReturnProducerEvent(message)
        }
    }
}

I followed numerous examples on the internet and it works like a charm. As soon as a message is put into the topicToConsumeFrom, the function is called and the result will be written to the topicToProduceTo.


Now my question: What would be the proper handling if my function is not always producing something. It is a very common use-case that a service listens on a topic and ignores messages. It should only react to specific messages and only then produce an output, otherwise do nothing. I tried that by creating a different producer function, which I call from the consumer only if applicable:

Here's my adapted application.yml that now defines two different functions consumeand produce:

spring:
  cloud:
    stream:
      bindings:
        consume-in-0:
          destination: topicToConsumeFrom
        produce-out-0:
          destination: topicToProduceTo
      kafka:
        binder:
          brokers: ${kafka.broker.prod}
      default:
        group: ${spring.application.name}
    function:
      definition: consume;produce

In my application, I renamed consumeAndProduce()to consume() and the return value is now Unit. Also, I created a second bean produce() that does nothing but returning a function that returns the payload that I give to it:

@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            runApplication<Application>(*args)
        }
    }

    @Bean
    fun consume(produce: (ProducerEvent) -> ProducerEvent): (ConsumerEvent) -> Unit {
        return { message ->
            if (isSomethingIWantToReactTo(message)) {
                val result = doSomethingAndReturnProduceEvent(message)
                producer(result)
            }
        }
    }

    @Bean
    fun produce(): (ProducerEvent) -> ProducerEvent {
        return { it }
    }
}

Now the consume()-function is called if a message is present in the topic and if the message is for me, I'll do something and call the produce()-function with my result, otherwise do nothing.

I was not able to see any message in the outgoing topic. I know through debugging, that it went into the if-branch and calls produce() with my result, but it seems the kafka-binding doesn't work and no message is ever really sent.

I'm aware, my approach is somewhat naïve, but after extensive research I couldn't find anything that describes this use-case. All examples always have one function that consumes and produces in all cases.

Is there a proper way to do it?


Solution

  • There are two possible solutions based on the first code example using only one function consumeAndProduce() for consuming and producing:

    1. Return a nullable event. If the function returns null, no event will be produced:
    @Bean
    fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent? {
        return { message ->
            if(isSomethingIWantToReactTo(message)) {
                doSomethingAndReturnProducerEvent(message)
            }
            null
        }
    }
    
    1. Use KStreams to consume a stream of events and produce a stream of events. That way any incoming event that does not apply can simply be filtered:
    @Bean
    fun consumeAndProduce(): (KStream<ConsumerEvent>) -> KStream<ProducerEvent> {
        return { messages ->
            messages
                .filter(message -> isSomethingIWantToReactTo(message))
                .map(message -> doSomethingAndReturnProducerEvent(message))
        }
    }
    

    Note: I did not follow this approach and didn't try it out, thus research into KStreams is advised.