I'm using akka with a microservice framework, so I've got a lot of completionStage requests. I want to get a list of elements from one microservice and zip them together with a single element from another such that I end up with a Source of Pair<list item, single element>.
I can't do this with a normal zip because Source.zip completes as soon as one of the two sources completes so I only end up with one element propagated.
I can't use Source.zipAll because that requires me to define the default element ahead of time.
If I already had the single element ahead of time I could use Source.repeat to make it repeatedly propagate that one element, meaning that the Source.zip would instead complete when the list of elements completed, but Source.repeat can't take a completion stage or a Source.completionStage.
My current tactic is to zip the things together before I mapConcat the list elements.
Source<singleElement> singleElement = Source.completionStage(oneService.getSingleElement().invoke());
return Source.completionStage(anotherService.getListOfElements().invoke)
.zip(singleElement)
.flatMapConcat(pair -> Source.fromIterator(() -> pair.first().stream().map(listElement -> Pair.create(listElement, pair.second())));
This eventually gets to what I want, but I feel like there's a lot of unnecessary duplication and moving around of data synchronously. Is there a better way to solve this problem that I'm missing?
The flatMapConcat
operator should allow you to construct a Source.repeat
which repeats the single element once it's known. In Scala (Source.future
being the Scala equivalent of Source.completionStage
: I'm not familiar enough with Java lambda syntax to answer in Java):
val singleElement = Source.future(oneService.getSingleElement)
Source.future(anotherService.getListOfElements)
.mapConcat(lst => lst) // unspool the list
.zip(singleElement.flatMapConcat(element => Source.repeat(element)))