Tags: asynchronous, f#, mailboxprocessor

MailboxProcessor first loop can't run if program immediately fails


I have a command that runs an SFTP check periodically and logs the result to a file.

let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv = 
    try 
        sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
    with 
    | ex -> 
        ex.Message |> printerAgent.Post
        printfn "%s" ex.Message // <- NOTICE THIS LINE
    sw.Close()
    sw.Dispose()
    0

It loops over a MailboxProcessor

let printerAgent = MailboxProcessor.Start(fun inbox-> 
    // the message processing function
    let rec messageLoop() = async{        
        // read a message
        let! msg = inbox.Receive()
        // process a message
        sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), msg)
        printfn "%s" msg
        // loop to top
        return! messageLoop()  
        }
    // start the loop 
    messageLoop() 
    )

which is called to write the messages to the log

let sftpExample local host port username (password:string) =
    async {
        use client = new SftpClient(host, port, username, password)
        client.Connect()
        sprintf "Connected to %s\nroot dir list" host  |> printerAgent.Post
        do! downloadDir local client ""   
        sprintf "Done, disconnecting now" |> printerAgent.Post
        client.Disconnect()
    } |> Async.RunSynchronously

The file downloads are asynchronous, as are the corresponding messages, but it all appears to work well.

The problem is that if, for some reason, the SFTP connection fails immediately, the MailboxProcessor has no time to log the exception message.

What I tried, and it does work, was adding a printfn "%s" ex.Message before the end. I just wanted to know whether someone can suggest a better solution.

FYI, the full code is in this gist.


Solution

  • What you actually want is for the program to wait until the MailboxProcessor has finished handling its whole message queue before the program exits. Your printfn "%s" ex.Message seems to work, but it is not guaranteed to: Post only enqueues a message and returns immediately, so if the MailboxProcessor still has several items in its queue, the main thread can finish the printfn call and exit before the MailboxProcessor's thread has worked through them.

    The design I would recommend is to change the input of your printerAgent to be a DU like the following:

    type printerAgentMsg =
        | Message of string
        | Shutdown of AsyncReplyChannel<unit>
    

    Then, when you want the printer agent to finish writing its messages, use MailboxProcessor.PostAndReply (and note the usage example in the docs) in the main function to send it the Shutdown message. Remember that MailboxProcessor messages are queued: by the time the agent receives the Shutdown message, it will already have processed every message posted before it. So all it needs to do to handle Shutdown is reply with unit on the reply channel and simply not call its loop again. And because you used PostAndReply rather than PostAndReplyAsync, the main function will block until the MailboxProcessor has finished all its work. (To avoid any chance of blocking forever, I'd recommend setting a timeout such as 10 seconds in your PostAndReply call; the default timeout is -1, meaning wait forever. If the timeout expires, PostAndReply raises a TimeoutException.)

    EDIT: Here's an example (NOT tested, use at own risk) of what I mean:

    type printerAgentMsg =
        | Message of string
        | Shutdown of AsyncReplyChannel<unit>

    // sw has to be defined before printerAgent, because the agent's loop captures it
    let logPath = Path.Combine(config.["SharedFolder"],timestamp)
    let sw = new StreamWriter(logPath,true)

    let printerAgent = MailboxProcessor.Start(fun inbox-> 
        // the message processing function
        let rec messageLoop() = async{        
            // read a message
            let! msg = inbox.Receive()
            // process a message
            match msg with
            | Message text ->
                sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), text)
                printfn "%s" text
                // loop to top
                return! messageLoop()
            | Shutdown replyChannel ->
                // every message posted before Shutdown has been handled by now
                replyChannel.Reply()
                // We do NOT do return! messageLoop() here
            }
        // start the loop 
        messageLoop() 
        )
    //...
    [<EntryPoint>]
    let main argv = 
        try 
            sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
        with 
        | ex -> 
            ex.Message |> Message |> printerAgent.Post
            printfn "%s" ex.Message // <- NOTICE THIS LINE
        // block until the agent has drained its queue (or the timeout expires)
        printerAgent.PostAndReply( (fun replyChannel -> Shutdown replyChannel), 10000)  // Timeout = 10000 ms = 10 seconds
        sw.Close()
        sw.Dispose()
        0
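
The shutdown handshake described above can also be tried in isolation, without SFTP or a real log file. The following is only a minimal sketch under a few assumptions: the names LogMsg, startLogger and drain are hypothetical and not taken from the gist, a StringWriter stands in for the log file, and a short Thread.Sleep simulates slow I/O so that the queue actually backs up. It uses TryPostAndReply instead of PostAndReply, so a timeout shows up as None rather than as a TimeoutException.

```fsharp
open System
open System.IO
open System.Threading

// Hypothetical message type, mirroring the DU suggested in the answer.
type LogMsg =
    | Message of string
    | Shutdown of AsyncReplyChannel<unit>

// Starts a logging agent over any TextWriter (an assumption made so that
// the sketch runs without a real log file).
let startLogger (writer: TextWriter) =
    MailboxProcessor<LogMsg>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Message text ->
                Thread.Sleep 20          // simulate slow I/O so messages queue up
                writer.WriteLine(text)
                return! loop ()
            | Shutdown reply ->
                writer.Flush()
                reply.Reply()            // acknowledge, then stop by not recursing
        }
        loop ())

// Posts Shutdown and waits up to timeoutMs for the acknowledgement.
// Returns true if the agent replied, false if the wait timed out.
let drain (agent: MailboxProcessor<LogMsg>) (timeoutMs: int) =
    match agent.TryPostAndReply((fun ch -> Shutdown ch), timeoutMs) with
    | Some () -> true
    | None -> false

let buffer = new StringWriter()
let agent = startLogger buffer

// Post returns immediately; the messages are only enqueued at this point.
for text in [ "first"; "second"; "third" ] do
    agent.Post(Message text)

// Shutdown is queued behind the three messages, so they are written first.
let drained = drain agent 10000
let lines =
    buffer.ToString().Split([| Environment.NewLine |], StringSplitOptions.RemoveEmptyEntries)

printfn "drained=%b, lines=%A" drained lines
```

Because Shutdown sits behind the three Message items in the queue, the agent should only reply once all three lines have been written, which is the same ordering guarantee the answer relies on. Whether to treat a false result from drain as an error or just report it is a design choice left to the caller.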