
Update stream on file change


The code below reads and prints the contents of a file using Akka Streams:

package playground

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import akka.stream.ActorMaterializer

object Greeter extends App {

  implicit val system = ActorSystem("map-management-service")
  implicit val materializer = ActorMaterializer()

  FileIO.fromPath(Paths.get("a.csv"))
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .runForeach(println)

}

My understanding of Akka Streams was that when the file changes, the processing code (here println) fires again, so the entire file is re-read on each update. That is not what happens: the file is read once.

How should this be modified so that each time a.csv is updated, the file is re-read and the println code is re-executed?


Solution

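  • FileIO.fromPath reads the file once and completes when it reaches the end; it does not watch the file for later changes. Alpakka's DirectoryChangesSource could fit your use case. For example: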

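    // Requires the Alpakka file connector (akka-stream-alpakka-file) on the classpath
    import java.nio.file.Paths
    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.alpakka.file.DirectoryChange
    import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource
    import akka.stream.scaladsl.{FileIO, Framing}
    import akka.util.ByteString
    
    implicit val system = ActorSystem("map-management-service")
    implicit val materializer = ActorMaterializer()
    
    val myFile = Paths.get("a.csv")
    // Poll the current directory for changes every three seconds
    val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)
    
    changes
      // Keep only creation and modification events for a.csv
      .filter {
        case (path, dirChange) =>
          path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
      }
      // For each matching event, re-read the whole file and split it into lines
      .flatMapConcat(_ => FileIO.fromPath(myFile).via(Framing.delimiter(ByteString("\n"), 256, true)))
      .map(_.utf8String)
      .runForeach(println)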
    

    The above snippet prints the file contents when the file is created and whenever the file is modified, polling in three-second intervals.
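
The path test in the filter above relies on java.nio's Path.endsWith, which compares whole path elements rather than string suffixes. The following is a minimal sketch of that behaviour using only the JDK. The object name PathMatchSketch, the helper isTarget and the sample paths are hypothetical, chosen for illustration and not part of the answer's code:

```scala
import java.nio.file.{Path, Paths}

object PathMatchSketch extends App {
  // Hypothetical helper mirroring the path check used in the filter:
  // true when the trailing name elements of `changed` equal those of `target`.
  def isTarget(changed: Path, target: Path): Boolean = changed.endsWith(target)

  val target = Paths.get("a.csv")

  // "./a.csv" has the name elements ".", "a.csv", so the last element matches.
  println(isTarget(Paths.get("./a.csv"), target))  // true

  // "ba.csv" ends with the string "a.csv" but is a different path element.
  println(isTarget(Paths.get("./ba.csv"), target)) // false
}
```

Because the comparison is element-based, a file such as ba.csv in the same directory would not trigger a re-read, which a plain string suffix check on the path would get wrong.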