Search code examples
javaakkaakka-streamlagomcompletion-stage

How to combine Source.repeat and Source.completionStage using Akka


I'm using akka with a microservice framework, so I've got a lot of completionStage requests. I want to get a list of elements from one microservice and zip them together with a single element from another such that I end up with a Source of Pair<list item, single element>.

I can't do this with a normal zip because Source.zip completes as soon as one of the two sources completes so I only end up with one element propagated.

I can't use Source.zipAll because that requires me to define the default element ahead of time.

If I already had the single element ahead of time I could use Source.repeat to make it repeatedly propagate that one element, meaning that the Source.zip would instead complete when the list of elements completed, but Source.repeat can't take a completion stage or a Source.completionStage.

My current tactic is to zip the things together before I mapConcat the list elements.

Source<singleElement> singleElement = Source.completionStage(oneService.getSingleElement().invoke());

return Source.completionStage(anotherService.getListOfElements().invoke)
    .zip(singleElement)
    .flatMapConcat(pair -> Source.fromIterator(() -> pair.first().stream().map(listElement -> Pair.create(listElement, pair.second())));

This eventually gets to what I want, but I feel like there's a lot of unnecessary duplication and moving around of data synchronously. Is there a better way to solve this problem that I'm missing?


Solution

  • The flatMapConcat operator should allow you to construct a Source.repeat which repeats the single element once it's known. In Scala (Source.future being the Scala equivalent of Source.completionStage: I'm not familiar enough with Java lambda syntax to answer in Java):

    val singleElement = Source.future(oneService.getSingleElement)
    
    Source.future(anotherService.getListOfElements)
      .mapConcat(lst => lst)  // unspool the list
      .zip(singleElement.flatMapConcat(element => Source.repeat(element)))