With the code below I read and print the contents of a file using Akka Streams:
package playground

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import akka.stream.ActorMaterializer

object Greeter extends App {
  implicit val system = ActorSystem("map-management-service")
  implicit val materializer = ActorMaterializer()

  FileIO.fromPath(Paths.get("a.csv"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
    .runForeach(println)
}
My understanding of Akka Streams is that whenever the file changes, the processing code (in this case println) is fired, so each time the file is updated the entire file is re-read. But this is not occurring: the file is read only once.
How should this be modified so that each time the file a.csv is updated, the file is re-read and the println code is re-executed?
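FileIO.fromPath reads the file once and then completes the stream; it does not watch the file for later changes. Alpakka's DirectoryChangesSource could fit your use case. For example: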
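import java.nio.file.Paths
import scala.concurrent.duration._ // needed for 3.seconds
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

implicit val system = ActorSystem("map-management-service")
implicit val materializer = ActorMaterializer()

val myFile = Paths.get("a.csv")
// Emits (path, change) pairs for the current directory, polling every three seconds.
val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)

changes
  .filter {
    case (path, dirChange) =>
      path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
  }
  // Re-read the whole file for every matching change event.
  .flatMapConcat(_ => FileIO.fromPath(myFile).via(Framing.delimiter(ByteString("\n"), 256, true)))
  .map(_.utf8String)
  .runForeach(println)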
The above snippet prints the file contents when the file is created and whenever the file is modified, polling in three-second intervals.
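If it helps to see the "re-read on change" idea without any streaming library, here is a minimal sketch that uses only java.nio. It is not how Alpakka implements DirectoryChangesSource; PollingSketch, PollResult and poll are hypothetical names made up for this illustration. Each call compares the file's last-modified time with the one seen previously and re-reads the whole file only when it differs.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.nio.file.attribute.FileTime

// Hypothetical helper for illustration only; not part of Akka or Alpakka.
object PollingSketch {

  // Outcome of one poll: the timestamp to remember for the next poll,
  // plus the file's lines if the file was (re)read during this poll.
  final case class PollResult(lastSeen: Option[FileTime], lines: Option[List[String]])

  def poll(file: Path, lastSeen: Option[FileTime]): PollResult =
    if (!Files.exists(file)) {
      // Forget the old timestamp so that a later re-creation triggers a read.
      PollResult(None, None)
    } else {
      val current = Files.getLastModifiedTime(file)
      if (lastSeen.contains(current)) {
        // Timestamp unchanged since the previous poll: nothing to do.
        PollResult(lastSeen, None)
      } else {
        // Timestamp is new or different: re-read the entire file.
        val text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8)
        PollResult(Some(current), Some(text.split("\n").toList))
      }
    }
}
```

A caller would invoke poll in a loop, sleeping for the poll interval between calls and passing the returned lastSeen into the next call. Comparing timestamps can miss two writes that land within the file system's timestamp resolution, which is one reason to prefer a ready-made source such as the one above.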