I want to implement the concept of a Port of the CCR Framework in F# (as CCR is not officially supported for .Net 4.0). I know that one can use the MailboxProcessor class in F# to do this. This works perfectly for simple Receive Arbiters but I need the concept of the Interleave Arbiter, i.e. I want to control which messages are processed exclusively and which are processed concurrently. So far I've got no idea to implement this in F# and I would be grateful for your help.
I'm not very familiar with CCR, but I'll try to answer - my understanding is that interleave arbiter behaves a bit like ReaderWriterLock
. That is, you can specify some operations that can run in parallel (reads) and some operations that are exclusive (writes).
The following agent is one way to implement it (not tested, but type checks :-)). The agent exposes two operations that are intended for public use. The last one is internal:
type Message<'T> =
| PerformReadOperation of ('T -> Async<unit>)
| PerformWriteOperation of ('T -> Async<'T>)
| ReadOperationCompleted
By sending the agent PerformReadOperation
, you're giving it an operation that should be run (once) using the state and possibly in parallel with other read operations.
By sending the agent PerformWriteOperation
, you're giving it an operation that calculates a new state and must be executed after all read operations complete. (If you were working with immutable state, that would make things simpler - you wouldn't have to wait until readers complete! But the implementation below implements the waiting).
The agent starts with some initial state:
let initial = // initial state
And the rest of the agent is implemented using two loops:
let interleaver = MailboxProcessor.Start(fun mbox ->
// Asynchronously wait until all read operations complete
let rec waitUntilReadsComplete reads =
if reads = 0 then async { return () }
else mbox.Scan(fun msg ->
match msg with
| ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1))
| _ -> None)
let rec readingLoop state reads = async {
let! msg = mbox.Receive()
match msg with
| ReadOperationCompleted ->
// Some read operation completed - decrement counter
return! readingLoop state (reads - 1)
| PerformWriteOperation(op) ->
do! waitUntilReadsComplete reads
let! newState = op state
return! readingLoop newState 0
| PerformReadOperation(op) ->
// Start the operation in background & increment counter
async { do! op state
mbox.Post(ReadOperationCompleted) }
|> Async.Start
return! readingLoop state (reads + 1) }
readingLoop initial 0)