Search code examples
javaakkaakka-stream

Keep ordering by merging multiple slow sources in akka streams


I need to merge multiple slow sources but keep the order. If the code is executed twice, the order must be the same.

The very simple solution for that is the following:

 Source
    .from(partions())
    .flatMapConcat(partition -> slowSource(partition))

That works, but the slow source for the second partition is executed after the first one. I want to run the slow sources in parallel but merge the results with a stable order.

I tried it like that:

Source
    .completionStage(Source
        .from(partions())
        .map(partition -> slowSource(partition).runWith(Sink.seq(), actorSystem))
        .runWith(Sink.seq(), actorSystem))
    .mapConcat(i -> i)
    .mapAsync(partitions, stage -> stage)
    .mapConcat(i -> i) 

It's working, but I need to create a list for each partition. Is there a better way to implement that?


Solution

  • Something along the lines of this should be close to what you seem to be asking for:

    Source.from(partitions())
      .map(partition -> (slowSource(partition).buffer(1, OverflowStrategy.backpressure())).preMaterialize(actorSystem))
      .flatMapConcat(Pair::second)
    

    By attaching a source to a buffer and materializing it, you effectively "prime the pump" by initializing it and signaling demand to the source. You can tweak the buffer sizes (e.g. can use statefulMap to have larger buffers on later partitions) if so inclined. If the main reason the sources are slow is that there's a long delay before the first element appears but after that they come fairly quickly, this will allow each partition to get set up quickly.