multithreading f# agent mailboxprocessor

F# MailboxProcessor limit parallelism


I'm new to F# and trying to experiment with the MailboxProcessor to ensure that state changes are done in isolation.

In short, I post actions (immutable objects describing a state change) to the MailboxProcessor. In the recursive loop I read each message, generate a new state (in the example below, by adding an item to a collection), and pass that state to the next recursion.

open System

type AppliationState =
    {
        Store : string list
    }
    static member Default = 
        {
            Store = List.empty
        }
    member this.HandleAction (action:obj) =
        match action with
        | :? string as a -> { this with Store = a :: this.Store }
        | _ -> this

type Agent<'T> = MailboxProcessor<'T>     

[<AbstractClass; Sealed>]
type AppHolder private () =
    static member private Processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
                return! loop s'
            }
        loop AppliationState.Default)

    static member HandleAction (action:obj) =
        AppHolder.Processor.Post action

[<EntryPoint>]
let main argv =
    AppHolder.HandleAction "a"
    AppHolder.HandleAction "b"
    AppHolder.HandleAction "c"
    AppHolder.HandleAction "d"

    Console.ReadLine()
    0 // return an integer exit code

Expected output is:

s: 0 s': 1
s: 1 s': 2
s: 2 s': 3
s: 3 s': 4  

What I get is:

s: 0 s': 1
s: 0 s': 1
s: 0 s': 1
s: 0 s': 1

From the MailboxProcessor documentation and some googling, my understanding is that it is a queue of messages processed one at a time, as if by a single thread. Instead, it looks like they are all processed in parallel.

Am I completely off base here?


Solution

  • I think the issue must be in your implementation of HandleAction. I implemented the following, and it produces the expected output.

    open System
    
    type ApplicationState =
        {
            Items: int list
        }
        static member Default = {Items = []}
        member this.HandleAction x = {this with Items = x::this.Items}
    
    type Message = Add of int
    
    let Processor = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (s : ApplicationState) =
            async {
                let! (Add action) = inbox.Receive()
                let s' = s.HandleAction action
                Console.WriteLine("s: " + s.Items.Length.ToString() + " s': " + s'.Items.Length.ToString())
                return! loop s'
            }
        loop ApplicationState.Default)
    
    Processor.Post (Add 1)
    Processor.Post (Add 2)
    Processor.Post (Add 3)
    Processor.Post (Add 4)
    
    
    // OUTPUT
    // s: 0 s': 1
    // s: 1 s': 2
    // s: 2 s': 3
    // s: 3 s': 4
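
The sequential behaviour can also be checked without relying on console output. The following is a minimal sketch, not part of the code above: it assumes a hypothetical GetCount message that carries a reply channel, so the caller can ask the agent for its current state once the earlier messages have been handled.

```fsharp
// Hypothetical message type for this sketch; GetCount is an assumption.
type Message =
    | Add of int
    | GetCount of AsyncReplyChannel<int>

let agent =
    MailboxProcessor<Message>.Start(fun inbox ->
        // The state (the list of items) is passed from one iteration to the next.
        let rec loop (items : int list) =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add x ->
                    return! loop (x :: items)
                | GetCount reply ->
                    reply.Reply(List.length items)
                    return! loop items
            }
        loop [])

for i in 1 .. 4 do
    agent.Post (Add i)

// GetCount is queued behind the four Add messages, so the reply
// reflects all of them.
let count = agent.PostAndReply(fun reply -> GetCount reply)
printfn "count = %d" count   // count = 4
```

Because PostAndReply blocks until the agent answers, this also keeps a compiled program alive long enough for the queued messages to be processed.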
    

    EDIT

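    After seeing the updated code sample, I believe the idiomatic F# solution is simply to change AppHolder from a class to a module. In the class version, static member private Processor = ... defines a property whose body is re-evaluated on every access, so each call to HandleAction starts a brand-new agent with the default state. A let binding in a module is evaluated only once. The updated code would look like this: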

    open System
    
    type AppliationState =
        {
            Store : string list
        }
        static member Default = 
            {
                Store = List.empty
            }
        member this.HandleAction (action:obj) =
            match action with
            | :? string as a -> { this with Store = a :: this.Store }
            | _ -> this
    
    type Agent<'T> = MailboxProcessor<'T>     
    
    module AppHolder =
        let private processor = Agent.Start(fun inbox ->
            let rec loop (s : AppliationState) =
                async {
                    let! action = inbox.Receive()
                    let s' = s.HandleAction action
                    Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
                    return! loop s'
                }
            loop AppliationState.Default)
    
        let handleAction (action:obj) =
            processor.Post action
    
    
    AppHolder.handleAction "a"
    AppHolder.handleAction "b"
    AppHolder.handleAction "c"
    AppHolder.handleAction "d"
    

    This outputs the same result as before:

    {s: 0 s': 1
    {s: 1 s': 2
    {s: 2 s': 3
    {s: 3 s': 4
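
To see why the class version in the question keeps printing s: 0, it may help to compare a plain static property with an auto-property. This is a small sketch with hypothetical names (PropertyDemo, Fresh, Cached, Evaluations), not the original AppHolder: the body of a plain static member runs on each access, while the initializer of static member val runs once.

```fsharp
type PropertyDemo() =
    // Counts how often the getter body of Fresh runs.
    static let mutable evaluations = 0

    static member Evaluations = evaluations

    // Plain property: the right-hand side is the getter body,
    // so it runs again on every access.
    static member Fresh =
        evaluations <- evaluations + 1
        obj ()

    // Auto-property: the initializer runs once and the value is stored.
    static member val Cached = obj () with get

let a = PropertyDemo.Fresh
let b = PropertyDemo.Fresh

printfn "%b" (obj.ReferenceEquals(a, b))                                     // false
printfn "%b" (obj.ReferenceEquals(PropertyDemo.Cached, PropertyDemo.Cached)) // true
printfn "%d" PropertyDemo.Evaluations                                        // 2
```

By the same reasoning, if AppHolder has to remain a class, declaring the agent with static let or static member val instead of a plain static member should also give a single shared agent; the module version above is just the shorter way to get there.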