I'm new to F# as well as Akka.Net and trying to achieve the following with them:
I want to create an actor (Tail) that receives a file location and then listens for events at that location using FileSystemWatcher and some Observables, forwarding them on as messages to some other actor for processing.
The problem I'm having is that the code to listen for the events only picks up one event at a time and ignores all the others. For example, if I copy 20 files into the directory being watched, it only seems to send out the event for one of them.
Here's my Actor code:
module Tail
open Akka
open Akka.FSharp
open Akka.Actor
open System
open Model
open ObserveFiles
open ConsoleWriteActor
let handleTailMessages tm =
    match tm with
    | StartTail (f,r) ->
        observeFile f consoleWriteActor |!> consoleWriteActor
        |> ignore
let spawnTail =
    fun (a : Actor<IMessage> ) ->
        let rec l (count : int) = actor{
            let! m = a.Receive()
            handleTailMessages m
            return! l (count + 1)
        }
        l(0)
And here's the code that listens for events:
module ObserveFiles
open System
open System.IO
open System.Threading
open Model
open Utils
open Akka
open Akka.FSharp
open Akka.Actor
let rec observeFile (absolutePath : string) (a : IActorRef ) = async{
    let fsw = new FileSystemWatcher(
                    Path = Path.GetDirectoryName(absolutePath),
                    Filter = "*.*",
                    EnableRaisingEvents = true,
                    NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName)
                    )
    let prepareMessage (args: EventArgs) =
        let text =
            match box args with
            | :? FileSystemEventArgs as fsa ->
                match fsa.ChangeType with
                | WatcherChangeTypes.Changed -> "Changed " + fsa.Name
                | WatcherChangeTypes.Created -> "Created " + fsa.Name
                | WatcherChangeTypes.Deleted -> "Deleted " + fsa.Name
                | WatcherChangeTypes.Renamed -> "Renamed " + fsa.Name
                | _ -> "Some other change " + fsa.ChangeType.ToString()
            | :? ErrorEventArgs as ea -> "Error: " + ea.GetException().Message
            | o -> "some other unexpected event occurred " + o.GetType().ToString()
        WriteMessage text
    let sendMessage x = async{ async.Return(prepareMessage x) |!> a
                               return! observeFile absolutePath a }
    let! occurance =
        [
            fsw.Changed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
            fsw.Created |> Observable.map(fun x -> sendMessage (x :> EventArgs));
            fsw.Deleted |> Observable.map(fun x -> sendMessage (x :> EventArgs));
            fsw.Renamed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
            fsw.Error |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        ]
        |> List.reduce Observable.merge
        |> Async.AwaitObservable
    return! occurance
}
It took quite a few hacks to get it to this point. Any advice on how I could change it so that it picks up and processes all the events while the actor is running would be greatly appreciated.
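When designing a task like this, we can split it into the following components: a manager actor that spawns a listener for each path it is given, an observer actor that owns a FileSystemWatcher for a specific path, and an actor that receives the change events (here it just prints them to the console).
The observer should subscribe to incoming events and redirect them as messages to the actor responsible for receiving change events. It should also free disposable resources when it is stopped. Example code: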
open Akka.FSharp
open System
open System.IO
let system = System.create "observer-system" <| Configuration.defaultConfig()
let observer filePath consoleWriter (mailbox: Actor<_>) =
    let fsw = new FileSystemWatcher(
                    Path = filePath,
                    Filter = "*.*",
                    EnableRaisingEvents = true,
                    NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName)
                    )
    // subscribe to all incoming events - send them to consoleWriter
    let subscription =
        [fsw.Changed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
         fsw.Created |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
         fsw.Deleted |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
         fsw.Renamed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());]
        |> List.reduce Observable.merge
        |> Observable.subscribe(fun x -> consoleWriter <! x)
    // don't forget to free resources at the end
    mailbox.Defer <| fun () ->
        subscription.Dispose()
        fsw.Dispose()
    let rec loop () = actor {
        let! msg = mailbox.Receive()
        return! loop()
    }
    loop ()
// create actor responsible for printing messages
let writer = spawn system "console-writer" <| actorOf (printfn "%A")
// create manager responsible for serving listeners for provided paths
let manager = spawn system "manager" <| actorOf2 (fun mailbox filePath ->
    spawn mailbox ("observer-" + Uri.EscapeDataString(filePath)) (observer filePath writer) |> ignore)
manager <! "testDir"
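The key difference from the code in the question is that the merged observable is subscribed to once with Observable.subscribe and stays subscribed until it is disposed, so every event is delivered rather than only the first. Here is a minimal sketch of just that part, with no Akka and no file system involved. The two Event<string> values are hypothetical stand-ins for the FileSystemWatcher events, and the `received` list stands in for the console writer actor:

```fsharp
// Hypothetical stand-ins for fsw.Created and fsw.Deleted.
let created = Event<string>()
let deleted = Event<string>()

// Stand-in for the actor that receives the change messages.
let received = ResizeArray<string>()

// Merge the event streams and subscribe once; the handler runs for
// every event until the subscription is disposed.
let subscription =
    [ created.Publish |> Observable.map (fun name -> name + " Created")
      deleted.Publish |> Observable.map (fun name -> name + " Deleted") ]
    |> List.reduce Observable.merge
    |> Observable.subscribe (fun msg -> received.Add msg)

// Simulate several file system events arriving in a row.
created.Trigger "a.txt"
created.Trigger "b.txt"
deleted.Trigger "a.txt"

// Free the subscription, as mailbox.Defer does in the actor above.
subscription.Dispose()

printfn "%A" (List.ofSeq received)
```

All three triggered events end up in `received`, which is the behaviour you want when copying many files into the watched directory.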