I want to process a series of jobs in sequence, but I want to queue up those jobs in parallel.
Here is my code:
open System.Threading.Tasks
let performWork (work : int) =
task {
do! Task.Delay 1000
if work = 7 then
failwith "Oh no"
else
printfn $"Work {work}"
}
async {
let w = MailboxProcessor.Start (fun inbox -> async {
while true do
let! message = inbox.Receive()
let (ch : AsyncReplyChannel<_>), work = message
do!
performWork work
|> Async.AwaitTask
ch.Reply()
})
w.Error.Add(fun exn -> raise exn)
let! completed =
seq {
for i = 1 to 10 do
async {
do! Async.Sleep 100
do! w.PostAndAsyncReply(fun ch -> ch, i)
return i
}
}
|> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously
I expect this code to crash once it reaches work item 7
.
However, it hangs forever:
$ dotnet fsi ./Test.fsx
Work 3
Work 1
Work 2
Work 4
Work 5
Work 6
I think that the w.Error
event is not firing correctly.
How should I be capturing and re-throwing this error?
If my work is async
, then it crashes as expected:
let performWork (work : int) =
async {
do! Async.Sleep 1000
if work = 7 then
failwith "Oh no"
else
printfn $"Work {work}"
}
But I don't see why this should matter.
Leveraging a Result
also works, but again, I don't know why this should be required.
async {
let w = MailboxProcessor.Start (fun inbox -> async {
while true do
let! message = inbox.Receive()
let (ch : AsyncReplyChannel<_>), work = message
try
do!
performWork work
|> Async.AwaitTask
ch.Reply(Ok ())
with exn ->
ch.Reply(Error exn)
})
let performWorkOnWorker (work : int) =
async {
let! outcome = w.PostAndAsyncReply(fun ch -> ch, work)
match outcome with
| Ok () ->
return ()
| Error exn ->
return raise exn
}
let! completed =
seq {
for i = 1 to 10 do
async {
do! Async.Sleep 100
do! performWorkOnWorker i
return i
}
}
|> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously
I think the gist of the 'why' here is that Microsoft changed the behaviour for 'unobserved' task exceptions back in .NET 4.5, and this was brought through into .NET Core: these exceptions no longer cause the process to terminate, they're effectively ignored. You can read more about it here.
I don't know the ins and outs of how Task
and async
are interoperating, but it would seem that the use of Task
results in the continuations being attached to that and run on the TaskScheduler
as a consequence. The exception is thrown as part of the async
computation within the MailboxProcessor
, and nothing is 'observing' it. This means the exception ends up in the mechanism referred to above, and that's why your process no longer crashes.
You can change this behaviour via a flag on .NET Framework via app.config
, as explained in the link above. For .NET Core, you can't do this. You'd ordinarily try and replicate this by subscribing to the UnobservedTaskException
event and re-throwing there, but that won't work in this case as the Task
is hung and won't ever be garbage collected.
To try and prove the point, I've amended your example to include a timeout for PostAndReplyAsync
. This means that the Task
will eventually complete, can be garbage collected and, when the finaliser runs, the event fired.
open System
open System.Threading.Tasks
let performWork (work : int) =
task {
do! Task.Delay 1000
if work = 7 then
failwith "Oh no"
else
printfn $"Work {work}"
}
let worker = async {
let w = MailboxProcessor.Start (fun inbox -> async {
while true do
let! message = inbox.Receive()
let (ch : AsyncReplyChannel<_>), work = message
do!
performWork work
|> Async.AwaitTask
ch.Reply()
})
w.Error.Add(fun exn -> raise exn)
let! completed =
seq {
for i = 1 to 10 do
async {
do! Async.Sleep 100
do! w.PostAndAsyncReply((fun ch -> ch, i), 10000)
return i
}
}
|> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
printfn $"Completed {Seq.length completed} job(s)."
}
TaskScheduler.UnobservedTaskException.Add(fun ex ->
printfn "UnobservedTaskException was fired, re-raising"
raise ex.Exception)
try
Async.RunSynchronously worker
with
| :? TimeoutException -> ()
GC.Collect()
GC.WaitForPendingFinalizers()
The output I get here is:
Work 1
Work 3
Work 4
Work 2
Work 5
Work 6
UnobservedTaskException was fired, re-raising
Unhandled exception. System.AggregateException: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. (One or more errors occurred. (Oh no))
---> System.AggregateException: One or more errors occurred. (Oh no)
---> System.Exception: Oh no
at [email protected]() in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 9
--- End of inner exception stack trace ---
at [email protected](ExceptionDispatchInfo edi)
at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 104
at Microsoft.FSharp.Control.AsyncPrimitives.AttachContinuationToTask@1144.Invoke(Task`1 completedTask) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1145
at System.Threading.Tasks.ContinuationTaskFromResultTask`1.InnerInvoke()
at System.Threading.Tasks.Task.<>c.<.cctor>b__272_0(Object obj)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
--- End of inner exception stack trace ---
at [email protected](UnobservedTaskExceptionEventArgs ex) in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 48
at Microsoft.FSharp.Control.CommonExtensions.SubscribeToObservable@1989.System.IObserver<'T>.OnNext(T args) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1990
at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.h@379.Invoke(Object _arg1, TArgs args) in D:\a\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 379
at [email protected](Object delegateArg0, UnobservedTaskExceptionEventArgs delegateArg1) in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 46
at System.Threading.Tasks.TaskScheduler.PublishUnobservedTaskException(Object sender, UnobservedTaskExceptionEventArgs ueea)
at System.Threading.Tasks.TaskExceptionHolder.Finalize()
As you can see, the exception is eventually published by the Task
finaliser, and re-throwing it in that handler brings down the app.
While interesting, I'm not sure any of this is practically useful information. The suggestion to terminate the app within MailboxProcessor.Error
handler is probably the right one.