
Akka - convert Flow into Collection or Publisher


I'm trying to split an Akka Source into two separate ones.

  val requestFlow = Flow[BodyPartEntity].to(Sink.seq) // convert to Seq[BodyPartEntity]
  val dataFlow    = Flow[BodyPartEntity].to(Sink.asPublisher(fanout = false)) // convert to Publisher[BodyPartEntity]

  implicit class EitherSourceExtension[L, R, Mat](source: Source[FormData.BodyPart, Mat]) {
    def partition(left: Sink[BodyPartEntity, NotUsed], right: Sink[BodyPartEntity, NotUsed]): Graph[ClosedShape, NotUsed] = {
      GraphDSL.create() { implicit builder =>
        import akka.stream.scaladsl.GraphDSL.Implicits._
        val partition = builder.add(Partition[FormData.BodyPart](2, element => if (element.getName == "request") 0 else 1))
        source ~> partition.in
        partition.out(0).map(_.getEntity) ~> left
        partition.out(1).map(_.getEntity) ~> right
        ClosedShape
      }
    }
  }

How can I convert requestFlow into a Seq[BodyPartEntity] and dataFlow into a Publisher[BodyPartEntity]?


Solution

  • You could use a BroadcastHub for this. From the documentation:

    A BroadcastHub can be used to consume elements from a common producer by a dynamic set of consumers.

    Simplified code (it needs an implicit ActorSystem or Materializer in scope to run):

    val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
      Source(1 to 5).toMat(
        BroadcastHub.sink(bufferSize = 4))(Keep.right)
    
    val fromProducer: Source[Int, NotUsed] = runnableGraph.run()
    
    // Process the messages from the producer in two independent consumers
    fromProducer.runForeach(msg => println("consumer1: " + msg))
    fromProducer.runForeach(msg => println("consumer2: " + msg))
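If the goal is to keep the Partition approach from the question and still get a Seq and a Publisher out of it, one option is to pass both sinks to GraphDSL.create so that their materialized values are returned when the graph runs. The following is only a minimal sketch under stated assumptions, not a definitive implementation: `Part` is a hypothetical stand-in for FormData.BodyPart (so the example depends only on akka-stream), `split`, `PartitionSketch` and `Demo` are made-up names, and it assumes Akka 2.6+, where an implicit ActorSystem supplies the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import org.reactivestreams.Publisher

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for FormData.BodyPart: a name plus an entity payload.
final case class Part(name: String, entity: String)

object PartitionSketch {
  // Routes "request" parts to a Seq and all other parts to a Publisher.
  // Both results are exposed as the materialized value of the graph.
  def split[Mat](source: Source[Part, Mat]): RunnableGraph[(Future[Seq[String]], Publisher[String])] = {
    val requestSink = Sink.seq[String]
    val dataSink    = Sink.asPublisher[String](fanout = false)

    RunnableGraph.fromGraph(
      // Passing the sinks to create(...) keeps their materialized values;
      // (_, _) combines them into a tuple.
      GraphDSL.create(requestSink, dataSink)((_, _)) { implicit builder => (left, right) =>
        import GraphDSL.Implicits._
        val partition = builder.add(Partition[Part](2, p => if (p.name == "request") 0 else 1))
        source ~> partition.in
        partition.out(0).map(_.entity) ~> left
        partition.out(1).map(_.entity) ~> right
        ClosedShape
      })
  }
}

object Demo extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val parts = Source(List(Part("request", "r1"), Part("data", "d1"), Part("data", "d2")))
  val (requestF, dataPublisher) = PartitionSketch.split(parts).run()

  // The publisher must be subscribed to, otherwise Partition backpressures
  // as soon as an element is routed to the unsubscribed output.
  val dataF = Source.fromPublisher(dataPublisher).runWith(Sink.seq)

  println(Await.result(requestF, 3.seconds))
  println(Await.result(dataF, 3.seconds))
  system.terminate()
}
```

Note that the materialized value of `source` itself is discarded here, and that the Seq only becomes available once the whole upstream has completed, so the Publisher side has to be consumed concurrently rather than after the Future resolves.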