c# .net system.reactive reactivex rx.net

In Rx.NET, how do I make a Subject behave like TaskCompletionSource?


In Rx.NET, how do I make a Subject behave like TaskCompletionSource.Task?

It needs to cache and replay the first event, even after the subject has completed. Neither AsyncSubject nor ReplaySubject(bufferSize: 1) would do that.

For example (let's call it PromiseSubject):

//var subj = new ReplaySubject<int>(bufferSize: 1);
var subj = new PromiseSubject<int>();

subj.Subscribe(i => Console.WriteLine(i));

subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
subj.OnCompleted();

subj.Subscribe(i => Console.WriteLine(i));

Console.ReadLine();

Expected output:

1
1
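
For comparison, here is a minimal sketch of what the two built-in subjects mentioned above do with the same sequence. It assumes the System.Reactive package and C# top-level statements; the Run helper is a hypothetical name introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical helper: feeds the question's sequence into a subject and
// records every value seen by an early and a late subscriber.
List<int> Run(ISubject<int> subj)
{
    var seen = new List<int>();
    subj.Subscribe(x => seen.Add(x));   // early subscriber
    subj.OnNext(1);
    subj.OnNext(2);
    subj.OnNext(3);
    subj.OnCompleted();
    subj.Subscribe(x => seen.Add(x));   // late subscriber, after completion
    return seen;
}

var replayLog = Run(new ReplaySubject<int>(bufferSize: 1));
var asyncLog = Run(new AsyncSubject<int>());

Console.WriteLine(string.Join(", ", replayLog)); // 1, 2, 3, 3
Console.WriteLine(string.Join(", ", asyncLog));  // 3, 3
```

ReplaySubject passes every value to the early subscriber and replays only the last one to the late subscriber, while AsyncSubject publishes only the last value once it completes. Neither yields the desired 1, 1.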

I can possibly cook it up using TaskCompletionSource, TaskObservableExtensions.ToObservable and a custom SubjectBase-derived subject implementation, but is there an elegant way of doing it using a composition of Rx operators?

Update: my initial attempt using TaskCompletionSource:

public class PromiseSubject<T> : ISubject<T>
{
    private readonly TaskCompletionSource<(bool HasValue, T Value)> _tcs;
    private readonly IObservable<T> _observable;

    public PromiseSubject()
    {
        _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
        _observable = _tcs.Task.ToObservable()
            .Where(r => r.HasValue).Select(r => r.Value!);
    }

    public void OnCompleted() =>
        _tcs.TrySetResult((false, default!));

    public void OnError(Exception error) =>
        _tcs.TrySetException(error);

    public void OnNext(T value) =>
        _tcs.TrySetResult((true, value));

    public IDisposable Subscribe(IObserver<T> observer) =>
        _observable.Subscribe(observer);
}

Solution

  • This is a simplified version of Jeff Mercado's answer. I think that the desired behavior can be achieved simply by completing a ReplaySubject(bufferSize: 1) after the first OnNext.

    Actually an AsyncSubject<T>, as pointed out by @noseratio in a comment, is even simpler, and also slightly more efficient because it stores its single value in a field instead of an array.

    public class WriteOnceSubject<T> : ISubject<T>
    {
        private readonly AsyncSubject<T> subject = new();
    
        public void OnNext(T value) { subject.OnNext(value); subject.OnCompleted(); }
        public void OnError(Exception error) => subject.OnError(error);
        public void OnCompleted() => subject.OnCompleted();
    
        public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);
    }
    

    So in this sequence of events:

    writeOnceSubject.OnNext(1);
    writeOnceSubject.OnNext(2);
    writeOnceSubject.OnNext(3);
    writeOnceSubject.OnCompleted();
    writeOnceSubject.OnError(new Exception());
    

    ...all calls except the first are no-ops. When the writeOnceSubject is subscribed to later, it emits the stored value 1, followed by an OnCompleted notification.
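
The question also asked whether this can be done by composing existing operators instead of writing a class. One possible sketch, assuming the System.Reactive package and C# top-level statements, combines a plain Subject, Take(1), an AsyncSubject and Subject.Create. The variable names input, output and promise are illustrative only, and this is not necessarily how the answer referenced above did it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Input side: receives the OnNext/OnError/OnCompleted calls.
var input = new Subject<int>();
// Output side: caches a single value and replays it to late subscribers.
var output = new AsyncSubject<int>();

// Take(1) forwards the first value and then completes, so `output` is
// completed right after the first OnNext; later notifications never reach it.
input.Take(1).Subscribe(output);

// Subject.Create combines an observer and an observable into one ISubject<T>.
ISubject<int> promise = Subject.Create<int>(input, output);

var seen = new List<int>();
promise.Subscribe(x => seen.Add(x));   // early subscriber
promise.OnNext(1);
promise.OnNext(2);
promise.OnNext(3);
promise.OnCompleted();
promise.Subscribe(x => seen.Add(x));   // late subscriber, after completion

Console.WriteLine(string.Join(", ", seen)); // 1, 1
```

If OnCompleted or OnError arrives before any value, Take(1) passes it through unchanged, so the composed subject completes empty or with the error, just as WriteOnceSubject does.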