Search code examples
f#mailboxprocessor

Bank account kata with F# MailboxProcessor slow


I've coded the "classical" bank account kata with F# MailboxProcessor to be thread safe. But when I try to parallelize adding a transaction to an account, it becomes very slow very quickly: 10 parallel calls are responsive (2ms), 20 are not (9 seconds)! (See the last test, "Account can be updated from multiple threads", below.)

Since MailboxProcessor supports 30 million messages per second (see theburningmonk's article), where does the problem come from?

// -- Domain ----

/// Requests understood by the account agent. Every case carries an
/// AsyncReplyChannel, so the agent answers each request it receives.
type Message =
    /// Open the account. The agent replies with `not state.Opened`, i.e. `true`
    /// only when the account was not already open.
    | Open of AsyncReplyChannel<bool>
    /// Close the account. The agent replies with the value of `state.Opened`
    /// from before the close, i.e. `true` only when the account was open.
    | Close of AsyncReplyChannel<bool>
    /// Ask for the balance. The agent replies with `Some sum` of all recorded
    /// transactions when the account is open, `None` otherwise.
    | Balance of AsyncReplyChannel<decimal option>
    /// Record a transaction amount. The agent always replies `true`.
    | Transaction of decimal * AsyncReplyChannel<bool>

/// State threaded through the agent loop.
type AccountState =
    {
        /// Whether the account is currently open.
        Opened: bool
        /// Recorded transaction amounts; new amounts are prepended.
        Transactions: decimal list
    }

/// Bank account whose state lives inside a MailboxProcessor agent; requests
/// are handled one message at a time by the agent loop.
type Account() =
    let agent =
        MailboxProcessor<Message>.Start(fun inbox ->
            // Agent loop: receive one message, answer on its reply channel,
            // then continue with the next state.
            let rec loop (state: AccountState) = async {
                match! inbox.Receive() with
                | Open channel ->
                    printfn $"Opening"
                    // Answers whether the account was closed before this request.
                    channel.Reply (not state.Opened)
                    return! loop { state with Opened = true }
                | Close channel ->
                    // Answers whether the account was open before this request.
                    channel.Reply state.Opened
                    return! loop { state with Opened = false }
                | Transaction (tran, channel) ->
                    printfn $"Adding transaction {tran}, nb = {state.Transactions.Length}"
                    channel.Reply true
                    return! loop { state with Transactions = tran :: state.Transactions }
                | Balance channel ->
                    // The balance is only reported while the account is open.
                    let balance =
                        match state.Opened with
                        | true -> Some (List.sum state.Transactions)
                        | false -> None
                    channel.Reply balance
                    return! loop state
            }

            // A new account starts closed and without transactions.
            loop { Opened = false; Transactions = [] })

    /// Posts an Open request and waits synchronously for the agent's reply.
    member _.Open () =
        agent.PostAndReply(fun replyTo -> Open replyTo)

    /// Posts a Close request and waits synchronously for the agent's reply.
    member _.Close () =
        agent.PostAndReply(fun replyTo -> Close replyTo)

    /// Posts a Balance request and waits synchronously for the agent's reply.
    member _.Balance () =
        agent.PostAndReply(fun replyTo -> Balance replyTo)

    /// Posts a Transaction request carrying the given amount and waits
    /// synchronously for the agent's reply.
    member _.Transaction (transaction: decimal) =
        agent.PostAndReply(fun replyTo -> Transaction (transaction, replyTo))

// -- API ----

/// Creates a new account. This is an alias for the Account constructor, so it
/// is called as `mkBankAccount()`.
let mkBankAccount = Account

/// Opens the account. Returns `Some account` when the Open request is
/// answered with `true`, and `None` otherwise.
let openAccount (account: Account) =
    if account.Open() then Some account else None

/// Closes the account when one is given. Keeps `Some account` only when the
/// Close request is answered with `true`; a `None` input is passed through
/// without contacting any agent.
let closeAccount (account: Account option) =
    account |> Option.filter (fun a -> a.Close())

/// Records `transaction` on the account when one is given. Returns
/// `Some account` when the request is answered with `true`, and `None`
/// otherwise (including when no account was given).
let updateBalance transaction (account: Account option) =
    account
    |> Option.bind (fun a ->
        if a.Transaction(transaction) then Some a else None)

/// Returns the balance reported by the account, or `None` when no account
/// was given.
let getBalance (account: Account option) =
    match account with
    | Some a -> a.Balance()
    | None -> None
// -- Tests ----

/// Equality check for tests: `Ok expected` when both values are equal,
/// otherwise `Error (expected, actual)`.
let should_equal expected actual =
    match expected = actual with
    | true -> Ok expected
    | false -> Error (expected, actual)

/// Inequality check for tests: `Ok expected` when the values differ,
/// otherwise `Error (expected, actual)`.
let should_not_equal expected actual =
    match expected <> actual with
    | true -> Ok expected
    | false -> Error (expected, actual)

/// A freshly opened account reports a balance of zero.
let ``Returns empty balance after opening`` =
    mkBankAccount()
    |> openAccount
    |> getBalance
    |> should_equal (Some 0.0m)

/// The balance is zero after opening and reflects a single transaction.
let ``Check basic balance`` =
    let account = mkBankAccount() |> openAccount
    // Read the balance before the update so both readings can be compared.
    let openingBalance = getBalance account
    let updatedBalance = getBalance (updateBalance 10.0m account)
    (should_equal (Some 0.0m) openingBalance,
     should_equal (Some 10.0m) updatedBalance)

/// Positive and negative transactions accumulate on the same account.
let ``Balance can increment or decrement`` =
    let account = mkBankAccount() |> openAccount
    let openingBalance = getBalance account
    // Both updates hit the same account, so the second reading includes the first update.
    let addedBalance = getBalance (updateBalance 10.0m account)
    let subtractedBalance = getBalance (updateBalance -15.0m account)
    (should_equal (Some 0.0m) openingBalance,
     should_equal (Some 10.0m) addedBalance,
     should_equal (Some -5.0m) subtractedBalance)

/// A closed account reports no balance, while closing itself still yields the account.
let ``Account can be closed`` =
    let account = closeAccount (openAccount (mkBankAccount()))
    let balance = getBalance account
    (should_equal None balance,
     should_not_equal None account)

#time
/// Runs `nb` balance updates through Async.Parallel and checks the final balance.
let ``Account can be updated from multiple threads`` =
    let nb = 10 // 👈 10 is quick (2ms), 20 is so long (9s)
    let account = openAccount (mkBankAccount())
    // One update of 1.0 wrapped in an async workflow; its result is discarded.
    let singleUpdate = async { updateBalance 1.0m account |> ignore }
    List.replicate nb singleUpdate
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    should_equal (Some (decimal nb)) (getBalance account)
#time

Solution

  • Your problem is that your code doesn't use Async all the way up.

    Your Account class has the methods Open, Close, Balance and Transaction, and you use an AsyncReplyChannel, but you send the messages with PostAndReply. This means: you send a message to the MailboxProcessor together with a channel to reply on, but the calling method then waits synchronously for the reply.

    Even with Async.Parallel and multiple threads, this can mean that a lot of threads block themselves. If you change all your methods to use PostAndAsyncReply, your problem goes away.

    There are two other optimizations that can speed things up, but they are not critical in your example.

    1. Calling Length on a list is bad. To calculate the length of a list, you must go through the whole list. You only use this in Transaction to print the length, but consider what happens as the transaction list becomes longer: you always have to go through the whole list whenever you add a transaction. This is O(N) in the size of your transaction list.

    2. The same goes for calling List.sum. You have to recalculate the current balance whenever you call Balance. This is also O(N).

    As you have a MailboxProcessor, you could also keep those two values in the state and update them incrementally instead of completely recalculating them again and again. Thus, they become O(1) operations.

    On top of that, I would change the Open, Close and Transaction messages to return nothing, as in my opinion it doesn't make sense for them to return anything. Your example even leaves me confused about what the bool return values mean.

    In the Close message you return state.Opened before you set it to false. Why?

    In the Open message you return the negated state.Opened. The way you use it later just looks wrong.

    If there is more meaning behind the bool, please make a distinct discriminated union out of it that describes the purpose of what it returns.

    You used an option<Account> throughout your code; I removed it, as I don't see any purpose for it.

    Anyway, here is a full example of how I would write your code so that it doesn't have the speed problems.

    
    /// Messages handled by the account agent. Only Balance carries a reply
    /// channel; the other cases are posted without waiting for an answer.
    type Message =
        /// Marks the account as opened.
        | Open
        /// Marks the account as closed.
        | Close
        /// Asks for the balance; answered with `Some balance` while the
        /// account is open and `None` otherwise.
        | Balance     of AsyncReplyChannel<decimal option>
        /// Adds the given amount; only applied while the account is open.
        | Transaction of decimal
    
    /// State threaded through the agent loop. The length and balance are
    /// stored alongside the list so they do not have to be recomputed from it.
    type AccountState = {
        /// Whether the account is currently open.
        Opened:             bool
        /// Amounts of the applied transactions; new amounts are prepended.
        Transactions:       decimal list
        /// Number of applied transactions.
        TransactionsLength: int
        /// Running sum of the applied transactions.
        CurrentBalance:     decimal
    }
    
    /// Bank account whose state lives inside a MailboxProcessor agent.
    /// Open, Close and Transaction are fire-and-forget posts; Balance is the
    /// only request that is answered, and it is answered asynchronously.
    type Account() =
        let agent = MailboxProcessor<Message>.Start(fun inbox ->
            // Agent loop: handle one message, then continue with the next state.
            let rec loop (state: AccountState) = async {
                match! inbox.Receive() with
                | Close ->
                    printfn "Closing"
                    return! loop { state with Opened = false }
                | Open ->
                    printfn "Opening"
                    return! loop { state with Opened = true }
                | Transaction tran ->
                    if state.Opened then
                        // Log only transactions that are actually applied, so the
                        // printed count always matches TransactionsLength.
                        let l = state.TransactionsLength + 1
                        printfn $"Adding transaction {tran}, nb = {l}"
                        return! loop {
                            state with
                                Transactions       = tran :: state.Transactions
                                TransactionsLength = l
                                CurrentBalance     = state.CurrentBalance + tran
                        }
                    else
                        // The account is not open: the state is left untouched and
                        // the rejection is reported instead of a misleading "Adding".
                        printfn $"Ignoring transaction {tran}: account is not open"
                        return! loop state
                | Balance channel ->
                    // The running balance is only reported while the account is open.
                    if   state.Opened
                    then channel.Reply (Some state.CurrentBalance)
                    else channel.Reply  None
                    return! loop state
            }

            // A new account starts closed, with no transactions and a zero balance.
            let defaultAccount = {
                Opened             = false
                Transactions       = []
                TransactionsLength = 0
                CurrentBalance     = 0m
            }
            loop defaultAccount
        )

        /// Posts an Open message without waiting for it to be processed.
        member _.Open        ()          = agent.Post(Open)
        /// Posts a Close message without waiting for it to be processed.
        member _.Close       ()          = agent.Post(Close)
        /// Returns an async computation that yields `Some balance` when the
        /// account is open and `None` otherwise.
        member _.Balance     ()          = agent.PostAndAsyncReply(Balance)
        /// Posts a Transaction message without waiting for it to be processed;
        /// the agent ignores it while the account is not open.
        member _.Transaction transaction = agent.Post(Transaction transaction)
    
    (* Test *)
    
    /// Equality check for tests: `Ok expected` when both values are equal,
    /// otherwise `Error (expected, actual)`.
    let should_equal expected actual =
        match expected = actual with
        | true -> Ok expected
        | false -> Error (expected, actual)
    
    (* --- API --- *)
    
    /// Creates a new account. This is an alias for the Account constructor,
    /// so it is called as `mkBankAccount ()`.
    let mkBankAccount = Account
    
    /// Opens the account by calling its Open member.
    let openAccount (account: Account) : unit =
        account.Open()
    
    /// Closes the account by calling its Close member.
    let closeAccount (account: Account) : unit =
        account.Close()
    
    /// Sends `transaction` to the account through its Transaction member.
    let updateBalance transaction (account: Account) : unit =
        transaction |> account.Transaction
    
    /// Requests the current balance; the result is an async computation
    /// yielding the account's optional balance.
    let getBalance (account: Account) : Async<decimal option> =
        account.Balance()
    
    #time
    /// Posts `nb` transactions through Async.Parallel, then prints the
    /// balance reported by the account next to the expected value.
    let ``Account can be updated from multiple threads`` =
        let nb = 50
        let account = mkBankAccount ()
        openAccount account

        // One async workflow per update; each one just posts a transaction of 1.0.
        let updates = List.init nb (fun _ -> async { updateBalance 1.0m account })

        updates
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

        // The balance request is posted after all transactions, so it is answered last.
        let balance = getBalance account |> Async.RunSynchronously
        printfn "Balance is %A should be (Some %f)" balance (1.0m * decimal nb)
    #time