Search code examples
c#.netmultithreadingasync-awaitblocking

Immediately cancelling blocking operation with timeout


I have a blocking operation that reads from a queue, but it can take a timeout. I can easily convert this to an "async" operation:

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        return await Task.Run(() =>
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Try receiving for one second
                IMessage message = consumer.Receive(TimeSpan.FromSeconds(1));

                if (message != null)
                {
                    return message;
                }
            }
        }, cancellationToken).ConfigureAwait(false);
    }

Aborting a thread is generally considered bad practice since you can leak resources, so the timeout seems like the only way to cleanly stop a thread. So I have three questions:

  1. What is a generally accepted timeout value for "immediate" cancellation?
  2. For libraries that provide built-in async methods, does immediate cancellation truly exist or do they also use timeouts and loops to simulate it? Maybe the question here is how would you make use of software interrupts and if these also have to do some sort of polling to check if there are interrupts, even if it's at the kernel/CPU level.
  3. Is there some alternate way I should be approaching this?

Edit: So I may have found part of my answer with Thread.Interrupt() and then handling ThreadInterruptedException. Is this basically a kernel-level software interrupt and as close to "immediate" as we can get? Would the following be a better way of handling this?

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var completionSource = new TaskCompletionSource<IMessage>();

        var receiverThread = new Thread(() =>
        {
            try
            {
                completionSource.SetResult(consumer.Receive());
            }
            catch (ThreadInterruptedException)
            {
                completionSource.SetCanceled();
            }
            catch (Exception ex)
            {
                completionSource.SetException(ex);
            }
        });

        cancellationToken.Register(receiverThread.Interrupt);

        receiverThread.Name = "Queue Receive";
        receiverThread.Start();

        return await completionSource.Task.ConfigureAwait(false);
    }

Solution

    1. It depends on your specific needs. A second could be immediate for some and slow for others.
    2. Libraries (good ones) which provide async API do so from the bottom up. They usually don't wrap blocking (synchronous) operations with a thread to make them seem asynchronous. They use TaskCompletionSource to create truly async methods.
    3. I'm not sure what you mean by queue (the built-in Queue in .Net doesn't have a Receive method) but you should probably be using a truly async data structure like TPL Dataflow's BufferBlock.

    About your specific code sample. You are holding up a thread throughout the entire operation (that's async over sync) which is costly. You could instead try to consume quickly and then wait asynchronously for the timeout to end, or for the CancellationToken to be cancelled. There's also no point in using another thread with Task.Run. You can simply have the async lambda be the content of ReceiveAsync:

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // Try receiving for one second
            IMessage message;
            if (!consumer.TryReceive(out message))
            {
                 await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
            }
            if (message != null)
            {
                return message;
            }
        }
    }
    

    If your queue implements IDisposable a different (harsher) option would be to call Dispose on it when the CancellationToken is cancelled. Here's how.