/**
 * Actor that is both a stream subscriber and a stream publisher of `Int`.
 *
 * Every element received from upstream is buffered in `events` and re-emitted
 * downstream in arrival order, limited by the demand the downstream subscriber
 * has signalled.
 *
 * NOTE(review): upstream completion and errors are not forwarded downstream
 * here; confirm whether callers need that.
 */
class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
  /** Elements received from upstream that have not been emitted yet, oldest first. */
  var events: Seq[Int] = Vector.empty[Int]

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  override def receive: Actor.Receive = {
    case OnNext(e: Int) =>
      // Append (not prepend) so elements leave in the order they arrived.
      events = events :+ e
      // Demand may already be outstanding; push right away instead of waiting
      // for a further Request that might never be sent.
      deliver()
    case Request(_) =>
      // The requested count is already reflected in totalDemand.
      deliver()
  }

  /**
   * Emits as many buffered elements as the current downstream demand allows and
   * removes them from the buffer so they are never emitted twice.
   */
  private def deliver(): Unit =
    // onNext may only be called while the publisher is active and has demand.
    if (isActive && totalDemand > 0) {
      // Clamp before narrowing so a very large demand cannot wrap around.
      val batchSize = math.min(totalDemand, Int.MaxValue.toLong).toInt
      val (ready, pending) = events.splitAt(batchSize)
      events = pending
      ready.foreach(onNext)
    }
}
// A single actor instance backs both ends of the flow.
val pubsubRef = system.actorOf(Props(new ActorPubSub))
// Publisher and subscriber views of that same actor.
val pub = ActorPublisher[Int](pubsubRef)
val sub = ActorSubscriber[Int](pubsubRef)
// Flow whose inlet is the actor's subscriber side and whose outlet is its publisher side.
val pubsubFlow = Flow(Sink(sub), Source(pub))

// Feed the numbers 1..10 through the actor-backed flow and print whatever comes out.
FlowGraph { implicit b =>
  import akka.stream.scaladsl.FlowGraphImplicits._

  val numbers = Source((1 to 10).toList)
  val printer = Sink.foreach[Int] { e =>
    println("Got a number " + e)
  }

  numbers ~> pubsubFlow ~> printer
}.run()
According to the documentation for Flow.apply(Sink, Source):
Create a Flow from a seemingly disconnected Source and Sink pair.
If that's true, why does the graph remain unconnected?
This is a known issue and will be fully fixed in M4.