akka, akka-stream

Closing an InputStream used by a running graph


I have a graph that processes a large file. The file is accessed via an iterator created from the input stream.

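import java.io.FileInputStream
import akka.stream.scaladsl.Source
import resource._ // scala-arm, provides `managed`

for {
  in <- managed(new FileInputStream(inputFile))
} {
  // 3rd party lib that creates iterator from input stream
  val iterator = () => EntityIterator.fromPbf(in)
  Source
    .fromIterator[OSMEntity](iterator)
    .runForeach(println) // returns a Future[Done] at once; the block then exits and closes `in`
}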

(Here I'm using scala-arm, but I'm happy to hand-roll the input stream management if I need to.)

I expect this code to print every object in the iterator. Instead, it finishes immediately: runForeach doesn't block (it returns a Future), so the ARM block exits right away and the input stream in is closed before the graph has read anything.

If I don't use ARM, when should I manually close the input stream? Or do I not need to because Akka will do it when EOF is reached?
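On the EOF question: Source.fromIterator only pulls elements from the iterator and has no handle on the FileInputStream behind it, so nothing is closed at EOF unless the third-party iterator happens to do that itself. The stdlib-only sketch below shows one way to close the stream when the iterator reports exhaustion. CloseOnExhaust is a hypothetical name, and a ByteArrayInputStream plus a byte-reading iterator stand in for the real file and PBF iterator. Note that it never closes anything if the stream fails or is cancelled before EOF, so on its own it is not a complete solution.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical wrapper: closes the underlying InputStream as soon as the
// wrapped iterator reports that it is exhausted (EOF).
// Caveat: if consumption stops early (failure or cancellation), close() never runs.
final class CloseOnExhaust[A](underlying: Iterator[A], in: InputStream) extends Iterator[A] {
  private var closed = false

  def isClosed: Boolean = closed

  override def hasNext: Boolean = {
    val more = !closed && underlying.hasNext
    if (!more && !closed) {
      in.close()
      closed = true
    }
    more
  }

  override def next(): A =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("iterator exhausted")
}

object CloseOnExhaustDemo {
  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream("abc".getBytes("UTF-8"))
    // Stand-in for the third-party iterator: reads bytes until read() returns -1
    val bytes = Iterator.continually(in.read()).takeWhile(_ != -1)
    val it = new CloseOnExhaust(bytes, in)
    println(it.map(_.toChar).mkString) // prints abc
    println(it.isClosed)               // prints true
  }
}
```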


Solution

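  • I have not used scala-arm, but remember that runForeach returns a Future[Done]. You can do the cleanup in onComplete; make sure the resource is closed on failure as well as on success.

    import scala.util.{Failure, Success}
    import akka.stream.scaladsl.Source
    // assumes an implicit Materializer and ExecutionContext are in scope

    val buf = scala.io.Source.fromFile("readme.md")
    Source.fromIterator(() => buf)
      .runForeach(print)
      .onComplete { result =>
        buf.close() // close on failure too, not only on success
        result match {
          case Success(_)   => println("done!")
          case Failure(err) => println(s"error! ${err.getMessage}")
        }
      }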

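    You should also consider Akka's FileIO, which opens the file when the stream starts and closes it when the stream completes, fails, or is cancelled: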

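    import java.nio.file.Paths
    import scala.util.{Failure, Success}
    import akka.stream.scaladsl.FileIO

    FileIO.fromPath(Paths.get("readme.md"))
      .map(_.utf8String) // a multi-byte character split across two chunks would be garbled
      .runForeach(print)
      .onComplete {
        case Success(_)   => println("done!")
        case Failure(err) => println(err.getMessage)
      }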
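The onComplete approach above can be packaged so that a resource's lifetime follows a Future rather than a lexical block, which is exactly what the ARM block gets wrong. This is a minimal sketch using only the Scala standard library: withResourceAsync is a hypothetical helper name, and a Future reading from a ByteArrayInputStream stands in for runForeach.

```scala
import java.io.ByteArrayInputStream
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.io.Source
import scala.util.control.NonFatal

object AsyncResource {
  // Hypothetical helper: opens a resource, passes it to asynchronous work, and
  // closes it only when the returned Future completes (success or failure).
  def withResourceAsync[R <: AutoCloseable, T](open: => R)(use: R => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    val resource = open
    val work =
      try use(resource)
      catch { case NonFatal(e) => Future.failed(e) } // `use` threw before returning a Future
    // andThen runs the close callback first, then completes the returned Future
    work.andThen { case _ => resource.close() }
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val lines: Future[List[String]] =
      withResourceAsync(new ByteArrayInputStream("a\nb\n".getBytes("UTF-8"))) { in =>
        // Stand-in for runForeach: the stream is consumed on another thread
        Future(Source.fromInputStream(in, "UTF-8").getLines().toList)
      }
    println(Await.result(lines, 5.seconds)) // prints List(a, b)
  }
}
```

With Akka in scope, the question's ARM block would become roughly withResourceAsync(new FileInputStream(inputFile))(in => Source.fromIterator(() => EntityIterator.fromPbf(in)).runForeach(println)); that usage line is an untested sketch.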