Search code examples
scalaakkaactorakka-stream

'Graph must be connected' with Flow created from Publisher+Subscriber actor


class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
  var events = Seq.empty[Int]

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
  override def receive: Actor.Receive = {
    case OnNext(e: Int) => events = e +: events
    case Request(cnt) => events.take(cnt.toInt).foreach(onNext)
  }
}

val pubsubRef = system.actorOf(Props(new ActorPubSub))
val pub = ActorPublisher[Int](pubsubRef)
val sub = ActorSubscriber[Int](pubsubRef)
val pubsubFlow = Flow(Sink(sub), Source(pub))

FlowGraph { implicit b =>
  import akka.stream.scaladsl.FlowGraphImplicits._

  Source((1 to 10).toList) ~> pubsubFlow ~> Sink.foreach[Int](e =>
    println("Got a number " + e)
  )
}.run()

According to Flow.apply(Sink, Source) doc:

Create a Flow from a seemingly disconnected Source and Sink pair.

If that's true, why graph remains unconnected?


Solution

  • Endre@akka-user:

    This is a known issue and will be fully fixed in M4.