Scala streaming a live/growing file


My Scala application kicks off an external process that writes a file to disk. In a separate thread, I want to read that file and copy its contents to an OutputStream until the process is done and the file is no longer growing.

There are a couple of edge cases to consider:

  1. The file may not exist yet when the thread is ready to start.
  2. The thread may copy faster than the process is writing. In other words, it may reach the end of the file while the file is still growing.

BTW I can pass the thread a processCompletionFuture variable which indicates when the file is done growing.

Is there an elegant and efficient way to do this? Perhaps using Akka Streams or actors? (I've tried using an Akka Stream off of the FileInputStream, but the stream seems to terminate as soon as there are no more bytes in the input stream, which happens in case #2).


Solution

  • Alpakka, a library that is built on Akka Streams, has a FileTailSource utility that mimics the tail -f Unix command. For example:

    import akka.NotUsed
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.stream.alpakka.file.scaladsl._
    import akka.util.ByteString
    import java.io.OutputStream
    import java.nio.file.Path
    import scala.concurrent._
    import scala.concurrent.duration._
    
    val path: Path = ???
    
    val maxLineSize = 10000
    
    val tailSource: Source[ByteString, NotUsed] = FileTailSource(
      path = path,
      maxChunkSize = maxLineSize,
      startingPosition = 0,
      pollingInterval = 500.millis
    ).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))
    

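    The above tailSource reads the file from the beginning, line by line, and then polls every 500 milliseconds for newly appended data. Note that Framing.delimiter removes the line separator from each emitted ByteString, so drop the .via(Framing.delimiter(...)) stage if the OutputStream must receive a byte-for-byte copy of the file. To copy the stream contents to an OutputStream, connect the source to a StreamConverters.fromOutputStream sink: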

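    // Running the stream needs a materializer. On Akka 2.6+, an implicit ActorSystem
    // provides one; older versions need an explicit ActorMaterializer instead.
    implicit val system: akka.actor.ActorSystem = akka.actor.ActorSystem("file-tail")

    val stream: Future[IOResult] =
      tailSource
        .runWith(StreamConverters.fromOutputStream(() => new OutputStream {
          override def write(i: Int): Unit = ???
          override def write(bytes: Array[Byte]): Unit = ???
        }))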
    

    (Note that there is a FileTailSource.lines method that produces a Source[String, NotUsed], but in this scenario it's more convenient to work with ByteString than with String, because the OutputStream sink consumes ByteString elements. This is why the example uses FileTailSource.apply(), which produces a Source[ByteString, NotUsed].)
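
    The tail source never completes on its own, so something has to stop the stream once the external process has finished. Here is one possible sketch, assuming Akka 2.6+ and the processCompletionFuture mentioned in the question: put a KillSwitch in front of the sink and shut it down shortly after the future completes. The helper name copyUntilDone and the grace parameter are made up for illustration. The grace period is only a heuristic; it should be longer than the polling interval so that the last appended bytes are read before shutdown.

    ```scala
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.{ IOResult, KillSwitches }
    import akka.stream.scaladsl.{ Keep, Source, StreamConverters }
    import akka.util.ByteString
    import java.io.OutputStream
    import scala.concurrent.Future
    import scala.concurrent.duration.FiniteDuration

    // Hypothetical helper: copies `tail` into the OutputStream and stops the stream
    // `grace` after `processCompletionFuture` completes (successfully or not).
    def copyUntilDone[A](
        tail: Source[ByteString, NotUsed],
        out: () => OutputStream,
        processCompletionFuture: Future[A],
        grace: FiniteDuration
    )(implicit system: ActorSystem): Future[IOResult] = {
      import system.dispatcher

      // Materialize both the kill switch and the sink's Future[IOResult].
      val (killSwitch, done) =
        tail
          .viaMat(KillSwitches.single)(Keep.right)
          .toMat(StreamConverters.fromOutputStream(out))(Keep.both)
          .run()

      // Once the process is done, give the tail source time for a final poll,
      // then complete the stream, which also completes `done`.
      processCompletionFuture.onComplete { _ =>
        system.scheduler.scheduleOnce(grace)(killSwitch.shutdown())
      }

      done
    }
    ```

    With the tailSource above, a call might look like copyUntilDone(tailSource, () => myOutputStream, processCompletionFuture, 2.seconds), where myOutputStream stands in for your own stream.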

    The stream will fail if the file doesn't exist at the time of materialization. Therefore, you'll need to confirm the existence of the file before running the stream. This might be overkill, but one idea is to use Alpakka's DirectoryChangesSource for that.
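
    A lighter-weight way to confirm that the file exists is to poll for it before materializing the stream. The following is a minimal sketch that uses plain java.nio.file.Files.exists polling rather than DirectoryChangesSource. The helper name awaitFile and its pollInterval and timeout parameters are hypothetical, not part of Alpakka.

    ```scala
    import java.nio.file.{ Files, Path }
    import java.util.concurrent.TimeoutException
    import scala.annotation.tailrec
    import scala.concurrent.{ blocking, ExecutionContext, Future }
    import scala.concurrent.duration._

    // Hypothetical helper: completes with `path` once the file exists, or fails
    // with a TimeoutException if it has not appeared within `timeout`.
    def awaitFile(path: Path, pollInterval: FiniteDuration, timeout: FiniteDuration)(
        implicit ec: ExecutionContext
    ): Future[Path] =
      Future {
        val deadline = timeout.fromNow

        @tailrec def loop(): Path =
          if (Files.exists(path)) path
          else if (deadline.isOverdue())
            throw new TimeoutException(s"$path did not appear within $timeout")
          else {
            // Mark the sleep as blocking so the execution context can compensate.
            blocking(Thread.sleep(pollInterval.toMillis))
            loop()
          }

        loop()
      }
    ```

    The stream can then be started from the resulting future, for example awaitFile(path, 100.millis, 30.seconds).flatMap(_ => tailSource.runWith(sink)), where sink stands in for the StreamConverters.fromOutputStream sink shown earlier.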