Basically, I want to change the following into a limited threading solution, because in my situation the list of calculations is too large, spawning too many threads, and I'd like to experiment and measure performance with less threads.
// the trivial approach (and largely my current situation)
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i) // longest thread will run 1 sec
return i * i // some complex calculation returning a certain type
})
|> Async.Parallel
|> Async.RunSynchronously // works, total wall time 1s
My new approach, this code is borrowed/inspired by this online snippet from Tomas Petricek (which I tested, it works, but I need it to return a value, not unit).
type LimitAgentMessage =
| Start of Async<int> * AsyncReplyChannel<int>
| Finished
let threadingLimitAgent limit = MailboxProcessor.Start(fun inbox -> async {
let queue = System.Collections.Generic.Queue<_>()
let count = ref 0
while true do
let! msg = inbox.Receive()
match msg with
| Start (work, reply) -> queue.Enqueue((work, reply))
| Finished -> decr count
if count.Value < limit && queue.Count > 0 then
incr count
let work, reply = queue.Dequeue()
// Start it in a thread pool (on background)
Async.Start(async {
let! x = work
do! async {reply.Reply x }
inbox.Post(Finished)
})
})
// given a synchronous list of tasks, run each task asynchronously,
// return calculated values in original order
let worker lst =
// this doesn't work as expected, it waits for each reply
let agent = threadingLimitAgent 10
lst
|> List.map(fun x ->
agent.PostAndReply(
fun replyChannel -> Start(x, replyChannel)))
Now, with this in place, the original code would become:
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i) // longest thread will run 1 sec
return i * i // some complex calculation returning a certain type
})
|> worker // worker is not working (correct output, runs 5.5s)
All in all, the output is correct (it does calculate and propagate back the replies), but it does not do so in the (limited set) of threads.
I've been playing around a bit, but think I'm missing the obvious (and besides, who knows, someone may like the idea of a limited-threads mailbox processor that returns its calculations in order).
The problem is the call to agent.PostAndReply
. PostAndReply
will block until the work has finished. Calling this inside List.map
will cause the work to be executed sequentially. One solution is to use PostAndAsyncReply
which does not block and also returns you an async handle for getting the result back.
let worker lst =
let agent = threadingLimitAgent 10
lst
|> List.map(fun x ->
agent.PostAndAsyncReply(
fun replyChannel -> Start(x, replyChannel)))
|> Async.Parallel
let doWork() =
[1 .. 10]
|> List.map (fun i -> async {
do! Async.Sleep (100 * i)
return i * i
})
|> worker
|> Async.RunSynchronously
That's of course only one possible solution (getting all async handles back and awaiting them in parallel).