Tags: asynchronous, f#, filesystemwatcher, observable, akka.net

Akka.net F# stateful actor that awaits multiple FileSystemWatcher Observable events


I'm new to both F# and Akka.Net, and I'm trying to achieve the following with them:

I want to create an actor (Tail) that receives a file location and then listens for events at that location using FileSystemWatcher and some Observables, forwarding them on as messages to some other actor for processing.

The problem I'm having is that the code that listens for the events only picks up one event at a time and ignores all the others. For example, if I copy 20 files into the directory being watched, it only seems to send out the event for one of them.

Here's my Actor code:

module Tail

open Akka
open Akka.FSharp
open Akka.Actor
open System
open Model
open ObserveFiles
open ConsoleWriteActor

let handleTailMessages tm =
    match tm with
        | StartTail (f,r) ->
            observeFile f consoleWriteActor |!> consoleWriteActor

    |> ignore

let spawnTail =
    fun (a : Actor<IMessage> )  -> 
    let rec l (count : int) = actor{

        let! m = a.Receive()
        handleTailMessages m
        return! l (count + 1)
    } 
    l(0) 

And here's the code that listens for events:

module ObserveFiles
open System
open System.IO
open System.Threading
open Model
open Utils
open Akka
open Akka.FSharp
open Akka.Actor



let rec observeFile (absolutePath : string) (a : IActorRef )  = async{

    let fsw = new FileSystemWatcher(
                        Path = Path.GetDirectoryName(absolutePath), 
                        Filter = "*.*",
                        EnableRaisingEvents = true, 
                        NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName)
                        )

    let prepareMessage  (args: EventArgs) =
        let text = 
            match box args with
            | :? FileSystemEventArgs as fsa ->
                match fsa.ChangeType with
                | WatcherChangeTypes.Changed -> "Changed " + fsa.Name
                | WatcherChangeTypes.Created ->  "Created " + fsa.Name
                | WatcherChangeTypes.Deleted -> "Deleted " + fsa.Name
                | WatcherChangeTypes.Renamed -> "Renamed " + fsa.Name
                | _ -> "Some other change " + fsa.ChangeType.ToString()
            | :? ErrorEventArgs as ea -> "Error: " + ea.GetException().Message
            | o -> "some other unexpected event occurd" + o.GetType().ToString()
        WriteMessage text 


    let sendMessage x = async{  async.Return(prepareMessage x) |!> a
                                return! observeFile absolutePath a }

    let! occurance  = 
        [
        fsw.Changed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Created |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Deleted |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Renamed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        fsw.Error |> Observable.map(fun x -> sendMessage (x :> EventArgs));
        ] 
        |> List.reduce Observable.merge
        |> Async.AwaitObservable

    return! occurance
}

It took quite a few hacks to get it to this point. Any advice on how I could change it so that it picks up and processes all the events while the actor is running would be greatly appreciated.


Solution

  • A task like this can be split into the following components:

    1. A manager actor that receives all incoming messages. Its main role is to respond to requests to watch a directory: when a request arrives, it creates a child actor responsible for watching that specific directory.
    2. A child actor that manages the FileSystemWatcher for a specific path. It subscribes to the watcher's events and forwards them as messages to the actor that handles change events. It should also dispose of its resources when it stops.
    3. An actor that receives the change events, in our case by printing them to the console.

    Example code:

    open Akka.FSharp
    open System
    open System.IO
    
    let system = System.create "observer-system" <| Configuration.defaultConfig()
    
    let observer filePath consoleWriter (mailbox: Actor<_>) =    
        let fsw = new FileSystemWatcher(
                            Path = filePath, 
                            Filter = "*.*",
                            EnableRaisingEvents = true, 
                            NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName)
                            )
        // subscribe to all incoming events - send them to consoleWriter
        let subscription = 
            [fsw.Changed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
             fsw.Created |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
             fsw.Deleted |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());
             fsw.Renamed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());]
                 |> List.reduce Observable.merge
                 |> Observable.subscribe(fun x -> consoleWriter <! x)
    
        // don't forget to free resources at the end
        mailbox.Defer <| fun () -> 
            subscription.Dispose()
            fsw.Dispose()
    
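        // the observer ignores incoming messages; it only keeps the watcher alive
        // until the actor stops and the deferred cleanup above runs
        let rec loop () = actor {
            let! msg = mailbox.Receive()
            return! loop()
        }
        loop ()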
    
    // create actor responsible for printing messages
    let writer = spawn system "console-writer" <| actorOf (printfn "%A")
    
    // create manager responsible for serving listeners for provided paths
    let manager = spawn system "manager" <| actorOf2 (fun mailbox filePath ->
        spawn mailbox ("observer-" + Uri.EscapeDataString(filePath)) (observer filePath writer) |> ignore)
    
    manager <! "testDir"
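
The main difference from the code in the question appears to be that the watcher is created and subscribed to once, with Observable.subscribe, instead of awaiting a single event and then building a new FileSystemWatcher for the next one. The sketch below isolates that idea without Akka, so it can be tried in a plain F# script. The names watchDirectory, onEvent and describe are hypothetical helpers, not part of the original answer, and forwarding the Error event is an extra taken from the question's code.

```fsharp
open System
open System.IO

// Hypothetical helper (not from the original answer): subscribes once to all
// watcher events for a directory and passes a description of each to onEvent.
let watchDirectory (dir: string) (onEvent: string -> unit) : IDisposable =
    let fsw = new FileSystemWatcher(dir, "*.*")
    fsw.NotifyFilter <- NotifyFilters.FileName ||| NotifyFilters.LastWrite

    let describe (e: FileSystemEventArgs) = sprintf "%s %O" e.Name e.ChangeType

    // merge every event source into a single stream of strings
    let events =
        [ fsw.Changed |> Observable.map describe
          fsw.Created |> Observable.map describe
          fsw.Deleted |> Observable.map describe
          fsw.Renamed |> Observable.map (fun e -> describe (e :> FileSystemEventArgs))
          fsw.Error   |> Observable.map (fun e -> "Error: " + e.GetException().Message) ]
        |> List.reduce Observable.merge

    // one long-lived subscription: every event reaches onEvent
    let subscription = events |> Observable.subscribe onEvent
    fsw.EnableRaisingEvents <- true

    // disposing the returned handle releases the subscription and the watcher
    { new IDisposable with
        member __.Dispose() =
            subscription.Dispose()
            fsw.Dispose() }
```

Assuming the directory already exists, it could be used as `let watcher = watchDirectory "testDir" (printfn "%s")`, followed later by `watcher.Dispose()`. Inside the observer actor above, onEvent would correspond to `fun x -> consoleWriter <! x`, and the Dispose call to the function passed to mailbox.Defer.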