
Manipulate Seq Elements in an Akka Flow


I have 2 flows like the following:

val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ...
val bToC: Flow[B, C, NotUsed] = ...

I want to combine these into a convenience method like the following:

val aToSeqOfC: Flow[A, Seq[C], NotUsed]

So far I have the following, but because mapConcat flattens each Seq[B], it ends up emitting individual C elements rather than Seq[C].

Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)

How can I preserve the Seq in this scenario?


Solution

    Indirect Answer

    In my opinion, your question highlights one of the "rookie mistakes" that are common when working with Akka Streams: it is usually poor organization to put business logic inside Akka Streams constructs. Your question suggests that you have something of the form:

    val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B => 
      //business logic
    }
    

    The more ideal scenario would be if you had:

    //normal function, no akka involved
    val bToCFunc : B => C = { b : B =>
      //business logic
    }
    
    val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc
    
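To make the benefit concrete, here is a small sketch of the separated logic in plain Scala. The types B and C and the body of bToCFunc are hypothetical placeholders. The point is that an ordinary function applies to a single B or, through map, to a whole Seq[B] with no Akka machinery involved, which also makes it straightforward to unit test.

```scala
// Hypothetical stand-ins for the question's B and C types.
final case class B(value: Int)
final case class C(label: String)

object BusinessLogic {
  // Plain function holding the business logic: no Akka types involved.
  val bToCFunc: B => C = b => C(s"item-${b.value}")

  // Being an ordinary function, it lifts to a whole Seq[B] with map,
  // which is all a Seq-preserving Flow needs.
  def seqBToSeqC(seqB: Seq[B]): Seq[C] = seqB.map(bToCFunc)

  def main(args: Array[String]): Unit =
    println(seqBToSeqC(Seq(B(1), B(2))))
}
```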

    In this "ideal" example, the Flow is just a thin veneer over ordinary business logic that has no Akka dependency.

    With the logic separated out, your original question has a simple solution: map bToCFunc over each Seq[B] inside a single stage.

    val aToSeqOfC : Flow[A, Seq[C], NotUsed] = 
      aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))
    
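A self-contained sketch of this approach, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the Materializer). The concrete types (Int for A and B, String for C) and the bodies of aToSeqOfB and bToCFunc are hypothetical stand-ins, as is the run helper. An input that produces an empty Seq[B] still yields an empty Seq[C], so the Seq boundaries survive.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SeqMapExample {
  // Hypothetical stand-ins: A = Int, B = Int, C = String.
  // aToSeqOfB turns n into n copies of n, e.g. 2 => Seq(2, 2) and 0 => Seq().
  val aToSeqOfB: Flow[Int, Seq[Int], NotUsed] = Flow[Int].map(a => Seq.fill(a)(a))
  val bToCFunc: Int => String = b => s"c$b"

  // Each Seq is mapped inside one stage, so the Seq boundaries are preserved.
  val aToSeqOfC: Flow[Int, Seq[String], NotUsed] =
    aToSeqOfB.via(Flow[Seq[Int]].map(_.map(bToCFunc)))

  // Hypothetical helper: runs the Flow over sample input and waits for the collected output.
  def run(input: List[Int]): Seq[Seq[String]] = {
    implicit val system: ActorSystem = ActorSystem("seq-map-example")
    try Await.result(Source(input).via(aToSeqOfC).runWith(Sink.seq[Seq[String]]), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run(List(1, 0, 2)))
}
```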

    Direct Answer

    If you cannot reorganize your code, one option is to run bToC in a separate sub-stream for each Seq[B] and work with the resulting Futures:

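    import scala.concurrent.Future
    import akka.stream.Materializer
    import akka.stream.scaladsl.{Sink, Source}

    implicit val mat: Materializer = ??? // supply your Materializer here

    // Runs bToC over one Seq[B] in its own sub-stream and collects the results.
    val seqBToSeqC: Seq[B] => Future[Seq[C]] =
      seqB =>
        Source(seqB.toList)      // Source.apply needs an immutable.Iterable
          .via(bToC)
          .runWith(Sink.seq[C])  // keeps the sink's Future[Seq[C]] (.to(...).run() would return NotUsed)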
    
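To try the sub-stream helper in isolation, here is a runnable sketch, again assuming Akka 2.6 or later. bToC is a hypothetical stand-in for your existing Flow, and runOnce is a hypothetical helper that creates and shuts down its own ActorSystem. Each call materializes a fresh stream, which carries some per-Seq overhead compared with applying a plain function.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SubStreamExample {
  // Hypothetical stand-in for an existing Flow that cannot be refactored (B = Int, C = String).
  val bToC: Flow[Int, String, NotUsed] = Flow[Int].map(b => s"c$b")

  // Runs bToC over one Seq in its own materialized sub-stream.
  // The implicit ActorSystem provides the Materializer (Akka 2.6+).
  def seqBToSeqC(seqB: Seq[Int])(implicit system: ActorSystem): Future[Seq[String]] =
    Source(seqB.toList).via(bToC).runWith(Sink.seq[String])

  // Hypothetical helper: creates a system, runs the sub-stream once, then shuts down.
  def runOnce(seqB: Seq[Int]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sub-stream-example")
    try Await.result(seqBToSeqC(seqB), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(runOnce(Seq(1, 2, 3)))
}
```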

    You can then use this function within a mapAsync to construct the Flow you are looking for:

    val parallelism = 10
    
    // mapAsync emits each Seq[C] in the same order as the incoming Seq[B] elements
    val aToSeqOfC: Flow[A, Seq[C], NotUsed] = 
      aToSeqOfB.mapAsync(parallelism)(seqBToSeqC)
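Below is an end-to-end sketch of the mapAsync approach, assuming Akka 2.6 or later and hypothetical Int/String stand-ins for A, B and C. It also includes an alternative, aToSeqOfCFlat, that is not part of the answer above: flatMapConcat runs one sub-stream per Seq[B], and fold collects that sub-stream's output into a single Vector when it completes (an empty Vector for an empty Seq), so no Futures or explicit materialization appear in user code. Both keep results in input order.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapAsyncExample {
  // Hypothetical stand-ins: A = Int, B = Int, C = String.
  val aToSeqOfB: Flow[Int, Seq[Int], NotUsed] = Flow[Int].map(a => Seq.fill(a)(a))
  val bToC: Flow[Int, String, NotUsed] = Flow[Int].map(b => s"c$b")

  // Approach from the answer: one materialized sub-stream per Seq, joined with mapAsync.
  def aToSeqOfC(parallelism: Int)(implicit system: ActorSystem): Flow[Int, Seq[String], NotUsed] =
    aToSeqOfB.mapAsync(parallelism) { seqB =>
      Source(seqB.toList).via(bToC).runWith(Sink.seq[String])
    }

  // Alternative (not from the answer): flatMapConcat plus fold, no user-managed Futures.
  // fold emits exactly one Vector per sub-stream, when that sub-stream completes.
  val aToSeqOfCFlat: Flow[Int, Seq[String], NotUsed] =
    aToSeqOfB.flatMapConcat { seqB =>
      Source(seqB.toList).via(bToC).fold(Vector.empty[String])(_ :+ _)
    }

  // Hypothetical helper: builds the Flow with a fresh system, runs it, and collects the output.
  def run(input: List[Int], flow: ActorSystem => Flow[Int, Seq[String], NotUsed]): Seq[Seq[String]] = {
    implicit val system: ActorSystem = ActorSystem("map-async-example")
    try Await.result(Source(input).via(flow(system)).runWith(Sink.seq[Seq[String]]), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    println(run(List(1, 0, 2), sys => aToSeqOfC(4)(sys)))
    println(run(List(1, 0, 2), _ => aToSeqOfCFlat))
  }
}
```

As a design note: mapAsync lets up to parallelism sub-streams run concurrently, while flatMapConcat handles one Seq[B] at a time, so the choice mostly depends on how expensive bToC is.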