How to use flatMapConcat?


I am trying to use flatMapConcat as follows:

Source.empty
      .flatMapConcat {
        Source.fromFuture(Future("hello"))
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }

and the compiler complains:

Error:(31, 26) type mismatch;
 found   : akka.stream.scaladsl.Source[String,akka.NotUsed]
 required: ? => akka.stream.Graph[akka.stream.SourceShape[?],?]
        Source.fromFuture(Future("hello")) 

What am I doing wrong?


Solution

  • Method flatMapConcat has the following signature:

    def flatMapConcat[T, M](f: (Out) => Graph[SourceShape[T], M]): Repr[T]
    

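    That is, it takes a function from each element to a Source, not a Source itself, which is what the compiler error is pointing at. When processing a Source of Strings, it would expect a function like:

    f: String => Source[String, _]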
    

    Another issue with your sample code is that Source.empty[T] emits no elements, so the function passed to the subsequent flatMapConcat would never be invoked.
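
    Putting both points together, the code from the question could be adjusted roughly as below. This is only a sketch in script/worksheet style: it assumes the same Akka 2.5-era API used elsewhere in this answer (ActorMaterializer, Source.fromFuture), and the "ignored" element, the Sink.seq sink and the 3-second timeout are placeholders chosen for illustration, not part of the original code.

    ```scala
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    implicit val system: ActorSystem = ActorSystem("system")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for Future(...)

    // Source.single emits exactly one element, so the function below runs once.
    // "ignored" is a placeholder value; the function discards it.
    val result: Future[Seq[String]] =
      Source.single("ignored")
        .flatMapConcat(_ => Source.fromFuture(Future("hello"))) // a function, not a bare Source
        .runWith(Sink.seq)

    // Blocking here only to keep the sketch short; the collected
    // result contains the single element "hello".
    val collected: Seq[String] = Await.result(result, 3.seconds)
    collected.foreach(println)

    system.terminate()
    ```

    The only change that matters for the compiler error is the `_ =>` in front of Source.fromFuture; swapping Source.empty for Source.single is what makes the function actually run.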

    Here's an example of using flatMapConcat to transform each element from a Source of names:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    
    implicit val system = ActorSystem("system")
    implicit val materializer = ActorMaterializer()
    
    Source(List("alice", "bob", "jenn")).
      flatMapConcat{ name => Source(List(s"Hi $name", s"Bye $name")) }.
      runWith(Sink.foreach(println))
    // Hi alice
    // Bye alice
    // Hi bob
    // Bye bob
    // Hi jenn
    // Bye jenn
    

    As a side note, flatMapConcat in the above example could be replaced with mapConcat, which expects a simpler function that returns a plain collection rather than a Source:

    Source(List("alice", "bob", "jenn")).
      mapConcat{ name => List(s"Hi $name", s"Bye $name") }.
      runWith(Sink.foreach(println))
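
    The substitution works here because each name maps to a collection that is already in memory. When the per-element result is itself a stream, for instance one backed by a Future, flatMapConcat is the one that fits. The sketch below puts the two side by side; greetingFor is a hypothetical stand-in for an asynchronous lookup, and the system name, Sink.seq and the timeout are likewise illustrative assumptions.

    ```scala
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    implicit val system: ActorSystem = ActorSystem("comparison")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for Future(...)

    // Hypothetical asynchronous lookup, used only for illustration.
    def greetingFor(name: String): Future[String] = Future(s"Hi $name")

    val names = List("alice", "bob", "jenn")

    // mapConcat: the function returns a strict, in-memory collection.
    val viaMapConcat: Future[Seq[String]] =
      Source(names)
        .mapConcat(name => List(s"Hi $name", s"Bye $name"))
        .runWith(Sink.seq)

    // flatMapConcat: the function returns a Source, which may be asynchronous.
    // Sub-streams are consumed one after another, so element order is preserved.
    val viaFlatMapConcat: Future[Seq[String]] =
      Source(names)
        .flatMapConcat { name =>
          Source.fromFuture(greetingFor(name)).concat(Source.single(s"Bye $name"))
        }
        .runWith(Sink.seq)

    // Blocking only to keep the sketch short.
    val strictResult: Seq[String] = Await.result(viaMapConcat, 3.seconds)
    val streamedResult: Seq[String] = Await.result(viaFlatMapConcat, 3.seconds)

    system.terminate()
    ```

    Both pipelines should yield the same six greetings in the same order; the difference lies only in what the function passed to each operator is allowed to return.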