Search code examples
scalaakkaakka-httpakka-stream

How do I extract a Future[String] from an akka Source[ByteString, _]?


I am attempting to stream a file using akka streams and am running into a small issue extracting the results of the stream into a Future[String]:

def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
  var fileString = ""
  val sink = Sink.foreach[ByteString](byteString => fileString = 
    fileString.concat(byteString.decodeString("US-ASCII")))
  source.runWith(sink)
}

I'm getting a compilation error:

Expression of type Future[Done] does not conform to expected type Future[String]

Can anyone help me understand what I'm doing wrong and what I need to do to extract the results of the stream?


Solution

  • If you look at the definition of Sink.foreach you'll find the evaluation type is Sink[T, Future[Done]] which means it doesn't matter what will happen with the result of the computation of the elements in the stream. Following is the definition:

    def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
    

    On the other hand, the definition of Sink.fold evaluates to a Future[U] being U the type of the zero. In other words, you are able to define what will be the type of the future at the end of the processing.

    The following is the definition (and implementation) for Sink.fold:

    def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
              Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
    

    According to the implementation above you can see that the type to be kept in the materialization is Future[U] because of the Keep.right which means something like: "I don't care if the elements coming in are Ts (or ByteString in your case) I (the stream) will give you Us (or String in your case) .. when I'm done (in a Future)"

    The following is a working example of your case replacing the Sink.foreach with Sink.fold and evaluating the whole expression to Future[String]

    def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
    
      var fileString = ""
    
      //def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
      val sinkForEach: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString](byteString => fileString =
        fileString.concat(byteString.decodeString("US-ASCII")))
    
      /*
        def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
          Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
       */
      val sinkFold: Sink[ByteString, Future[String]] = Sink.fold("") { case (acc, str) =>
        acc + str
      }
    
      val res1: Future[Done] = source.runWith(sinkForEach)
      val res2: Future[String] = source.runWith(sinkFold)
    
      res2
    
    }