Search code examples
scalaakkaakka-stream

Subscribing to an Akka stream, reading files


I'm trying to read a file in Scala with Akka Streams, I want to put the result in a list. I tried the following code, and the list gets incremented with new values inside the sink, but outside I get an empty list.

def readStream (path : String, date : String) : List[Array[String]] = {
  var lines: List[scala.Array[String]] = List[scala.Array[String]]()

  implicit val system = ActorSystem("Sys")
  val settings = ActorMaterializerSettings(system)
  implicit val materializer = ActorMaterializer(settings)

  val sink: Sink[String, Future[Done]] = Sink.foreach((x : String) => {
    val list : List[scala.Array[String]] = List(x.split("|"))
    lines = lines ++ list
    // println(lines.length)
  })

  val result: Unit = FileIO.fromPath(Paths.get(path + "transactions_" + date + ".data"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
    .to(sink)
    .run()
  lines
}

Solution

  • Three things: (1) pass the actor system and materializer to your method (explicitly or as implicit parameters) instead of creating them inside the method, (2) use Sink.seq, and (3) use toMat and Keep.right to get the materialized value of the Sink (to retains the materialized value of the Source):

    val result: Future[Seq[String]] =
      FileIO.fromPath(...)
        .via(Framing.delimiter(ByteString("\n"), 256, true))
        .map(_.utf8String)
        .toMat(Sink.seq)(Keep.right)
        .run()
    

    Alternatively, a shorthand for using toMat and Keep.right is runWith:

    val result: Future[Seq[String]] =
      FileIO.fromPath(...)
        .via(Framing.delimiter(ByteString("\n"), 256, true))
        .map(_.utf8String)
        .runWith(Sink.seq)