Search code examples
scalaakka-stream

Akka streams flattening Flows via


Flows can be connected with via like:

def aToB: Flow[A, B, NotUsed] = { ??? }  
def bToC: Flow[B, C, NotUsed] = { ??? }  
def aToC: Flow[A, C, NotUsed] = { aToB.via(bToC) }  

I would like to do the equivalent of flatMap:

def aToSomeB: Flow[A, Some[B], NotUsed] = { ??? }  
def aToSomeC: Flow[A, Some[C], NotUsed] = { aToSomeB.flatVia(bToC) }

Is there some built-in way to do flatVia? It seems like a common need for things like Option unwrapping and error flattening.


Solution

  • It really depends if you are interested in keeping those Nones around, or if you want to throw them away.

    As you typed your flow as Flow[A, Some[C], NotUsed] is seems you are not interested in Nones at all. This means you can easily filter them out with collect, e.g.

    def aToSomeC: Flow[A, C, NotUsed] = { aToSomeB.collect{case Some(x) ⇒ x}.via(bToC) }
    

    If, otherwise, you need to track the Nones (or the Lefts if you're dealing with Eithers), you'll need to write your "lifting" stage yourself. This can be written fairly generically. For example, it can be written as a function that takes any flow Flow[I, O, M] and returns another flow Flow[Either[E, I], Either[E, O], M]. Because it requires fan-out and fan-in stages, usage of GraphDSL is required.

      def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] =
        Flow.fromGraph(GraphDSL.create(f) { implicit builder: GraphDSL.Builder[M] => f =>
    
          val fIn      = builder.add(Flow[Either[E, I]])
          val p        = builder.add(Partition[Either[E, I]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
          val merge    = builder.add(Merge[Either[E, O]](2))
          val toRight  = builder.add(Flow[O].map(Right(_)))
    
                     p.out(0).collect{case Left(x) ⇒ Left(x)}             ~> merge
          fIn.out ~> p.in
                     p.out(1).collect{case(Right(x)) ⇒ x} ~> f ~> toRight ~> merge
    
          new FlowShape(fIn.in, merge.out)
        })
    

    This can be used as per below

      def aToSomeB: Flow[A, Either[Throwable, B], NotUsed] = ???
      def aToSomeC: Flow[A, Either[Throwable, C], NotUsed] = aToSomeB.via(liftEither(bToC))
    

    Note that Options can easily be converted to Eithers to leverage the same helper function.