I am using akka-grpc to generate client bindings. They usually have the form func[A, B](in: Source[A]): Source[B], i.e. they consume a Source[A] and offer a Source[B].
Now I want to turn func into a Flow[A, B] so that I can use it with akka-stream.
The solution is:
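import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
def SourceProcessor[In, Out](f: Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
  Flow[In].prefixAndTail(0).flatMapConcat { case (_, in) => f(in) } // the prefix is always empty, so ignore it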
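It uses prefixAndTail to hijack the underlying Source. With a prefix length of 0, prefixAndTail emits exactly one element: an empty prefix together with a Source that carries every upstream element. flatMapConcat then hands that Source to f and emits whatever f produces.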
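To see the helper in action, here is a minimal sketch. It assumes Akka 2.6 or later (so that the implicit ActorSystem also provides the materializer). fakeCall is a hypothetical stand-in for a generated akka-grpc client method, not part of any real API; it simply doubles each element.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SourceProcessorDemo {

  // The helper from the post; the always-empty prefix is ignored with `_`.
  def SourceProcessor[In, Out](f: Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
    Flow[In].prefixAndTail(0).flatMapConcat { case (_, in) => f(in) }

  // Hypothetical stand-in for a generated client method: Source in, Source out.
  def fakeCall(in: Source[Int, NotUsed]): Source[Int, NotUsed] =
    in.map(_ * 2)

  // Runs three elements through the wrapped function and collects the results.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("demo")
    try {
      val flow: Flow[Int, Int, NotUsed] = SourceProcessor[Int, Int](fakeCall)
      Await.result(Source(List(1, 2, 3)).via(flow).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Running this should yield the elements 2, 4, 6, which shows that the Source-to-Source function now behaves like an ordinary Flow stage and can be composed with via.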