scala akka akka-stream akka-http

Akka: Websocket handler failed with Cannot subscribe to shut-down Publisher


I am using Akka Streams and Akka HTTP to implement a websocket stream. The stream uses a queue as a source, to which TextMessages are offered as follows:

  val (queue, source) = Source.queue[Message](0, OverflowStrategy.backpressure).recoverWithRetries(-1, {
    case exception: Exception =>
      println(exception)
      Source(Nil)
  }).preMaterialize()

  def send[T](message: T)(implicit jsonFormat: JsonFormat[T]): Unit = queue.offer(TextMessage.Strict(message.toJson.toString()))

The flow is built as follows:

  val flow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, source).via(reportErrorsFlow)

  def reportErrorsFlow[T]: Flow[T, T, Any] =
    Flow[T]
      .watchTermination()((_, f) => f.onComplete {
        case Failure(cause) =>
          println(s"WS stream failed with $cause")
        case ex => println("Complete", ex) // ignore regular completion
      })

It is then provided to the routes:

  val websocketRoute: Route =
    path(pathName) {
      handleWebSocketMessages(flow)
    }

The server is then spun up:

  val routes = cors() {
    concat(health, websocketRoute, ...other routes)
  }
  val bindingFuture = Http().newServerAt("localhost", 8080).bind(routes)
  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate())

The issue I am running into is that the web socket does not close gracefully: closing it while messages are in flight crashes the flow, and I receive the following error when I try to connect again:

Websocket handler failed with Cannot subscribe to shut-down Publisher (akka.stream.impl.ActorPublisher$NormalShutdownException: Cannot subscribe to shut-down Publisher)

Solution

  • You get that error because your websocket handler reuses a prematerialized Source.

    When you prematerialize a Source you get the materialized value (the queue in your case) and a single-use Source that is associated with that materialized value. Once the first connection closes, the publisher behind that Source shuts down, so the next connection cannot subscribe to it, which is what the error message says.

    Instead, every time you get a websocket connection you should set up the whole stream from scratch. In your case that means prematerializing the Source.queue every time your route is triggered. So don't pass the flow val to the handleWebSocketMessages directive; call a function there that returns a Flow, say flow(), and have it set everything up.
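
    A minimal sketch of that idea follows, assuming Akka 2.6 and Akka HTTP 10.2 (the versions suggested by Http().newServerAt in the question). The names WebSocketExample, newFlow, broadcast, the queues registry, the "ws" path, and the buffer size of 16 are illustrative assumptions and not part of the original code. The sketch also relies on Akka HTTP evaluating the inner block of path(...) for each matching request, so that newFlow() runs once per connection.

```scala
import scala.collection.concurrent.TrieMap

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}

object WebSocketExample {
  implicit val system: ActorSystem = ActorSystem("ws-example")
  import system.dispatcher

  // Hypothetical registry holding the queue of every connection that is still open.
  private val queues = TrieMap.empty[SourceQueueWithComplete[Message], Unit]

  // Builds a brand-new queue, source, and flow. Call it once per connection.
  def newFlow(): Flow[Message, Message, NotUsed] = {
    val (queue, source) =
      Source
        .queue[Message](16, OverflowStrategy.backpressure) // buffer size is an arbitrary choice
        .preMaterialize()
    queues.put(queue, ())
    // Forget the queue once its stream has terminated, normally or with a failure.
    queue.watchCompletion().onComplete(_ => queues.remove(queue))
    Flow.fromSinkAndSource(Sink.ignore, source)
  }

  // Offers the text to every connection that is currently registered.
  def broadcast(text: String): Unit =
    queues.keys.foreach(_.offer(TextMessage.Strict(text)))

  val websocketRoute: Route =
    path("ws") {
      // newFlow() is called here, not stored in a val, so each connection gets its own stream.
      handleWebSocketMessages(newFlow())
    }
}
```

    Because every connection now has its own queue, a single global send no longer has one obvious target. The registry above is one possible way to reach all open connections; keeping a queue per user or per session would work along the same lines.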