Search code examples
exceptionf#parallel-processingmailboxprocessor

F# MailboxProcessor - multiple waiting reader continuations for mailbox


Im playing around with writing something like a really simple asynchronous testing framework. But I think I'm hitting some kind of limitation or bug. Sorry but I was not able to reproduce this on a smaller codebase.

This is the basic Framework I came up with:

module TestRunner
    open System

    type TestOptions = {
        Writer : ConsoleColor -> string -> unit}
    type TestResults = {
        Time : TimeSpan
        Failure : exn option
        }
    type Test = {
        Name : string
        Finished : IEvent<TestResults>
        SetFinished : TestResults -> unit
        TestFunc : TestOptions -> Async<TestResults> }

    let createTest name f =  
        let ev = new Event<TestResults>()
        {
            Name = name 
            Finished = ev.Publish
            SetFinished = (fun res -> ev.Trigger res)
            TestFunc = 
                (fun options -> async {
                    let watch = System.Diagnostics.Stopwatch.StartNew()
                    try
                        do! f options
                        watch.Stop()
                        return { Failure = None; Time = watch.Elapsed }
                    with exn ->
                        watch.Stop()
                        return { Failure = Some exn; Time = watch.Elapsed }
                    })}

    let simpleTest name f = 
        createTest name (fun options -> f options.Writer)

    /// Create a new Test and change the result
    let mapResult mapping test = 
        { test with
            TestFunc = 
                (fun options -> async {
                    let! result = test.TestFunc options
                    return mapping result})}

    let writeConsole color f = 
        let old = System.Console.ForegroundColor
        try
            System.Console.ForegroundColor <- color
            f()
        finally
            System.Console.ForegroundColor <- old

    let printColor color (text:String) = 
        writeConsole color (fun _ -> Console.WriteLine(text))


    type WriterMessage = 
        | NormalWrite of ConsoleColor * String
        | StartTask of AsyncReplyChannel<int> * String
        | WriteMessage of int * ConsoleColor * String
        | EndTask of int

    /// will handle printing jobs for two reasons
    /// 1. Nice output grouped by tests (StartTask,WriteMessage,EndTask)
    /// 2. Print Summary after all tests finished (NormalWrite)
    let writer = MailboxProcessor.Start (fun inbox -> 
        let currentTask = ref 0
        let newHandle (returnHandle:AsyncReplyChannel<int>) = 
            let handle = System.Threading.Interlocked.Increment currentTask
            returnHandle.Reply handle
            handle 

        // the tasks describe which tasks are currently waiting to be processed
        let rec loop tasks = async {
            let! newTasks =
                match tasks with
                /// We process the Task with the number t and the name name
                | (t, name) :: next -> 
                    inbox.Scan
                        (fun msg -> 
                            match msg with
                            | EndTask (endTask) -> 
                                // if the message is from the current task finish it
                                if t = endTask then
                                    Some (async { return next })
                                else None
                            | WriteMessage(writeTask, color, message) ->
                                if writeTask = t then 
                                    Some (async {
                                        printColor color (sprintf "Task %s: %s" name message)
                                        return tasks
                                    })
                                else None
                            | StartTask (returnHandle, name) -> 
                                // Start any tasks instantly and add them to the list (because otherwise they would just wait for the resonse)
                                Some (async { 
                                    let handle = newHandle returnHandle
                                    return (List.append tasks [handle, name]) })
                            | _ -> None)
                // No Current Tasks so just start ones or process the NormalWrite messages
                | [] ->
                    inbox.Scan     
                        (fun msg -> 
                            match msg with
                            | StartTask (returnHandle, name) -> 
                                Some (async { 
                                    let handle = newHandle returnHandle
                                    return [handle, name] })
                            | NormalWrite(color, message) ->
                                Some (async {
                                    printColor color message
                                    return []
                                })
                            | _ -> None)   

            return! loop newTasks 
        }
        loop [])

    /// Write a normal message via writer
    let writerWrite color (text:String) = 
        writer.Post(NormalWrite(color, text))

    /// A wrapper around the communication (to not miss EndTask for a StartTask)
    let createTestWriter name f = async {
        let! handle = writer.PostAndAsyncReply(fun reply -> StartTask(reply, name))
        try
            let writer color s = 
                writer.Post(WriteMessage(handle,color,s))
            return! f(writer)
        finally
            writer.Post (EndTask(handle))
        }
    /// Run the given test and print the results
    let testRun t = async {
        let! results = createTestWriter t.Name (fun writer -> async {
            writer ConsoleColor.Green (sprintf "started")
            let! results = t.TestFunc { Writer = writer }
            match results.Failure with
            | Some exn -> 
                writer ConsoleColor.Red (sprintf "failed with %O" exn)
            | None ->
                writer ConsoleColor.Green (sprintf "succeeded!")
            return results}) 
        t.SetFinished results
        }
    /// Start the given task with the given amount of workers
    let startParallelMailbox workerNum f = 
        MailboxProcessor.Start(fun inbox ->
            let workers = Array.init workerNum (fun _ -> MailboxProcessor.Start f)
            let rec loop currentNum = async {
                let! msg = inbox.Receive()
                workers.[currentNum].Post msg
                return! loop ((currentNum + 1) % workerNum)
            }
            loop 0 )
    /// Runs all posted Tasks
    let testRunner = 
        startParallelMailbox 10 (fun inbox ->
            let rec loop () = async {
                let! test = inbox.Receive()
                do! testRun test
                return! loop()
            }
            loop ())
    /// Start the given tests and print a sumary at the end
    let startTests tests = async {
        let! results =
            tests 
                |> Seq.map (fun t ->
                    let waiter = t.Finished |> Async.AwaitEvent
                    testRunner.Post t
                    waiter
                   )
                |> Async.Parallel
        let testTime = 
            results
                |> Seq.map (fun res -> res.Time)
                |> Seq.fold (fun state item -> state + item) TimeSpan.Zero
        let failed = 
            results
                |> Seq.map (fun res -> res.Failure) 
                |> Seq.filter (fun o -> o.IsSome)
                |> Seq.length
        let testCount = results.Length
        if failed > 0 then
            writerWrite ConsoleColor.DarkRed (sprintf "--- %d of %d TESTS FAILED (%A) ---" failed testCount testTime)
        else
            writerWrite ConsoleColor.DarkGray (sprintf "--- %d TESTS FINISHED SUCCESFULLY (%A) ---" testCount testTime)
        }

