Search code examples
c#asynchronoustasktaskcompletionsource

Synchronously Wait on TaskCompletionSource.Task


Sync-over-async is bad. I know. But is there a sync-over-async problem when calling TaskCompletionSource.Task.Wait()? Does the answer changes if TaskCompletionSource was created with TaskCreationOptions.RunContinuationsAsynchronously?

Update
To answer the questions from the comments. Not all Task's are equal. The Task object was introduced long before async/await and was used for parallel programming. For example, there is nothing wrong with the following code as it doesn't do any async work.

var task = Task.Run(() => Thread.Sleep(10_000));
task.Wait();

For the context: The Kafka client has a sync method to produce messages which accepts an action to report delivery status asynchronously

void Produce(
      TopicPartition topicPartition,
      Message<TKey, TValue> message,
      Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);

In a few scenarios, I need to wait for the delivery report before continuing the work, which can be in sync or async context. For that, I have the following class:

internal class DeliveryReportAwaiter<TKey, TValue> : IDisposable
{
    private const int WaitForDeliveryGracePeriodFactor = 2;
    private readonly int _waitDeliveryReportTimeoutMs;
    private readonly ILogger _logger;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly TaskCompletionSource _taskCompletionSource;

    private bool _disposed;

    public DeliveryReportAwaiter(int waitDeliveryReportTimeoutMs, ILogger logger)
    {
        _logger = logger;
        _waitDeliveryReportTimeoutMs = waitDeliveryReportTimeoutMs *
            WaitForDeliveryGracePeriodFactor;

        _taskCompletionSource = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _cancellationTokenSource = new CancellationTokenSource();
        // in case OnDeliveryReportReceived was never called
        _cancellationTokenSource.Token.Register(SetTaskTimeoutException);
    }

    public void WaitForDeliveryReport(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        _cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
        
        // Is this considered sync-over-async?
        _taskCompletionSource.Task.Wait(token);
    }

    public Task WaitForDeliveryReportAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        _cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
        return _taskCompletionSource.Task.WaitAsync(token);
    }

    public void OnDeliveryReportReceived(DeliveryReport<TKey, TValue> deliveryReport,
        Action<DeliveryReport<TKey, TValue>> handleReportAction)
    {
        if (_disposed)
        {
            _logger.LogWarning(
                "The delivery report for the message {Key} on topic {Topic} arrived " +
                    "after the awaiter was disposed due to timeout or cancellation. " +
                    "The delivery status is {Status}",
                deliveryReport.Key,
                deliveryReport.Topic,
                deliveryReport.Status);

            return;
        }

        if (!_cancellationTokenSource.TryReset())
        {
            SetTaskTimeoutException();
        }
        else
        {
            handleReportAction?.Invoke(deliveryReport);
            _taskCompletionSource.TrySetResult();
        }
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _cancellationTokenSource.Dispose();
    }

    private void SetTaskTimeoutException()
    {
        var errorMessage = $"Producer timed out while waiting for publish " +
            $"confirm for {_waitDeliveryReportTimeoutMs}ms!";
        _taskCompletionSource.TrySetException(new KafkaTimeoutException(errorMessage));
    }
}

See the WaitForDeliveryReport method implementation.
Now the question is much longer but I hope it will help people to understand the reason behind it.


Solution

  • Initializing the TaskCompletionSource with the TaskCreationOptions.RunContinuationsAsynchronously option means that the Task will be completed on the ThreadPool, and not on the thread that called the TrySetException method. This is useful in case that the Task is awaited, so that the completion thread is not hijacked by the continuation(s) after the await. Having your thread "stolen" after calling SetResult is a common cause of bugs in asynchronous programming. It is something highly unexpected by most programmers, before they experience it for the first time (usually at the finale a long and unpleasant debugging session).

    But in case that the Task is Waited synchronously, completing it on the ThreadPool offers no advantage. The continuation after the Wait will run on the waiting thread, and not on the completing thread. All that the completing thread will have to do is to signal an internal ManualResetEventSlim, which is a negligible amount of work.

    So you should avoid initializing the TaskCompletionSource with this option, if you intend to Wait it synchronously. Doing so not only adds overhead for no reason, but also risks the delayed completion of the task, in case the ThreadPool is saturated. In extreme cases it's even possible for a complete deadlock to occur, in case the ThreadPool is saturated and has reached its maximum size.