Search code examples
c#.netasync-awaittask-parallel-librarytpl-dataflow

BufferBlock.ReceiveAsync(timeout) hang but BufferBlock.ReceiveAsync() works OK


Either I am doing something really wrong, but the below never returns it hangs forever on the ReceiveAsync despite specifying a 1 second timeout.

I would expect it to return null value, after the time out.

/* snipped MyContainer class */

private readonly BufferBlock<byte[]> queue = new BufferBlock<byte[]>();

public async Task ExecuteAsync(CancellationToken stoppingToken)
{
     // makes no difference if creating with TaskCreationOptions.LongRunning
     await Task
            .Factory
            .StartNew(async () =>
            {

              while (stoppingToken.IsCancellationRequested == false)
              {                     
                 // we get here OK, but no further if i use TimeSpan for delay
                 // without Timespan i.e. ReceiveAsync() only, it does **not** hang
                 var item = await 
                             this
                               .queue
                               .ReceiveAsync(TimeSpan.FromMilliseconds(1000));

                      // process it, but we never get here we sleep forever
                      await ProcessAsync(item);
                    }
             } /*,TaskCreationOptions.LongRunning*/);

     // we get here and print the below OK
     Console.WriteLine("thread created and running");
}

// this is called by the original (or some other) thread
// either if i call this or not, the above thread code still locks on ReceiveAsync
public void Add(byte[] data)
{
   Console.WriteLine("adding");
   this.queue.Post(data);
   Console.WriteLine("done"); // gets posted OK     
}

Important update - works OK if I do not specify a delay

var item = await this.queue.ReceiveAsync());

The code works OK if I remove the delay, however I do some background housekeeping every second (for packet counters etc) so this is important to wake up if nothing received within 1 second.

Other notes:

I am calling the above code from a generic dot net worker host:

public class Worker : BackgroundService
{
    private readonly MyContainer containerClass;
    private readonly ILogger<Worker> logger;

    public Worker(MyContainer containerClass, ILogger<Worker> logger)
    {
        this.containerClass = containerClass;
        this.logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        this.containerClass.ExecuteAsync(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {                
            this.logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            await Task.Delay(1000, stoppingToken);
        }
    }
}

The above is called after the worker is built by IHostBuilder and I called Host.Run().

My understanding is (which I clearly need to work on!) since I create the thread, it should run totally independently from (and not block on) the thread that created/called it... in other words it should be able to call ReceiveAsync within the thread itself without getting blocked.


Solution

  • Using the Task.Factory.StartNew with an async delegate creates a nested task:

    Task<Task> nestedTask = Task.Factory.StartNew(async () => { //...
    

    You are awaiting the outer task, but not the inner, so the inner task becomes a fire-and-forget task. It is possible to await both tasks in one line by using the await operator twice:

    await await Task.Factory.StartNew(async () => { //...
    

    Alternatively you can combine the two tasks in one by using the Unwrap method.

    await Task.Factory.StartNew(async () => { /* ... */ }).Unwrap();
    

    ...or even better use the Task.Run method instead of the Task.Factory.StartNew, because the former understands async delegates, and does the unwrapping for you:

    await Task.Run(async () => { //...
    

    If you are interested about the differences between Task.Factory.StartNew and Task.Run, you could read an informative article here.