Search code examples
akkaakka-streamreactive-streams

How do I use Source.asSubscriber to wrap reactive listener?


How do I use Source.asSubscriber to wrap reactive listener ? I fail to understand its benefit.

I'm trying to create Source[T] for asynchttpclient WebSocket. Here is my code:

def createWsObservable(url: String, onStartAction: Option[WebSocket ⇒ Unit]): Source[WsMessage, KillSwitch] =
  Source.asSubscriber[WsMessage].mapMaterializedValue { subs: Subscriber[WsMessage] ⇒
    val listener: WebSocketListener = new WebSocketListener() {
      override def onOpen(ws: WebSocket): Unit =
        subs.onNext(WsOpen(ws))

      override def onClose(ws: WebSocket, code: Int, reason: String): Unit =
        subs.onComplete()

      override def onBinaryFrame(payload: Array[Byte], finalFragment: Boolean, rsv: Int): Unit =
        // Doing bunch of stuff here
        subs.onNext(...)

      override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit =
        // Doing bunch of stuff here
        subs.onNext(...)

      override def onError(t: Throwable): Unit =
        subs.onError(t)

      override def onPongFrame(payload: Array[Byte]): Unit = {
        super.onPingFrame(payload)
      }
    }

    val websocket =
      asyncHttpClient
        .prepareGet(url)
        .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build).get

    new KillSwitch {
      override def shutdown(): Unit = websocket.sendCloseFrame()
      override def abort(ex: Throwable): Unit = websocket.sendCloseFrame()
    }
  }

On first event I get exception:

java.lang.IllegalStateException: spec violation: onNext was signaled from upstream without demand
    at akka.stream.impl.VirtualProcessor.rec$5(StreamLayout.scala:239)
    at akka.stream.impl.VirtualProcessor.onNext(StreamLayout.scala:243)
    at ingestion.NettyClientWrapper$$anon$2.onOpen(NettyClientWrapper.scala:55)

Perhaps Source.asSubscriber is bad choice for me ? What should I do to wrap reactivestreams Subscriber into akka's Source ?


Solution

  • I ended up using Source.actorRef with KillSwitch

    def createWsObservable(url: String, onStartAction: WebSocket ⇒ Option[KillSwitch] = _ ⇒ None, bufferSize: Int = 32): Source[WsMessage, KillSwitch] = {
      val actorSource = Source.actorRef[WsMessage](bufferSize, OverflowStrategy.fail)
      actorSource.mapMaterializedValue { actor ⇒
        val listener: WebSocketListener = new WebSocketListener() {
    
          override def onOpen(ws: WebSocket): Unit =
            actor ! ...
    
          override def onClose(ws: WebSocket, code: Int, reason: String): Unit =
            actor ! akka.actor.Status.Success(code)
    
          override def onBinaryFrame(payload: Array[Byte], finalFragment: Boolean, rsv: Int): Unit =
            actor ! ...
    
          override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit =
            actor ! ...
    
          override def onError(t: Throwable): Unit =
            actor ! akka.actor.Status.Failure(t)
        }
    
        val websocket =
          asyncHttpClient
            .prepareGet(url)
            .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build).get
    
        // Use Pong reply as indication that we've connected to the server
        val p = Promise[Void]()
        websocket.sendPingFrame().addListener(p)
        val onStartKillSwitchFut = p.future.map(_ ⇒ onStartAction(websocket))
    
        new KillSwitch {
          override def shutdown(): Unit = {
            onStartKillSwitchFut.map(_.foreach(_.shutdown()))
            websocket.sendCloseFrame()
          }
    
          override def abort(ex: Throwable): Unit = {
            onStartKillSwitchFut.map(_.foreach(_.abort(ex)))
            websocket.sendCloseFrame()
          }
        }
      }
    }