c# multithreading async-await tpl-dataflow

Async thread dies on TPL await


I'm writing a simple producer/consumer application, but I'm noticing some really strange behaviour. This is the code:

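// Fields are static because they are used from the static methods below.
private static Thread _timelineThread = null;
private static BufferBlock<RtpPacket> _queue = new BufferBlock<RtpPacket>();
private static AutoResetEvent _stopping = new AutoResetEvent(false);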

static void Main(string[] args)
{  
  // Start consumer thread
  Consume();
  
  // Produce
  var t = new Thread(() =>
  {
    while (true)
    {
      var packet = RtpPacket.GetNext();
      _queue.Post(packet);
      Thread.Sleep(70);
    }
  });
  t.Start();
  t.Join();
}

static void Consume()
{
  _timelineThread = new Thread(async () =>
  {
    while (_stopping.WaitOne(0) == false)
    {
      // Start consuming...
      while (await _queue.OutputAvailableAsync())
      {
        var packet = await _queue.ReceiveAsync();
        // Some processing...
      }
    }
  });
  _timelineThread.Start();   
}

This is intended to be an infinite loop (until I set the _stopping signal). But when _timelineThread hits the first await _queue.OutputAvailableAsync(), the thread changes state to 'Stopped'. Is there something wrong that I'm not considering?

If I change the Consume() function to this:

static void Consume()
{
  _timelineThread = new Thread(() =>
  {
    while (_stopping.WaitOne(0) == false)
    {
      // Start consuming...
      while (_queue.OutputAvailableAsync().GetAwaiter().GetResult())
      {
        var packet = _queue.ReceiveAsync().GetAwaiter().GetResult();
        // Some processing...
      }
    }
  });
  _timelineThread.Start();   
}

the thread runs without any problem, even though the code is almost identical to the previous version.

EDIT: after one hour, this 'hack' doesn't seem to work either. The thread is 'Running', but I don't receive any data from the queue.


Solution

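  • The Thread constructor does not understand async delegates. It accepts only the void-returning ThreadStart and ParameterizedThreadStart delegates, so an async lambda passed to it is compiled as an async void method: the delegate returns at the first await that does not complete synchronously, the thread terminates, and the rest of the lambda continues on a ThreadPool thread.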

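    My suggestion is to use a synchronous BlockingCollection<RtpPacket> instead of the BufferBlock<RtpPacket>, and to consume it by enumerating the result of its GetConsumingEnumerable method, which blocks the consumer thread until an item is available and ends once CompleteAdding has been called: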

    var _queue = new BlockingCollection<RtpPacket>();
    
    var producer = new Thread(() =>
    {
        while (true)
        {
            var packet = RtpPacket.GetNext();
            if (packet == null) { _queue.CompleteAdding(); break; }
            _queue.Add(packet);
            Thread.Sleep(70);
        }
    });
    
    var consumer = new Thread(() =>
    {
        foreach (var packet in _queue.GetConsumingEnumerable())
        {
            // Some processing...
        }
    });
    
    producer.Start();
    consumer.Start();
    
    producer.Join();
    consumer.Join();
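
To see why the original thread ended up 'Stopped', the behaviour of an async lambda passed to the Thread constructor can be reproduced in isolation. The following is only a minimal sketch: the class name AsyncThreadDemo, the Run method and the gate task are hypothetical names invented for this illustration, and it assumes a plain console application with no SynchronizationContext.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThreadDemo
{
    // Returns two observations:
    //   lambdaPending: the dedicated thread exited while the lambda was still awaiting
    //   resumedOnPool: the code after the await ran on a ThreadPool thread
    public static (bool lambdaPending, bool resumedOnPool) Run()
    {
        // A task we complete by hand, standing in for OutputAvailableAsync().
        // RunContinuationsAsynchronously keeps the continuation off the caller's thread.
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        var resumed = new TaskCompletionSource<bool>();

        var thread = new Thread(async () =>
        {
            // Compiled as async void: the delegate returns to the Thread here,
            // because gate.Task is not complete yet.
            await gate.Task;

            // Everything after the await runs later, on whichever thread
            // resumes the continuation.
            resumed.SetResult(Thread.CurrentThread.IsThreadPoolThread);
        });

        thread.Start();
        thread.Join(); // returns although the lambda body has not finished

        bool lambdaPending = !resumed.Task.IsCompleted;

        gate.SetResult(true); // let the awaiting lambda continue
        bool resumedOnPool = resumed.Task.GetAwaiter().GetResult();

        return (lambdaPending, resumedOnPool);
    }

    public static void Main()
    {
        var (lambdaPending, resumedOnPool) = Run();
        Console.WriteLine($"Thread exited while the lambda was pending: {lambdaPending}");
        Console.WriteLine($"Continuation ran on a ThreadPool thread: {resumedOnPool}");
    }
}
```

Under the stated assumptions, both values should come out as True: Join returns as soon as the lambda reaches its first incomplete await, and the remainder of the lambda is no longer tied to the thread that was created for it. That is why a dedicated thread should run a synchronous loop, as in the BlockingCollection version above.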