scala akka akka-stream

Source.combine doesn't take varargs?


Is Akka trying to communicate that Source.combine shouldn't be used with a collection of sources? Or am I being dumb somehow with the function definition?

Akka's Source.combine requires a first and a second source before the varargs. The function definition is given below:

def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]

I'd like to just do something like:

val sources : Seq[Source[Int,_]] = ???
Source.combine(sources:_*)(Merge(_))

I don't know whether my collection will hold 1, 2, or many sources, so writing out the cases adds a few lines. Not a huge deal, but I feel like I'm missing something. Is this an anti-pattern for Akka Streams?


Solution

  • The point of the pattern first: Source[T, _], second: Source[T, _], rest: Source[T, _]* is to ensure, at compile time, that you pass at least two sources to the method.

    If the method signature were just sources: Source[T, _]*, you could pass an empty vararg or a single element.

    In your case, since sources is a Seq, I would just pattern match on it to split it into the first element, the second element, and the rest:

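    sources match {
       // Seq(...) matches any Seq; the :: extractor only matches List (a Vector would fall through)
       case Seq(first, second, rest @ _*) => Source.combine(first, second, rest: _*)(Merge(_))
       case _ => ??? // too few elements, maybe return Source.failed?
    }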
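
If you would rather handle the 0- and 1-source cases than fail, the match above can be wrapped in a small helper. This is only a sketch under a few assumptions: combineAll is a hypothetical name (it is not part of Akka), Merge is assumed to be the fan-in strategy you want, returning Source.empty for an empty collection is one possible choice (Source.failed would be another), and the materialized values of the inputs are discarded.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Merge, Source}

object CombineAll {
  // Hypothetical helper, not part of Akka: merges any number of sources.
  def combineAll[T](sources: Seq[Source[T, _]]): Source[T, NotUsed] =
    sources match {
      // No sources: a stream that completes immediately (assumed policy).
      case Seq() => Source.empty[T]
      // One source: nothing to merge, only drop its materialized value.
      case Seq(single) => single.mapMaterializedValue(_ => NotUsed)
      // Two or more: fits the (first, second, rest*) shape of Source.combine.
      case Seq(first, second, rest @ _*) =>
        Source.combine(first, second, rest: _*)(Merge(_))
    }
}
```

For example, CombineAll.combineAll(Seq(Source(1 to 3), Source(4 to 6))) gives a single Source[Int, NotUsed]. Because Merge emits elements as they arrive, the relative order of elements from different sources is not guaranteed.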