Search code examples
scalaakkaakka-stream

Preserve relation with upstream elements in akka stream


I have following code sample that works fine. I want to add some changes to preserve relation between request and responce. How can I ahieve that? Rest api flow's materialized value is NotUsed. Is it possible to somehow use Keep.both for that?

// this flow is provided by some third party library that I can't change in place
val someRestApiFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsync(10)(x => Future(x + 1))

val digits: Source[Int, NotUsed] = Source(List(1, 2, 3))

val r = digits.via(someRestApiFlow).runForeach(println)

Result is

2
3
4

I want result to be like

1 -> 2
2 -> 3
3 -> 4

Solution

  • You can use a broadcast element to create 2 separate flows. The first output of broadcast goes through someRestApiFlow, the second output of the broadcast goes unmodified. You then zip the output of the someRestApiFlow with the second output of the broadcast flow. Doing that, you have both the input element and the result of its transformation through someRestApiFlow.

    digits ---> broadcast --> someRestApiFlow ---> zip --> result
                         \----------------------/