I have started learning Akka and ran into a problem for which I can't find an easy solution, despite having waded through the documentation and related Stack Overflow questions.
Building on the Client-Side WebSocket Support example on the Akka website, I am using the following Scala snippet as a basis:
val flow: Flow[Message, Message, Future[Done]] =
  Flow.fromSinkAndSourceMat(printSink, Source.maybe[Message])(Keep.left)
val (upgradeResponse, closed) =
  Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
My use case is a client (printSink) consuming a continuous stream from the WebSocket server. The communication is unidirectional, so the client never needs to send anything through the source.
My question is then as follows:
For question 1 (forcing a disconnect from the client), this should work:
val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] =
  Flow.fromSinkAndSourceMat(
    printSink,
    Source.maybe[Message]
  )(Keep.both)
val (upgradeResponse, (closed, disconnect)) =
  Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
disconnect can then be completed with a None to disconnect:
disconnect.success(None)
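Put together, a minimal self-contained sketch of this could look like the following. It assumes Akka 2.6+ with Akka HTTP 10.2+ (so the implicit ActorSystem also provides the materializer); the printSink definition, the WsClient object name and the 10-second delay are illustrative choices of mine, not something from your code. I use trySuccess rather than success so that a promise which has already been completed does not throw.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}

object WsClient {
  // Hypothetical sink: prints strict text frames and ignores everything else.
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict => println(message.text)
      case _                           => ()
    }

  // Keep.both exposes both the sink's completion future and the
  // Promise backing Source.maybe, which acts as the disconnect handle.
  val clientFlow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] =
    Flow.fromSinkAndSourceMat(printSink, Source.maybe[Message])(Keep.both)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ws-client")
    import system.dispatcher

    val (upgradeResponse, (closed, disconnect)) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), clientFlow)

    // Illustrative trigger: complete the outgoing side after 10 seconds.
    system.scheduler.scheduleOnce(10.seconds) {
      disconnect.trySuccess(None)
      ()
    }

    // Shut the actor system down once the incoming side has finished.
    closed.onComplete(_ => system.terminate())
  }
}
```

Completing the promise with None completes the client's outgoing stream without sending a message; I would expect closed to complete shortly afterwards once the connection has been torn down, but that is worth verifying against your server.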
For question 2, my intuition is that this sort of dynamic stream operation would require a custom stream operator (i.e. one level below the Graph DSL and two levels below the "normal" scaladsl/javadsl). I don't have a huge amount of direct experience there, to be honest.