Tags: f#, task-parallel-library, mailboxprocessor

Interaction with MailboxProcessor and Task hangs forever


I want to process a series of jobs in sequence, but I want to queue up those jobs in parallel.

Here is my code:

open System.Threading.Tasks

let performWork (work : int) =
  task {
    do! Task.Delay 1000

    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

async {
  let w = MailboxProcessor.Start (fun inbox -> async {
    while true do
      let! message = inbox.Receive()

      let (ch : AsyncReplyChannel<_>), work = message

      do!
        performWork work
        |> Async.AwaitTask

      ch.Reply()
  })

  w.Error.Add(fun exn -> raise exn)

  let! completed =
    seq {
      for i = 1 to 10 do
        async {
          do! Async.Sleep 100
          do! w.PostAndAsyncReply(fun ch -> ch, i)

          return i
        }
    }
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)

  printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously

I expect this code to crash once it reaches work item 7.

However, it hangs forever:

$ dotnet fsi ./Test.fsx
Work 3
Work 1
Work 2
Work 4
Work 5
Work 6

I think that the w.Error event is not firing correctly.

How should I be capturing and re-throwing this error?

If my work is async, then it crashes as expected:

let performWork (work : int) =
  async {
    do! Async.Sleep 1000

    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

But I don't see why this should matter.


Leveraging a Result also works, but again, I don't know why this should be required.

async {
  let w = MailboxProcessor.Start (fun inbox -> async {
    while true do
      let! message = inbox.Receive()

      let (ch : AsyncReplyChannel<_>), work = message

      try
        do!
          performWork work
          |> Async.AwaitTask

        ch.Reply(Ok ())
      with exn ->
        ch.Reply(Error exn)
  })

  let performWorkOnWorker (work : int) =
    async {
      let! outcome = w.PostAndAsyncReply(fun ch -> ch, work)

      match outcome with
      | Ok () ->
        return ()
      | Error exn ->
        return raise exn
    }

  let! completed =
    seq {
      for i = 1 to 10 do
        async {
          do! Async.Sleep 100
          do! performWorkOnWorker i

          return i
        }
    }
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)

  printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously

Solution

  • I think the gist of the 'why' here is that Microsoft changed the behaviour for 'unobserved' task exceptions back in .NET 4.5, and this was carried over into .NET Core: these exceptions no longer cause the process to terminate; they're effectively ignored. You can read more about it here.

    I don't know the ins and outs of how Task and async interoperate, but it would seem that awaiting a Task via Async.AwaitTask attaches the rest of the mailbox's async computation as a continuation of that Task, so it runs as a task continuation on the TaskScheduler (note AttachContinuationToTask and ContinuationTaskFromResultTask in the stack trace below). The exception is thrown as part of the async computation within the MailboxProcessor, so it ends up inside that continuation task, and nothing is 'observing' it. This means the exception lands in the mechanism referred to above, and that's why your process no longer crashes.
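A minimal sketch of that mechanism in isolation, using plain TPL rather than MailboxProcessor (so it only approximates what FSharp.Core does internally): an exception raised inside a ContinueWith continuation does not crash the process; it just faults the continuation task. The code waits by polling IsCompleted, which, unlike Wait() or reading .Exception, does not mark the exception as observed.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// A task that completes successfully after a short delay.
let source : Task = Task.Delay 100

// Raise inside a continuation, roughly analogous to an exception surfacing
// while the rest of an async computation runs as a task continuation.
let continuation : Task =
  source.ContinueWith(
    Action<Task>(fun _ -> raise (InvalidOperationException "raised inside a continuation")))

// Wait for completion without observing the exception:
// IsCompleted/IsFaulted do not mark it as observed.
let completed = SpinWait.SpinUntil((fun () -> continuation.IsCompleted), 5000)

// The process is still running: the exception was captured by the
// continuation task instead of being thrown on a thread.
printfn "completed = %b, faulted = %b" completed continuation.IsFaulted
```

If nothing ever reads continuation.Exception, the only place this exception can surface later is TaskScheduler.UnobservedTaskException when the task is finalised.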

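    On .NET Framework you can restore the old behaviour with a flag in app.config (<ThrowUnobservedTaskExceptions enabled="true"/> under <runtime>), as explained in the link above. On .NET Core you can't do this. You'd ordinarily try to replicate it by subscribing to the TaskScheduler.UnobservedTaskException event and re-throwing there, but that won't work in this case, as the Task is hung and won't ever be garbage collected (the MailboxProcessor's loop has stopped, so the reply for work item 7 is never sent and PostAndAsyncReply waits forever).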

    To try and prove the point, I've amended your example to include a timeout for PostAndAsyncReply. This means that the Task will eventually complete and can be garbage collected; the explicit GC.Collect() and GC.WaitForPendingFinalizers() calls then run its finaliser, which fires the event.

    open System
    open System.Threading.Tasks
    
    let performWork (work : int) =
      task {
        do! Task.Delay 1000
    
        if work = 7 then
          failwith "Oh no"
        else
          printfn $"Work {work}"
      }
    
    let worker = async {
      let w = MailboxProcessor.Start (fun inbox -> async {
        while true do
          let! message = inbox.Receive()
    
          let (ch : AsyncReplyChannel<_>), work = message
    
          do!
            performWork work
            |> Async.AwaitTask
    
          ch.Reply()
      })
    
      w.Error.Add(fun exn -> raise exn)
    
      let! completed =
        seq {
          for i = 1 to 10 do
            async {
              do! Async.Sleep 100
              do! w.PostAndAsyncReply((fun ch -> ch, i), 10000)
    
              return i
            }
        }
        |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)
    
      printfn $"Completed {Seq.length completed} job(s)."
    
    }
    
    TaskScheduler.UnobservedTaskException.Add(fun ex ->
        printfn "UnobservedTaskException was fired, re-raising"
        raise ex.Exception)
    
    try
      Async.RunSynchronously worker
    with
      | :? TimeoutException -> ()
    
    GC.Collect()
    GC.WaitForPendingFinalizers()
    

    The output I get here is:

    Work 1
    Work 3
    Work 4
    Work 2
    Work 5
    Work 6
    UnobservedTaskException was fired, re-raising
    Unhandled exception. System.AggregateException: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. (One or more errors occurred. (Oh no))
     ---> System.AggregateException: One or more errors occurred. (Oh no)
     ---> System.Exception: Oh no
       at [email protected]() in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 9
       --- End of inner exception stack trace ---
       at [email protected](ExceptionDispatchInfo edi)
       at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 104
       at Microsoft.FSharp.Control.AsyncPrimitives.AttachContinuationToTask@1144.Invoke(Task`1 completedTask) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1145
       at System.Threading.Tasks.ContinuationTaskFromResultTask`1.InnerInvoke()
       at System.Threading.Tasks.Task.<>c.<.cctor>b__272_0(Object obj)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
    --- End of stack trace from previous location ---
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
       --- End of inner exception stack trace ---
       at [email protected](UnobservedTaskExceptionEventArgs ex) in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 48
       at Microsoft.FSharp.Control.CommonExtensions.SubscribeToObservable@1989.System.IObserver<'T>.OnNext(T args) in D:\a\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1990
       at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.h@379.Invoke(Object _arg1, TArgs args) in D:\a\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 379
       at [email protected](Object delegateArg0, UnobservedTaskExceptionEventArgs delegateArg1) in /Users/cmager/dev/ConsoleApp1/ConsoleApp2/Program.fs:line 46
       at System.Threading.Tasks.TaskScheduler.PublishUnobservedTaskException(Object sender, UnobservedTaskExceptionEventArgs ueea)
       at System.Threading.Tasks.TaskExceptionHolder.Finalize()
    

    As you can see, the exception is eventually published by the Task finaliser, and re-throwing it in that handler brings down the app.

    While interesting, I'm not sure any of this is practically useful. The suggestion to terminate the app within the MailboxProcessor.Error handler is probably the right one.
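A minimal sketch of that suggestion, assuming the Task-based performWork from the question. The helper startWorker and its onFatal parameter are hypothetical names introduced here so the failure can be observed; in an application you would pass a handler that logs and calls exit 1. F#'s exit calls Environment.Exit, which does not rely on an exception propagating, so it should still stop the process even when the Error handler runs inside a task continuation where a re-raise would be swallowed.

```fsharp
open System
open System.Threading.Tasks

// Task-based work from the question: job 7 fails.
let performWork (work : int) =
  task {
    do! Task.Delay 100

    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

// Hypothetical helper: starts the worker and hands any fatal error to `onFatal`.
// In an app, pass something like:
//   fun exn -> eprintfn $"Worker failed: {exn}"; exit 1
let startWorker (onFatal : exn -> unit) =
  let w =
    MailboxProcessor<AsyncReplyChannel<unit> * int>.Start(fun inbox -> async {
      while true do
        let! ch, work = inbox.Receive()
        do! performWork work |> Async.AwaitTask
        ch.Reply()
    })

  // Once the body throws, the loop is gone and no further replies are sent,
  // so this handler is the place to stop the process.
  w.Error.Add onFatal
  w
```

Because the loop is gone after the first failure, any caller still awaiting a reply stays blocked unless it uses the timeout overload of PostAndAsyncReply, which raises TimeoutException.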