I am attempting to stream a file using Akka Streams and am running into a small issue extracting the results of the stream into a Future[String]:
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
  var fileString = ""
  val sink = Sink.foreach[ByteString](byteString =>
    fileString = fileString.concat(byteString.decodeString("US-ASCII")))
  source.runWith(sink)
}
I'm getting a compilation error:
Expression of type Future[Done] does not conform to expected type Future[String]
Can anyone help me understand what I'm doing wrong and what I need to do to extract the results of the stream?
If you look at the definition of Sink.foreach, you'll find that it evaluates to Sink[T, Future[Done]]. The materialized value is only a Future[Done], which signals that the stream has completed; whatever your function computes for each element is discarded. Here is the definition:
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
On the other hand, Sink.fold evaluates to a Future[U], where U is the type of the zero element. In other words, you decide what type the future will hold once processing finishes.
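The same distinction exists on ordinary Scala collections, which may make the stream version easier to see. This is only an analogy using the standard library, not Akka code, and the chunks list is made-up sample data:

```scala
object FoldVsForeach {
  // Hypothetical sample data standing in for the chunks of a stream.
  val chunks: List[String] = List("CREATE ", "TABLE ", "t;")

  // foreach returns Unit, so any result has to live in outside mutable state
  // (this mirrors the var in the question).
  val buffer = new StringBuilder
  val viaForeach: Unit = chunks.foreach(chunk => buffer.append(chunk))

  // foldLeft threads an accumulator through the elements and returns its final value.
  val viaFold: String = chunks.foldLeft("")((acc, chunk) => acc + chunk)

  def main(args: Array[String]): Unit =
    println(viaFold) // prints "CREATE TABLE t;"
}
```

Sink.foreach and Sink.fold relate to each other in roughly the same way, except that the result arrives wrapped in a Future.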
The following is the definition (and implementation) of Sink.fold:
def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
  Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
From the implementation above you can see that the materialized value that is kept is Future[U], because Keep.right keeps the materialized value of Sink.head. That amounts to something like: "I don't care that the incoming elements are Ts (ByteString in your case); I (the stream) will give you a U (String in your case) when I'm done (in a Future)."
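To see what Keep.right changes, here is a small sketch that materializes the same graph with Keep.left and with Keep.right. The object and method names are made up for illustration, and it assumes Akka 2.6 or later, where an implicit ActorSystem also supplies the materializer:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepDemo {
  val numbers: Source[Int, NotUsed] = Source(List(1, 2, 3))
  val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Keep.left keeps the source's materialized value, which is just NotUsed here.
  def runKeepingLeft()(implicit system: ActorSystem): NotUsed =
    numbers.toMat(sum)(Keep.left).run()

  // Keep.right keeps the sink's materialized value: the Future holding the fold result.
  def runKeepingRight()(implicit system: ActorSystem): Future[Int] =
    numbers.toMat(sum)(Keep.right).run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("keep-demo")
    println(Await.result(runKeepingRight(), 3.seconds)) // prints 6
    system.terminate()
  }
}
```

Both versions run the same stream; the only difference is which materialized value you get back.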
The following is a working example of your case, replacing Sink.foreach with Sink.fold so that the whole expression evaluates to Future[String]:
// Assumes an implicit Materializer (or, on Akka 2.6+, an implicit ActorSystem) is in scope.
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
  // For comparison, foreach can only report completion:
  //   def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
  // so source.runWith(Sink.foreach(...)) would have type Future[Done].
  //
  // fold materializes the accumulated value instead:
  //   def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
  //     Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
  val sinkFold: Sink[ByteString, Future[String]] =
    Sink.fold[String, ByteString]("") { (acc, bytes) =>
      // Decode each chunk; appending the ByteString itself would append its
      // toString representation rather than the decoded text.
      acc + bytes.decodeString("US-ASCII")
    }

  // Run the stream once; running it with two sinks would read the source twice.
  source.runWith(sinkFold)
}
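If you don't need a separate Sink value, a shorter variant is possible with runFold: concatenate the raw chunks first and decode the complete buffer once at the end. This is a sketch rather than a drop-in replacement. The implicit ActorSystem parameter, the object name, and the in-memory sample source are assumptions added for illustration, and it again relies on Akka 2.6+ for the materializer:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MigrationFileDemo {
  // Accumulate bytes, then decode once when the stream has completed.
  def streamMigrationFile(source: Source[ByteString, _])(
      implicit system: ActorSystem): Future[String] =
    source
      .runFold(ByteString.empty)(_ ++ _)
      .map(_.decodeString("US-ASCII"))(system.dispatcher)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("migration-demo")
    // Hypothetical in-memory chunks standing in for a real file source.
    val sample = Source(List(ByteString("CREATE "), ByteString("TABLE t;")))
    println(Await.result(streamMigrationFile(sample), 3.seconds)) // prints "CREATE TABLE t;"
    system.terminate()
  }
}
```

Either way the whole file ends up in memory, so this only makes sense for files that are small enough to hold as a single String.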