Tags: multithreading, asynchronous, f#, mailboxprocessor

Use a MailboxProcessor with reply-channel to create limited agents that return values in order


Basically, I want to change the following into a solution that uses a limited number of threads. In my situation the list of calculations is too large and spawns too many threads, and I'd like to experiment and measure performance with fewer threads.

// the trivial approach (and largely my current situation)
let doWork() = 
    [1 .. 10]
    |> List.map (fun i -> async { 
        do! Async.Sleep (100 * i)   // longest thread will run 1 sec
        return i * i                // some complex calculation returning a certain type
        })
    |> Async.Parallel
    |> Async.RunSynchronously       // works, total wall time 1s

My new approach is below. The code is borrowed from and inspired by this online snippet from Tomas Petricek (which I tested and which works, but I need it to return a value, not unit).

type LimitAgentMessage = 
  | Start of Async<int> *  AsyncReplyChannel<int>
  | Finished

let threadingLimitAgent limit = MailboxProcessor.Start(fun inbox -> async {

    let queue = System.Collections.Generic.Queue<_>()
    let count = ref 0
    while true do
        let! msg = inbox.Receive() 
        match msg with 
        | Start (work, reply) -> queue.Enqueue((work, reply))
        | Finished -> decr count
        if count.Value < limit && queue.Count > 0 then
            incr count
            let work, reply = queue.Dequeue()
            // Start it in a thread pool (on background)
            Async.Start(async { 
                let! x = work 
                reply.Reply x
                inbox.Post(Finished) 
            }) 
  })


// given a synchronous list of tasks, run each task asynchronously, 
// return calculated values in original order
let worker lst = 
    // this doesn't work as expected, it waits for each reply
    let agent = threadingLimitAgent 10
    lst 
    |> List.map(fun x ->            
        agent.PostAndReply(
            fun replyChannel -> Start(x, replyChannel)))

Now, with this in place, the original code would become:

let doWork() = 
    [1 .. 10]
    |> List.map (fun i -> async { 
        do! Async.Sleep (100 * i)   // longest thread will run 1 sec
        return i * i                // some complex calculation returning a certain type
        })
    |> worker       // worker is not working (correct output, runs 5.5s)

All in all, the output is correct (it does calculate and propagate back the replies), but the work does not run concurrently on the limited set of threads: the tasks run one after another, which is why the total is 5.5s.

I've been playing around a bit, but I think I'm missing the obvious (and besides, who knows, someone may like the idea of a limited-threads mailbox processor that returns its calculations in order).


Solution

  • The problem is the call to agent.PostAndReply, which blocks the calling thread until the agent replies, that is, until the work item has finished. Calling it inside List.map therefore posts and runs the work items one at a time. One solution is PostAndAsyncReply, which does not block: it returns an Async<int> that yields the reply when it is run, so you can collect all of them and await them together with Async.Parallel.

    let worker lst = 
        let agent = threadingLimitAgent 10
        lst 
        |> List.map(fun x ->            
            agent.PostAndAsyncReply(
                fun replyChannel -> Start(x, replyChannel)))
        |> Async.Parallel
    
    let doWork() = 
        [1 .. 10]
        |> List.map (fun i -> async { 
            do! Async.Sleep (100 * i)  
            return i * i               
            })
        |> worker      
        |> Async.RunSynchronously
    

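    That's of course only one possible solution (collecting all the asyncs and awaiting them in parallel). Async.Parallel returns its results as an array in the same order as the input, so the values come back in the original order.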
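
Another option, offered as a sketch rather than a drop-in replacement: if FSharp.Core 4.7 or later is available (an assumption about your setup), Async.Parallel accepts an optional maxDegreeOfParallelism argument, which caps the number of computations running at once without needing an agent. The names workerLimited and doWorkLimited and the limit of 3 are made up for illustration.

```fsharp
// Sketch only: assumes FSharp.Core 4.7 or later.
// workerLimited and doWorkLimited are hypothetical names, not part of the original code.
let workerLimited (limit: int) (jobs: Async<'T> list) : Async<'T[]> =
    // At most `limit` jobs run at the same time; results keep the input order.
    Async.Parallel(jobs, maxDegreeOfParallelism = limit)

let doWorkLimited () =
    [1 .. 10]
    |> List.map (fun i -> async {
        do! Async.Sleep (100 * i)   // simulated work, as in the question
        return i * i
        })
    |> workerLimited 3              // illustrative limit
    |> Async.RunSynchronously
```

With a lower limit the total wall time should rise above the unthrottled 1s, so this is worth timing against the agent-based version before choosing between them.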