Now the Exception is only triggered when i use a specific set of tests which do some crawling on the web (some fail and some don't which is fine):

#r @"Yaaf.GameMediaManager.Primitives.dll";; // See below
open TestRunner

let testLink link =
    Yaaf.GameMediaManager.EslGrabber.getMatchMembers link
    |> Async.Ignore

let tests = [
    // Some working links (links that should work)
    yield! 
      [ //"TestMatch", "http://www.esl.eu/eu/wire/anti-cheat/css/anticheat_test/match/26077222/"
        "MatchwithCheater", "http://www.esl.eu/de/csgo/ui/versus/match/3035028"
        "DeletedAccount", "http://www.esl.eu/de/css/ui/versus/match/2852106" 
        "CS1.6", "http://www.esl.eu/de/cs/ui/versus/match/2997440" 
        "2on2Versus", "http://www.esl.eu/de/css/ui/versus/match/3012767" 
        "SC2cup1on1", "http://www.esl.eu/eu/sc2/go4sc2/cup230/match/26964055/"
        "CSGO2on2Cup", "http://www.esl.eu/de/csgo/cups/2on2/season_08/match/26854846/"
        "CSSAwpCup", "http://www.esl.eu/eu/css/cups/2on2/awp_cup_11/match/26811005/"
        ] |> Seq.map (fun (name, workingLink) -> simpleTest (sprintf "TestEslMatches_%s" name) (fun o -> testLink workingLink))
    ]

startTests tests |> Async.Start;; // this will produce the Exception now and then

https://github.com/matthid/Yaaf.GameMediaManager/blob/core/src/Yaaf.GameMediaManager.Primitives/EslGrabber.fs is the code and you can download https://github.com/downloads/matthid/Yaaf.GameMediaManager/GameMediaManager.%200.9.3.1.wireplugin (this is basically a renamed zip archive) and extract it to get the Yaaf.GameMediaManager.Primitives.dll binary (you can paste it into FSI instead of downloading when you want but then you have to reference the HtmlAgilityPack)

I can reproduce this with Microsoft (R) F# 2.0 Interactive, Build 4.0.40219.1. The Problem is that the Exception will not be triggered always (but very often) and the stacktrace is telling me nothing

System.Exception: multiple waiting reader continuations for mailbox
   bei <StartupCode$FSharp-Core>[email protected](AsyncParams`1 _arg11)
   bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
   bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   bei <StartupCode$FSharp-Core>.$Control.finishTask@1280[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 firstExn, T[] results, TrampolineHolder trampolineHolder, Int32 remaining)
   bei <StartupCode$FSharp-Core>.$Control.recordFailure@1302[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 count, FSharpRef`1 firstExn, T[] results, LinkedSubSource innerCTS, TrampolineHolder trampolineHolder, FSharpChoice`2 exn)
   bei <StartupCode$FSharp-Core>[email protected](Exception exn)
   bei Microsoft.FSharp.Control.AsyncBuilderImpl.protectedPrimitive@690.Invoke(AsyncParams`1 args)
   bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
   bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   bei <StartupCode$FSharp-Core>[email protected](Object state)
   bei System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   bei System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   bei System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   bei System.Threading.ThreadPoolWorkQueue.Dispatch()
   bei System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

Because this is will be triggered on a worker thread, which I have no control of, this will crash the application (not FSI but the exception will be displayed here too).

I found http://cs.hubfs.net/topic/Some/2/59152 and http://cs.hubfs.net/topic/None/59146 but I do not use StartChild and I don't think I'm invoking Receive from multiple Threads at the same time somehow?

Is there anything wrong with my Code or is this indeed a bug? How can I workaround this if possible?

I noticed that in FSI that all tests will run as expected when the Exception is silently ignored. How can I do the same?

EDIT: I noticed after I fixed the failing unit tests it will work properly. However I can stil not reproduce this with a smaller codebase. For example with my own failing tests.

Thanks, matthid


Solution

  • Sadly I never actually could reproduce this on a smaller code base, and now I would use NUnit with async test support instead of my own implementation. I used agents (MailboxProcessor) and asyncs in various projects since them and never encountered this again...