Search code examples
scalaakkaakka-stream

Unable to run a SourceShape using GraphStage class in Akka Stream


I am trying to create a Redis Akka Stream Source using the GraphStage construct. The idea is that whenever I get an update from the subscribe method, I push that to the next component. Also if there is no pull signal, the component should backpressure. Here is the code:

class SubscriberSourceShape(channel: String, subscriber: Subscriber) 
    extends GraphStage[SourceShape[String]] {

  private val outlet: Outlet[String] = Outlet("SubscriberSource.Out")

  override def shape: SourceShape[String] = SourceShape(outlet)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      val callback = getAsyncCallback((msg: String) => push(outlet, msg)
      val handler = (msg: String) => callback.invoke(msg)

      override def preStart(): Unit = subscriber.subscribe(channel)(handler)
    }
  }
}

However, when I run this with a simple sink I get this error:

Error in stage [akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@58344854]: No handler defined in stage [...SubscriberSourceShape@6a9193dd] for out port [RedisSubscriberSource.Out(1414886352). All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.

What is wrong?


Solution

  • You are getting this error because you haven't set any output handler for your Source, so when the downstream components (Flow or Sink) send a pull signal to this Source, there is no handler to handle the pull signal.

    You can add an OutputHandler to get rid of the error. Leave the onPull method empty since you are producing your elements on the asyncCallback. Just add this inside the GraphStageLogic body:

          setHandler(outlet, new OutHandler {
            override def onPull(): Unit = {}
          })