akka-stream akka-grpc

Turn `func(in: Source[A]) : Source[B]` into a `Flow[A, B]`


I am using akka-grpc to generate client bindings. The generated methods usually have the form

func[A, B](in: Source[A]) : Source[B],

i.e. they consume a Source[A] and offer a Source[B].

Now I want to turn func into a Flow[A, B] so that I can use it in an akka-stream pipeline.


Solution

  • The solution is:

      import akka.NotUsed
      import akka.stream.scaladsl.{Flow, Source}
      def SourceProcessor[In, Out](f: Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
        Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }
    

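    It uses prefixAndTail to hijack the underlying Source: with a prefix length of 0, prefixAndTail emits a single pair of an empty prefix and the rest of the stream as a Source. That Source is passed to f, and flatMapConcat flattens the Source that f returns back into the Flow.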
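
As a rough usage sketch of the prefixAndTail approach described above, the snippet below wraps a stand-in function in the helper and runs a few elements through the resulting Flow. It assumes Akka 2.6 or later (where an implicit ActorSystem also supplies the Materializer). The names SourceProcessorExample, fakeClientCall and run are hypothetical and only serve the illustration; fakeClientCall merely mimics the shape of a generated akka-grpc streaming method and does no network I/O.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SourceProcessorExample {

  // Same helper as in the answer. With prefixAndTail(0) the prefix is always
  // empty, so it is ignored here and only the tail Source is handed to f.
  def SourceProcessor[In, Out](f: Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
    Flow[In].prefixAndTail(0).flatMapConcat { case (_, in) => f(in) }

  // Hypothetical stand-in for a generated client method: Source in, Source out.
  def fakeClientCall(in: Source[Int, NotUsed]): Source[String, NotUsed] =
    in.map(i => s"reply-$i")

  // Runs three elements through the wrapped function and collects the replies.
  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system also acts as Materializer.
    implicit val system: ActorSystem = ActorSystem("source-processor-example")
    try {
      val flow: Flow[Int, String, NotUsed] = SourceProcessor[Int, String](fakeClientCall)
      Await.result(Source(1 to 3).via(flow).runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

With a real akka-grpc client, the generated streaming method would take the place of fakeClientCall, provided its signature matches Source[In, NotUsed] => Source[Out, NotUsed].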