I am using Akka Streams and Akka Http to implement a websocket stream. The stream uses a queue as a source to which TextMessage
's are offered as follows:
val (queue, source) = Source.queue[Message](0, OverflowStrategy.backpressure).recoverWithRetries(-1, {
case exception: Exception =>
println(exception)
Source(Nil)
}).preMaterialize()
def send[T](message: T)(implicit jsonFormat: JsonFormat[T]): Unit = queue.offer(TextMessage.Strict(message.toJson.toString()))
The flow is build as follows
val flow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, source).via(reportErrorsFlow)
def reportErrorsFlow[T]: Flow[T, T, Any] =
Flow[T]
.watchTermination()((_, f) => f.onComplete {
case Failure(cause) =>
println(s"WS stream failed with $cause")
case ex => println("Complete", ex) // ignore regular completion
})
It is then provided to the routes
val websocketRoute: Route =
path(pathName) {
handleWebSocketMessages(flow)
}
The server is then spun up
val routes = cors() {
concat(health, websocketRoute, ...other routes)
}
val bindingFuture = Http().newServerAt("localhost", 8080).bind(routes)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate())
The issue I am running into is that the web socket does not gracefully close and when closing the web socket while messages are inflight results in crashing the flow and I receive the following error when trying to connect again
Websocket handler failed with Cannot subscribe to shut-down Publisher (akka.stream.impl.ActorPublisher$NormalShutdownException: Cannot subscribe to shut-down Publisher)
The reason why you get that error is because you try to reuse prematerialized Source
in your websocket handler.
When you prematerialize a Source
you get the materialized value (the queue in your case) and a single use Source
that's associated with that materialized value.
What you need to do is that every time you get a websocket connection you should set all the stream from scratch. In your case would mean to prematerilize the Source.queue
every time your route triggers. So don't pass flow
val to the handleWebSocketMessages
directive, call a function that returns a Flow
there, say flow()
that will set everything up.