I'm trying to run some MailboxProcessor testing, and seems like mailbox Scan() fails with "System.Exception: multiple waiting reader continuations for mailbox". This happens with Async.Start and Async.StartImmediate, etc. (Async.RunSynchronously also won't work as then there is only one processor and no customers after initial customers).
Here is demo code, this works in interactive:
#if INTERACTIVE
#r "../packages/FSharp.Data.2.0.4/lib/net40/FSharp.Data.dll"
#endif
open System
open FSharp.Data
let random = new Random()
let data = FreebaseData.GetDataContext()
let customerNames = data.Commons.Computers.``Computer Scientists``
let nameAmount = customerNames |> Seq.length
// ----
type Customer() =
let person = customerNames |> Seq.nth (random.Next nameAmount)
member x.Id = Guid.NewGuid()
member x.Name = person.Name
member x.RequiredTime = random.Next(10000)
type Barber(name) =
member x.Name = name
type ``Possible actions notified to barber`` =
| CustomerWalksIn of Customer
let availableCustomers = new MailboxProcessor<``Possible actions notified to barber``>(fun c -> async { () })
let createBarber name =
Console.WriteLine("Barber " + name + " takes inital nap...")
let rec cutSomeHairs () =
async{
do! availableCustomers.Scan(function
| CustomerWalksIn customer ->
async {
Console.WriteLine("Barber " + name + " is awake and started cutting " + customer.Name + "'s hair.")
// exception also happen with Threading.Thread.Sleep()
do! Async.Sleep customer.RequiredTime
Console.WriteLine("Barber " + name + " finnished cutting " + customer.Name + "'s hair. Going to sleep now...")
} |> Some)
do! cutSomeHairs ()
}
cutSomeHairs() |> Async.StartImmediate
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
createBarber "Tuomas";
createBarber "Seppo";
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
...and the stacktrace I get after running a while is:
System.Exception: multiple waiting reader continuations for mailbox
at <StartupCode$FSharp-Core>[email protected](AsyncParams`1 _arg1)
at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
at <StartupCode$FSharp-Core>[email protected](Object state)
at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.TimerQueueTimer.CallCallback()
at System.Threading.TimerQueueTimer.Fire()
at System.Threading.TimerQueue.FireNextTimers()
at System.Threading.TimerQueue.AppDomainTimerCallback()
Stopped due to error
or same without threads:
System.Exception: multiple waiting reader continuations for mailbox
at <StartupCode$FSharp-Core>[email protected](AsyncParams`1 _arg1)
at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
at <StartupCode$FSharp-Core>[email protected](Object state)
Stopped due to error
The Receive
and Scan
methods of an MailboxProcessor
should be only called from the body of the agent. To quote the MSDN documentation:
This method is for use within the body of the agent. For each agent, at most one concurrent reader may be active, so no more than one concurrent call to Receive, TryReceive, Scan or TryScan may be active. The body of the scanner function is locked during its execution, but the lock is released before the execution of the asynchronous workflow.
So, you'll need to structure your code differently. I do not have a detailed answer, but it sounds like my article on implementing blocking queue using agents could help here.