Search code examples
scalaakkaakka-stream

How do you process a sequence of Akka Stream Sources?


We have a Sink that can handle events:

def parseEvent(): Sink[T, Future[akka.Done]] = {
  Sink.foreach[T] { event => {
    // Do stuff with the event
  }}
}

This works fine with a single Source:

val mySource: Source[T] = ...  
mySource.takeWhile( someCheck, true ).runWith(parseEvent)

How do you get it working if you instead have:

val mySources: Seq[Source[T]] = ...

All sources should run in parallel and all events should reach parseEvent.


Solution

  • Something along the following lines should fit the bill:

    import akka.NotUsed
    import akka.stream.scaladsl.{ Concat, Merge, Source }
    
    def sourceFromSources[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
      sources.size match {
        case s if s < 1 => Source.empty[T]
        case 1 => sources.head
        case 2 => sources.head.merge(sources(1))
        case _ => Source.combine(sources.head, sources(1), sources.drop(2): _*)(Merge(_))
      }
    

    The merge strategy "merges multiple streams, taking elements as they arrive from the input streams" and chooses randomly if multiple streams have elements available. Backpressure is propagated from the downstreams to the upstreams.