c# async-await task taskcompletionsource

Using a TaskCompletionSource<T> to asynchronously wait without locking?


I have a class that processes a never ending stream of data. I also have external components that would like to asynchronously sample the processed data from time to time in an ad-hoc fashion.

Since it could be a while before the "next item" is processed, I'd like the data inspector components to "await" the item so they don't block. There could be any number of different data-inspecting components, and if multiple inspectors are awaiting an item, it's fine if they all get the same item instance.

The data items are very large, so they get disposed immediately after processing for memory efficiency. I don't want to do anything that would delay this (such as posting the data to a channel).

I have the implementation below using a TaskCompletionSource and locks to make sure that the tcs works on different threads. It seems to work okay, but I'd like to do this in a lock-less way if possible.

public class DataProcessor
{
  private object _sampleDataLock = new object();
  private TaskCompletionSource<Data> _sampleData;
  private Thread _processThread;

  public DataProcessor()
  {
     _processThread = new(ProcessDataLoop);
     _processThread.Start();
  }

  void ProcessDataLoop()
  {
      while (true)
      {
         var processedData = ProcessNextData();
         
         //This code in the lock is what I wonder about
         lock (_sampleDataLock)
         {
            _sampleData?.SetResult(processedData.Clone());
            _sampleData = null;
         }

        processedData.Dispose();
      }
  }

  public Task<Data> SampleData(TimeSpan timeout)
  {
    // This is the code I'm wondering how to write without a lock
    lock (_sampleDataLock)
    {
      _sampleData ??= new TaskCompletionSource<Data>();
      return _sampleData.Task.WithTimeout(timeout);
    }
  }
}

public class DataInspector
{
    private DataProcessor _processor;

    public async Task InspectData()
    {
       //wait up to 30s for an item
       var data = await _processor.SampleData(TimeSpan.FromSeconds(30));
       // look at data
    }
}

I'm wondering if there's a way to do the same thing without locking.


Solution

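  • You could use a traditional event handler. Each reader subscribes to the event, clones the data it needs inside the handler, then unsubscribes. The TaskCompletionSource is handled only by the SampleData method (captured by the handler's closure) and is never touched by the main processing loop. No explicit lock is needed, because the compiler-generated add and remove accessors of a field-like event are thread-safe. Note that each waiting reader receives its own clone rather than a shared instance.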

    public class DataProcessor
    {
        public class DataEventArgs : EventArgs
        {
            public Data Data { get; set; }
        }
    
        public event EventHandler<DataEventArgs> DataReady;
    
        private Thread _processThread;
        
        public DataProcessor()
        {
            _processThread = new(ProcessDataLoop);
            _processThread.Start();
        }
        
        
        void OnDataReady(Data data) => DataReady?.Invoke(this, new DataEventArgs { Data = data });
        
        void ProcessDataLoop()
        {
            while (true)
            {
                var processedData = ProcessNextData();
                this.OnDataReady(processedData);
                processedData.Dispose();
            }
        }
    
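        public async Task<Data> SampleData(TimeSpan timeout)
        {
            // Run continuations asynchronously so awaiters never resume on the processing thread.
            var tcs = new TaskCompletionSource<Data>(TaskCreationOptions.RunContinuationsAsynchronously);
            // Honor the timeout: cancel the task if no item arrives in time.
            using var cts = new CancellationTokenSource(timeout);
            using var registration = cts.Token.Register(() => tcs.TrySetCanceled());

            void Handler(object sender, DataEventArgs e)
            {
                this.DataReady -= Handler;
                var clone = e.Data.Clone();
                // If the waiter already timed out, nobody will consume the clone.
                if (!tcs.TrySetResult(clone)) clone.Dispose();
            }

            this.DataReady += Handler;
            try
            {
                return await tcs.Task;
            }
            finally
            {
                this.DataReady -= Handler; // also covers the timeout path; a no-op if already removed
            }
        }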
    }
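
A lock-free variant that stays closer to the original design may also be possible by swapping the shared TaskCompletionSource with Interlocked operations. The following is only a minimal sketch under stated assumptions: `SampleSlot` and `Publish` are hypothetical names, the `Data` class is a stand-in for the real type from the question, and `Task.WaitAsync(TimeSpan)` (available from .NET 6) replaces the question's `WithTimeout` helper, whose implementation is not shown.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Data type (illustration only).
public sealed class Data : IDisposable
{
    public int Value { get; }
    public bool IsDisposed { get; private set; }
    public Data(int value) => Value = value;
    public Data Clone() => new Data(Value);
    public void Dispose() => IsDisposed = true;
}

// Hypothetical helper holding the single pending TaskCompletionSource.
public sealed class SampleSlot
{
    private TaskCompletionSource<Data> _pending;

    // Called by inspectors from any thread.
    public Task<Data> SampleData(TimeSpan timeout)
    {
        var current = Volatile.Read(ref _pending);
        if (current == null)
        {
            var fresh = new TaskCompletionSource<Data>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            // Install ours only if the slot is still empty; if another
            // inspector won the race, CompareExchange returns theirs.
            current = Interlocked.CompareExchange(ref _pending, fresh, null) ?? fresh;
        }
        // Assumes .NET 6+: throws TimeoutException if no item arrives in time.
        return current.Task.WaitAsync(timeout);
    }

    // Called by the processing thread, just before the item is disposed.
    public void Publish(Data processed)
    {
        // Atomically take the pending source so later callers start a new one.
        var waiting = Interlocked.Exchange(ref _pending, null);
        // Clone only when somebody asked for a sample.
        waiting?.TrySetResult(processed.Clone());
    }
}
```

With this sketch, ProcessDataLoop would call `Publish(processedData)` right before `processedData.Dispose()`, and all inspectors waiting at that moment share one clone. As in the original locked version, if every waiter has already timed out, the next item is still cloned and that clone is never observed, so it may be worth tracking waiters if that cost matters.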