I'm trying to create a Kotlin service which is able to pass data from Kafka on to (specific) WebSocket connections. For instance if data from a user passes by on Kafka, the program receives it and I want to pass it on to the right WebSocket connection if this same user is currently connected.
The basics I have are:
@KafkaListener()
@Controller
@Secured(SecurityRule.IS_ANONYMOUS)
@ServerWebSocket("/ws/{id}")
class WebSocket() {
@OnOpen
fun onOpen(session: WebSocketSession, id: String): Publisher<String> {
return session.send("connection opened")
}
}
However if in the same class I receive a Kafka message, which holds the user id I'm looking for, how can I pass this to the right WebSocket connection?
I figured that if I use the "/ws/{id}" from the URL I could easily send it to the right id, but I can't figure out how.
The only way I know to send data to a connection is from inside the websocket annotated functions like OnOpen and OnMessage etc. Furthermore, I've found the Micronaut WebSocketBroadcaster to be useful, but only for broadcasting of course.
Something else I'm trying to figure out is where to ensure that the id being connected to is the user's actual id and not someone else's. Should I implement that in the @Secured annotation?
For anyone interested, I did find a solution to this.
Using the Micronaut WebSocketBroadcaster you can broadcast messages, and its second parameter takes a Predicate<WebSocketSession> that defines which WebSocketSessions the message is sent to.
If you do know the specific WebSocketSession ID, you can simply use { it.id == "id" }
for a predicate, as is done in example 1.
If you need to send it to a specific user, based on some other attribute, you can assign attributes to the WebSocketSession itself, for example by calling session.attributes.put("id", id) in the onOpen function. The predicate for this case is shown in example 2. The one drawback is that I couldn't find a way to retrieve the attributes without converting them to a map, which isn't very pretty.
import io.micronaut.websocket.WebSocketBroadcaster

class ExampleClass(val broadcaster: WebSocketBroadcaster) {
    val userId = "user_001"
    val webSocketId = "asdasdasd"

    fun sendExamples() {
        // [1] Send message to a specific socket ID
        broadcaster.broadcast("message", { it.id == webSocketId })
        // [2] Send message to a user with specific ID or any other identifying attribute of choice
        broadcaster.broadcast("message", { it.attributes.asMap()["id"]?.equals(userId) ?: false })
    }
}
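To tie this back to the original question, here is a rough sketch of how the Kafka side and the WebSocket side could be wired together. It is not tested against a specific Micronaut version, so treat it as a starting point. The topic name "user-events", the group id "websocket-forwarder", the class names, and the assumption that the Kafka record key carries the user id are all made up for illustration. It also reads the attribute with the typed get(name, type) lookup instead of asMap(), and uses broadcastSync because, as far as I can tell, the Publisher returned by broadcast does nothing until something subscribes to it.

```kotlin
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.websocket.WebSocketBroadcaster
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import io.micronaut.websocket.annotation.ServerWebSocket
import org.reactivestreams.Publisher

// Attribute key under which the user id is stored on each session.
private const val USER_ID_ATTRIBUTE = "id"

@ServerWebSocket("/ws/{id}")
class UserWebSocket {

    @OnOpen
    fun onOpen(session: WebSocketSession, id: String): Publisher<String> {
        // Remember which user this session belongs to (id comes from the URL).
        session.attributes.put(USER_ID_ATTRIBUTE, id)
        return session.send("connection opened")
    }

    // Included because, to my knowledge, Micronaut expects a server
    // WebSocket to declare an @OnMessage handler; it does nothing here.
    @OnMessage
    fun onMessage(message: String, session: WebSocketSession) {
    }
}

// Hypothetical group id and topic; assumes the record key is the user id.
@KafkaListener(groupId = "websocket-forwarder")
class UserEventListener(private val broadcaster: WebSocketBroadcaster) {

    @Topic("user-events")
    fun receive(@KafkaKey userId: String, payload: String) {
        // Only sessions whose stored attribute matches the Kafka key get the payload.
        broadcaster.broadcastSync(payload) { session ->
            session.attributes.get(USER_ID_ATTRIBUTE, String::class.java).orElse(null) == userId
        }
    }
}
```

Keeping the listener in its own class, rather than putting @KafkaListener on the WebSocket class as in the question, means the only thing shared between the two is the injected broadcaster. If blocking the Kafka consumer thread is a concern, broadcastAsync with the same predicate should be a drop-in alternative.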