spring, apache-kafka, spring-webflux, spring-websocket

Spring WebFlux with Kafka and Websockets


I have a simple Kafka consumer and producer in my Spring Boot application, and both work fine. Next, I want the consumer to take each consumed message and broadcast it directly to all subscribed clients. I found out that I cannot use STOMP messaging with WebFlux, so how can I accomplish this? I have seen the reactive WebSocket implementation, but I could not figure out how to send the consumed data to my WebSocket.

This is my simple Kafka producer:

fun addMessage(message: Message) {
    // The target topic is passed to KafkaTemplate through a message header
    val headers: MutableMap<String, Any> = HashMap()
    headers[KafkaHeaders.TOPIC] = topicName
    kafkaTemplate.send(GenericMessage<Message>(message, headers))
}

And my simple consumer looks like this:

@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
fun receiveData(message: Message) {
    // Take the consumed data and send it to the WebSocket
}

Solution

  • I would consider using a Sinks.many().multicast().onBackpressureBuffer() as a global intermediate container. In your receiveData() you then just emit the data into that Reactor sink.

    For the connected WebSocket sessions, I would suggest implementing an org.springframework.web.reactive.socket.WebSocketHandler and passing Sinks.Many.asFlux() (mapped to WebSocketMessage instances) to the WebSocketSession.send(Publisher<WebSocketMessage> messages) API. This way, all your sessions consume the same Kafka data for as long as they are connected to this WebSocket server.

    See the docs for more info: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websockethandler

    UPDATE

    You can find a sample here: https://github.com/artembilan/sandbox/tree/master/so-65667450
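
The approach described above can be sketched roughly as follows. This is a minimal sketch under several assumptions, not the code from the linked sample: MessageBroadcaster, BroadcastWebSocketHandler, WebSocketConfig, the Message stand-in with its text property, and the /ws/messages path are all hypothetical names chosen for illustration. It also deviates slightly from the answer by using the two-argument onBackpressureBuffer(bufferSize, autoCancel) overload with autoCancel = false, because the no-argument variant shuts the sink down once its last subscriber cancels, which would happen whenever every WebSocket client disconnects.

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.util.concurrent.Queues

// Hypothetical stand-in for the question's own Message class.
data class Message(val text: String)

// Hypothetical holder for the global sink shared by the listener and all sessions.
@Component
class MessageBroadcaster {
    // autoCancel = false is this sketch's choice: the sink keeps accepting
    // data after the last WebSocket client has disconnected.
    private val sink: Sinks.Many<String> =
        Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false)

    // The emit result is returned so that callers can detect failures such as an overflow.
    fun publish(text: String): Sinks.EmitResult = sink.tryEmitNext(text)

    // Every subscriber of this Flux sees the elements emitted while it is subscribed.
    fun stream(): Flux<String> = sink.asFlux()
}

@Component
class KafkaConsumer(private val broadcaster: MessageBroadcaster) {
    @KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
    fun receiveData(message: Message) {
        // Push the consumed record into the sink; the result is ignored in this sketch.
        broadcaster.publish(message.text)
    }
}

@Component
class BroadcastWebSocketHandler(private val broadcaster: MessageBroadcaster) : WebSocketHandler {
    // Each connected session subscribes to the shared Flux and forwards it as text frames.
    override fun handle(session: WebSocketSession): Mono<Void> =
        session.send(broadcaster.stream().map { session.textMessage(it) })
}

@Configuration(proxyBeanMethods = false)
class WebSocketConfig {
    // "/ws/messages" is an assumed path; order -1 places this mapping before annotated controllers.
    @Bean
    fun webSocketMapping(handler: BroadcastWebSocketHandler): HandlerMapping =
        SimpleUrlHandlerMapping(mapOf("/ws/messages" to handler), -1)
}
```

A client would then connect to ws://<host>:<port>/ws/messages and receive each consumed record as a text frame. Depending on the Spring Framework version, a WebSocketHandlerAdapter bean may also have to be declared; as far as I know, recent versions of the WebFlux config register one already. Note too that while no client is connected, the multicast sink only buffers a bounded number of elements, after which tryEmitNext reports a failure instead of storing more.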