
How to use TryScan in F# properly


I was trying to find an example of how to use TryScan, but haven't found any. Could you help me?

What I would like to do (a quite simplified example): I have a MailboxProcessor that accepts two types of messages.

  • The first one, GetState, returns the current state. GetState messages are sent quite frequently.

  • The other one, UpdateState, is very expensive (time-consuming), e.g. it downloads something from the internet and then updates the state accordingly. UpdateState is sent only rarely.

My problem is that GetState messages are blocked and wait until the preceding UpdateState messages are served. That's why I tried to use TryScan to process all GetState messages first, but with no luck.

My example code:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                // (TryScan returns an Async that is piped to ignore here, so it never runs)
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously

If you run the code, you will see that GetState messages are barely processed at all, because each GetState waits for its result. UpdateState, on the other hand, is fire-and-forget, so the queued UpdateState messages effectively block getting the state.
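To see why GetState stalls, here is a minimal self-contained sketch with a plain Receive loop (the 50 ms sleep is an arbitrary stand-in for the expensive update). Messages are handled strictly in arrival order, so a GetState posted after three UpdateState messages is only answered once all three updates have run.

```fsharp
// Minimal reproduction of the blocking behaviour with a plain Receive loop.
// Assumption: each UpdateState is simulated by a 50 ms sleep.
type Msg =
    | GetState of AsyncReplyChannel<int>
    | UpdateState

let fifoAgent =
    MailboxProcessor<Msg>.Start(fun inbox ->
        let rec loop state = async {
            let! msg = inbox.Receive()          // always takes the OLDEST message
            match msg with
            | UpdateState ->
                do! Async.Sleep(50)             // stand-in for the expensive update
                return! loop (state + 1)
            | GetState reply ->
                reply.Reply(state)
                return! loop state }
        loop 0)

// Three fire-and-forget updates are queued before the request for the state...
for _ in 1 .. 3 do fifoAgent.Post(UpdateState)

// ...so PostAndReply blocks until all three updates have been processed.
let fifoReply = fifoAgent.PostAndReply(GetState)
printfn "state seen by GetState: %d" fifoReply   // prints "state seen by GetState: 3"
```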

Edit

The current solution that works for me is this one:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // look for a GetState anywhere in the queue, waiting at most 5 ms;
                // if one is found, reply with the current state (UpdateState stays queued)
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
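                        | GetState(chnl) ->
                            // a GetState that arrives after the 5 ms timeout lands here;
                            // it must be answered too, or PostAndReply would wait forever
                            chnl.Reply(state)
                            return! loop state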
                | Some n -> return! loop n
             }
             loop 0
)
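A self-contained sketch of the same TryScan-first loop, handy for checking the behaviour in F# Interactive. Assumptions: a 300 ms sleep stands in for the expensive update, the scanner returns Async<unit> instead of the state, and a GetState picked up by Receive is answered as well. TryScan pulls a matching GetState out of the queue and leaves the UpdateState messages where they are, so the request should be answered after at most one update rather than after all five.

```fsharp
// Sketch of the "answer pending GetState first" loop (assumed 300 ms update).
type Msg =
    | GetState of AsyncReplyChannel<int>
    | UpdateState

let agent =
    MailboxProcessor<Msg>.Start(fun inbox ->
        let rec loop state = async {
            // Take a GetState out of the queue if there is one (waiting at most 5 ms);
            // UpdateState messages are skipped and stay in the queue.
            let! answered =
                inbox.TryScan((function
                    | GetState reply -> Some(async { reply.Reply(state) })
                    | UpdateState -> None), 5)
            match answered with
            | Some () -> return! loop state
            | None ->
                let! msg = inbox.Receive()
                match msg with
                | UpdateState ->
                    do! Async.Sleep(300)        // stand-in for the expensive update
                    return! loop (state + 1)
                | GetState reply ->
                    reply.Reply(state)
                    return! loop state }
        loop 0)

// Queue five slow updates, then ask for the state.
for _ in 1 .. 5 do agent.Post(UpdateState)
let sw = System.Diagnostics.Stopwatch.StartNew()
let seen = agent.PostAndReply(GetState)
printfn "GetState answered after %d ms with state %d" sw.ElapsedMilliseconds seen
```

The updates still run one after another afterwards; the agent remains blocked while each one executes, which matches the requirement stated in the edit.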

Reactions to comments: the idea of using another MailboxProcessor or the ThreadPool to execute UpdateState in parallel is great, but I don't need it currently. All I wanted to do is process all pending GetState messages first and the others after that. I don't care that the agent is blocked while it processes UpdateState.

This output shows what the problem was:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediately! (not a realistic example, but it illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState; it takes 1 sec, so 5 more
 U            200ms       requests are sent meanwhile; "sent" means that it is a
 U            200ms       fire-and-forget message: it doesn't wait for any result,
                          so one UpdateState message can be sent every 200ms
G                     <-- first GetState is sent, but it waits for the reply, so all
                          previous UpdateState messages have to be processed first (= 3 seconds);
                          only AFTER all those UpdateState messages are processed is the result
                          returned, and a new GetState can be sent.
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent

Solution

  • I don't think that the TryScan method will help you in this scenario. It allows you to specify a timeout to be used while waiting for a matching message. Once such a message is received, it will start processing it (ignoring the timeout).

    For example, if you wanted to wait for some specific message, but perform some other check every second while waiting, you could write:

    let rec loop () = async {
      // wait at most 1000 ms for ImportantMessage; other messages stay in the queue
      let! res = mbox.TryScan((function
        | ImportantMessage -> Some(async { 
              // process message 
              return 0
            })
        | _ -> None), 1000)
      match res with
      | None -> 
           // timeout expired: perform some check & continue waiting
           return! loop ()
      | Some n -> 
           // ImportantMessage was received and processed 
           return n
    }
    

    What can you do to avoid blocking the mailbox processor when processing the UpdateState message? The mailbox processor is (logically) single-threaded, and you probably don't want to cancel the processing of the UpdateState message, so the best option is to start processing it in the background and keep handling other messages until the processing completes. The code that processes UpdateState can then send a message back to the mailbox (e.g. UpdateStateCompleted).

    Here is a sketch of how this might look:

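    // assumes Msg also has a case: | UpdateStateCompleted of int
    let rec loop state = async {
      let! msg = mbox.Receive()
      match msg with
      | GetState(repl) -> 
          repl.Reply(state)
          return! loop state
      | UpdateState -> 
          async { 
            // complex calculation (runs in parallel); state + 1 is only a placeholder
            let newState = state + 1
            mbox.Post(UpdateStateCompleted newState) }
          |> Async.Start
          // keep handling other messages while the calculation runs
          return! loop state
      | UpdateStateCompleted newState ->
          // Received new state from background workflow
          return! loop newState }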
    

    Now that the background task is running in parallel, you need to be careful about mutable state. Also, if you send UpdateState messages faster than you can process them, you'll be in trouble. This can be fixed, for example, by ignoring or queueing requests while you're still processing the previous one.
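A runnable sketch of the background-update idea, under these assumptions: UpdateStateCompleted carries the new state as an int, the "expensive" work is a 200 ms sleep followed by state + 1, and an UpdateState that arrives while an update is already running is ignored (one of the two options mentioned above). The busy flag is a hypothetical addition used only for that policy.

```fsharp
// Sketch: expensive work runs via Async.Start while the agent keeps answering GetState.
type Msg =
    | GetState of AsyncReplyChannel<int>
    | UpdateState
    | UpdateStateCompleted of int

let agent =
    MailboxProcessor<Msg>.Start(fun inbox ->
        // state: current value; busy: is a background update in flight? (hypothetical flag)
        let rec loop state busy = async {
            let! msg = inbox.Receive()
            match msg with
            | GetState reply ->
                reply.Reply(state)
                return! loop state busy
            | UpdateState when busy ->
                // an update is already running: ignore this request
                return! loop state busy
            | UpdateState ->
                async {
                    do! Async.Sleep(200)                   // stand-in for the download
                    inbox.Post(UpdateStateCompleted(state + 1)) }
                |> Async.Start
                return! loop state true
            | UpdateStateCompleted newState ->
                return! loop newState false }
        loop 0 false)

agent.Post(UpdateState)
agent.Post(UpdateState)                          // ignored: first update still running
let during = agent.PostAndReply(GetState)        // answered without waiting for the update
System.Threading.Thread.Sleep(600)
let after = agent.PostAndReply(GetState)
printfn "during: %d, after: %d" during after
```

The worker only captures the immutable state value, so it never shares mutable data with the agent; the new state reaches the agent solely through the UpdateStateCompleted message. Queueing instead of ignoring would need the agent to remember pending requests (for example a counter) and start the next update when UpdateStateCompleted arrives.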