Search code examples
kotlinspring-kafkakotlin-coroutines

Using Kotlin Coroutines Together with Spring Kafka Listeners


I am trying to mix Spring Kafka (2.5.6.RELEASE) Listeners and Kotlin coroutines. In detail, I have a suspend fun:

suspend fun updatePrice(command: StockPriceUpdateCommand): Boolean

Then, I have a Spring Kafka Listener that must call the function every time it reads a new message from the partition:

@KafkaListener(
    id = "priceListener",
    topics = [ "prices" ],
    groupId = "prices",
    properties = [
        "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer"
    ]
 ) 
 fun listenToPrices(
    @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY) stock: String,
    @Payload price: Double) {

    useCase.updatePrice(StockPriceUpdateUseCase.StockPriceUpdateCommand(stock, price))
 }

Clearly, the compiler doesn't let me call the updatePrice, beacuse of the error "Suspend function 'updatePrice' should be called only from a coroutine or another suspend function".

Which is the right approach in this case?

Thanks.


Solution

  • See a similar question here about @RabbitListener.

    It's not clear what you are trying to achieve here.

    My understanding is suspend functions can only be called from a coroutine; since @RabbitListener methods are called by the framework, not user code, we'd have to add a shim between the framework and the listener - but exactly how would that perform any useful function?