c# asynchronous .net-core valuetask ivaluetasksource

What's the right way to implement ValueTaskSource.SetCompleted


So we have this class implementing IValueTaskSource. This code cannot be written as async-await because there's nothing to await on. We send a message to another running thread and get back a ValueTask that the caller can await to be notified that the other thread has finished processing the message. However, the other thread is pre-existing and already doing work. It receives the message by a completely different route, processes it, and then needs to tell the thread-pool thread that sent it that it has finished. Thus: IValueTaskSource.

There is no stock ValueTaskSource (not getting into whether or not there should be; in this case a stock version would be of questionable utility). What we actually have looks very much like this:

class Message : IValueTaskSource {
    public ValueTask Send()
    {
        /* how the message is sent is irrelevant */
        return new ValueTask(this, 0);
    }

    private Action<object> continuation;
    private object continuationState;

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short _, ValueTaskSourceOnCompletedFlags __)
    {
         lock(this) {
              if (GetStatus(_) == ValueTaskSourceStatus.Pending)
              {
                  this.continuation = continuation;
                  this.continuationState = state;
                  return;
              }
              continuation(state); /* Suspect */
         }
    }

    public void SetCompleted()
    {
        lock (this)
        {
             /* set state completed omitted for brevity */
             continuation?.Invoke(continuationState); /* Suspect */             
        }
    }
}

I think I'm doing this wrong. Imagine a large chain of these; it seems like it would build up too much stack. In particular, the lines marked /* Suspect */ are exactly that, and ValueTaskSourceOnCompletedFlags is unused. It does have one nicety: an exception thrown by continuation always goes somewhere, assuming that's even a real issue.

Right now the code works because there are only three of them, and the continuations that use them don't care which thread they run on.


Solution

  • Based on the link to ManualResetValueTaskSource provided by Stephen Cleary and the corresponding source code I was able to produce an answer.

    ManualResetValueTaskSourceCore<T> provides a complete implementation of the logic behind IValueTaskSource and IValueTaskSource<T>. There is currently no void version, so the void case is built by using a dummy type for T. There's some general debate on whether bool or object is the best dummy type, but I think it doesn't really matter because member padding of T will force alignment anyway.

    So the answer is to forward all the methods.

        public ValueTask Send()
        {
            /* how the message is sent is irrelevant */
            return CreateValueTask();
        }
    
        // Mutable struct: keep it in a non-readonly field so calls act on the field, not on a copy.
        private ManualResetValueTaskSourceCore<object> taskSource;
    
        private ValueTask CreateValueTask() => new ValueTask(this, taskSource.Version);
        public ValueTaskSourceStatus GetStatus(short version) => taskSource.GetStatus(version);
        public void OnCompleted(Action<object> continuation, object state, short version, ValueTaskSourceOnCompletedFlags flags) => taskSource.OnCompleted(continuation, state, version, flags);
        // IValueTaskSource also requires GetResult; the awaiter calls it once the operation completes.
        public void GetResult(short version) => taskSource.GetResult(version);
        public void SetCompleted() => taskSource.SetResult(null);
    

    In this case each message is in its own object so there's no pooling. Doesn't matter. Calling the existing implementation is so much easier than trying to write down the smallest correct implementation that it's still the better way.
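
    For reference, here is a minimal sketch of how the forwarding approach might look as a complete class. The RunContinuationsAsynchronously setting is my own assumption, not something the original code used: it asks the core to queue the continuation rather than run it inline inside SetResult, which is one way to address the stack build-up worry from the question. The Demo class and its worker thread are hypothetical and only there to exercise the code.

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Sources;

    public sealed class Message : IValueTaskSource
    {
        // Mutable struct, so the field must not be readonly.
        private ManualResetValueTaskSourceCore<object> taskSource;

        public Message()
        {
            // Assumption: we prefer queued continuations over inline invocation.
            taskSource.RunContinuationsAsynchronously = true;
        }

        public ValueTask Send()
        {
            /* how the message is sent is irrelevant */
            return new ValueTask(this, taskSource.Version);
        }

        // Called by the other thread when it has finished processing the message.
        public void SetCompleted() => taskSource.SetResult(null);

        // Forward every interface member to the core.
        ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => taskSource.GetStatus(token);

        void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
            => taskSource.OnCompleted(continuation, state, token, flags);

        void IValueTaskSource.GetResult(short token) => taskSource.GetResult(token);
    }

    public static class Demo
    {
        // Hypothetical driver: a plain thread stands in for the pre-existing worker.
        public static async Task<bool> RunAsync()
        {
            var message = new Message();
            ValueTask pending = message.Send();

            var worker = new Thread(() =>
            {
                Thread.Sleep(50);       // pretend to process the message
                message.SetCompleted(); // signal the awaiting caller
            });
            worker.Start();

            await pending; // await the ValueTask exactly once
            worker.Join();
            return true;
        }
    }
    ```

    The flags argument is passed straight through, so the core decides whether to capture the execution context and synchronization context; nothing in Message has to interpret it.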

    I'm pretty sure if I were pooling value task sources the correct way would be to call Reset() inside CreateValueTask().
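
    A rough sketch of that pooled variant, under the assumption that each source serves only one operation at a time and that the previous ValueTask has been fully awaited before the source is reused. ReusableSignal, Begin and PoolDemo are hypothetical names for illustration only.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Sources;

    public sealed class ReusableSignal : IValueTaskSource
    {
        private ManualResetValueTaskSourceCore<object> core;

        // Assumption: the caller finished awaiting the previous ValueTask before calling Begin again.
        public ValueTask Begin()
        {
            core.Reset(); // clears the previous completion and advances Version
            return new ValueTask(this, core.Version);
        }

        public void SetCompleted() => core.SetResult(null);

        ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => core.GetStatus(token);

        void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
            => core.OnCompleted(continuation, state, token, flags);

        void IValueTaskSource.GetResult(short token) => core.GetResult(token);
    }

    public static class PoolDemo
    {
        // Reuses one source for three sequential operations and counts the completed awaits.
        public static async Task<int> RunAsync()
        {
            var signal = new ReusableSignal();
            int completed = 0;
            for (int i = 0; i < 3; i++)
            {
                ValueTask pending = signal.Begin();
                signal.SetCompleted(); // completed before the await, so the await finishes synchronously
                await pending;
                completed++;
            }
            return completed;
        }
    }
    ```

    Because the ValueTask carries the version it was created with, a stale ValueTask from an earlier round no longer matches the core's token after Reset(), which is the point of passing Version rather than a constant 0.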