I have 2 flows like the following:
val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ...
val bToC: Flow[B, C, NotUsed] = ...
I want to combine these into a convenience method like the following:
val aToSeqOfC: Flow[A, Seq[C], NotUsed]
So far I have the following, but I know it just ends up with C elements and not Seq[C]:
Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)
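To make the problem concrete, here is a minimal sketch of the flattening behaviour. It assumes Akka 2.6+ (where an implicit ActorSystem supplies the materializer) and uses hypothetical stand-in types: A = String, B = String, C = Int. The object name FlattenExample and both flow bodies are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenExample {
  // Hypothetical stand-ins: A = String (a line), B = String (a word), C = Int (a length)
  val aToSeqOfB: Flow[String, Seq[String], NotUsed] =
    Flow[String].map(_.split(" ").toList)
  val bToC: Flow[String, Int, NotUsed] =
    Flow[String].map(_.length)

  // Same shape as the pipeline above: mapConcat emits each B on its own,
  // so the boundaries between the original Seq[B] values are lost.
  val flattened: Flow[String, Int, NotUsed] =
    Flow[String].via(aToSeqOfB).mapConcat(_.toList).via(bToC)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("flatten-example")
    try Await.result(
      Source(List("a bb ccc", "dddd")).via(flattened).runWith(Sink.seq[Int]),
      5.seconds)
    finally system.terminate()
  }
}
```

With the two input lines shown, the stream yields four separate Int elements rather than two Seq[Int] values.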
How can I preserve the Seq in this scenario?
Indirect Answer
In my opinion, your question highlights a common "rookie mistake" when working with Akka Streams: putting business logic inside stream constructs, which is usually poor organization. Your question suggests that you have something of the form:
val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B =>
//business logic
}
The ideal scenario would be if you had:
//normal function, no akka involved
val bToCFunc : B => C = { b : B =>
//business logic
}
val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc
In the "ideal" example above, the Flow is just a thin veneer on top of normal, non-Akka business logic.
With the logic separated out, your original question is solved by mapping the plain function over each Seq[B]:
val aToSeqOfC : Flow[A, Seq[C], NotUsed] =
aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))
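As a runnable illustration of this approach, here is a minimal sketch assuming Akka 2.6+ and hypothetical stand-in types (A = String, B = String, C = Int). The object name IndirectExample and the function bodies are invented for the example; only the composition pattern comes from the answer above.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object IndirectExample {
  // Hypothetical stand-ins: A = String (a line), B = String (a word), C = Int (a length)
  val aToSeqOfB: Flow[String, Seq[String], NotUsed] =
    Flow[String].map(_.split(" ").toList)

  // Plain function, no Akka involved
  val bToCFunc: String => Int = _.length

  // Map the plain function over each Seq[B]; the Seq boundaries are preserved
  val aToSeqOfC: Flow[String, Seq[Int], NotUsed] =
    aToSeqOfB.via(Flow[Seq[String]].map(_.map(bToCFunc)))

  def run(): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("indirect-example")
    try Await.result(
      Source(List("a bb ccc", "dddd")).via(aToSeqOfC).runWith(Sink.seq[Seq[Int]]),
      5.seconds)
    finally system.terminate()
  }
}
```

The same Flow could also be written as aToSeqOfB.map(_.map(bToCFunc)), which avoids the intermediate Flow[Seq[B]].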
Direct Answer
If you cannot reorganize your code, then the only available option is to deal with Futures. You'll need to use bToC within a separate sub-stream:
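val mat : akka.stream.Materializer = ???
val seqBToSeqC : Seq[B] => Future[Seq[C]] =
  (seqB) =>
    Source
      .apply(seqB.toList)          // Source.apply expects an immutable Iterable
      .via(bToC)
      // runWith returns the Sink's materialized Future[Seq[C]];
      // .to(Sink.seq[C]).run() would keep the Source's NotUsed instead
      .runWith(Sink.seq[C])(mat)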
You can then use this function within a mapAsync to construct the Flow you are looking for:
val parallelism = 10
val aToSeqOfC: Flow[A, Seq[C], NotUsed] =
aToSeqOfB.mapAsync(parallelism)(seqBToSeqC)
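Putting the sub-stream and mapAsync together, here is a minimal end-to-end sketch. It assumes Akka 2.6+ (the implicit ActorSystem supplies the materializer) and hypothetical stand-in types A = String, B = String, C = Int. The object name DirectExample, the flow bodies, and the parallelism of 2 are illustrative choices, not part of the original code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DirectExample {
  // Hypothetical stand-ins: A = String (a line), B = String (a word), C = Int (a length)
  val aToSeqOfB: Flow[String, Seq[String], NotUsed] =
    Flow[String].map(_.split(" ").toList)
  // Treated as an opaque Flow that cannot be refactored into a plain function
  val bToC: Flow[String, Int, NotUsed] =
    Flow[String].map(_.length)

  def run(): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("direct-example")

    // Run one small sub-stream per Seq[B] and collect its output into a Future[Seq[C]]
    val seqBToSeqC: Seq[String] => Future[Seq[Int]] =
      seqB => Source(seqB.toList).via(bToC).runWith(Sink.seq[Int])

    // mapAsync emits results in upstream order, so each Seq[C] lines up with its input A
    val aToSeqOfC: Flow[String, Seq[Int], NotUsed] =
      aToSeqOfB.mapAsync(2)(seqBToSeqC)

    try Await.result(
      Source(List("a bb ccc", "dddd")).via(aToSeqOfC).runWith(Sink.seq[Seq[Int]]),
      5.seconds)
    finally system.terminate()
  }
}
```

Note that each input element materializes its own sub-stream, so this costs more than mapping a plain function over the Seq.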