In Rx.NET, how do I make a Subject that resembles the behavior of TaskCompletionSource.Task?
It needs to cache and replay the first event, even after it has completed. Neither AsyncSubject nor ReplaySubject(bufferSize: 1) does that on its own, since both keep the most recent value rather than the first.
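For reference, the TaskCompletionSource behavior in question can be sketched like this. It is a minimal illustration that uses only the base class library; the TcsDemo class and RunAsync method are names made up for this example.

```csharp
using System;
using System.Threading.Tasks;

public static class TcsDemo
{
    // Reports whether each TrySetResult call was accepted, plus the value
    // seen by an awaiter attached before completion and by one attached after.
    public static async Task<(bool First, bool Second, int Early, int Late)> RunAsync()
    {
        var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        Task<int> early = tcs.Task;          // obtained before any result is set

        bool first = tcs.TrySetResult(1);    // accepted: the task completes with 1
        bool second = tcs.TrySetResult(2);   // rejected: the task is already completed

        int earlyValue = await early;
        int lateValue = await tcs.Task;      // awaited after completion, still 1
        return (first, second, earlyValue, lateValue);
    }

    public static void Main()
    {
        var r = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine($"{r.First} {r.Second} {r.Early} {r.Late}"); // True False 1 1
    }
}
```

The first result wins, and every awaiter sees it no matter when it attaches. That is the behavior the subject should mirror.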
For example (let's call it PromiseSubject):
//var subj = new ReplaySubject<int>(bufferSize: 1);
var subj = new PromiseSubject<int>();
subj.Subscribe(i => Console.WriteLine(i));
subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
subj.OnCompleted();
subj.Subscribe(i => Console.WriteLine(i));
Console.ReadLine();
Expected output:
1
1
I can probably cook it up using TaskCompletionSource, TaskObservableExtensions.ToObservable and a custom SubjectBase-derived subject implementation, but is there an elegant way of doing it using a composition of Rx operators?
Update: my initial attempt via TaskCompletionSource:
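using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public class PromiseSubject<T> : ISubject<T>
{
    // HasValue is false when the subject completes before any OnNext.
    private readonly TaskCompletionSource<(bool HasValue, T Value)> _tcs;
    private readonly IObservable<T> _observable;

    public PromiseSubject()
    {
        _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
        _observable = _tcs.Task.ToObservable()
            .Where(r => r.HasValue).Select(r => r.Value!);
    }

    // Only the first OnNext/OnError/OnCompleted call takes effect;
    // the Try* methods silently ignore every later call.
    public void OnCompleted() =>
        _tcs.TrySetResult((false, default!));

    public void OnError(Exception error) =>
        _tcs.TrySetException(error);

    public void OnNext(T value) =>
        _tcs.TrySetResult((true, value));

    public IDisposable Subscribe(IObserver<T> observer) =>
        _observable.Subscribe(observer);
}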
This is a simplified version of Jeff Mercado's answer. I think that the desired behavior can be achieved simply by completing a ReplaySubject(bufferSize: 1) after the first OnNext.
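As a quick illustration of that idea, here is a minimal sketch that completes a ReplaySubject right after its first value and then subscribes late. The ReplayOnceDemo class and its Run method are made-up names for this example, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ReplayOnceDemo
{
    // Returns what a late subscriber observes when the subject was
    // completed immediately after its first value.
    public static List<string> Run()
    {
        var seen = new List<string>();
        var subject = new ReplaySubject<int>(bufferSize: 1);

        subject.OnNext(1);
        subject.OnCompleted();   // freezes the buffer at the first value
        subject.OnNext(2);       // ignored: the subject has already terminated

        // Subscribe only after all of the above has happened.
        subject.Subscribe(
            i => seen.Add($"OnNext({i})"),
            () => seen.Add("OnCompleted"));
        return seen;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", Run())); // OnNext(1), OnCompleted
}
```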
Actually an AsyncSubject<T>, as pointed out by @noseratio in a comment, is even simpler, and also slightly more efficient because it stores its single value in a field instead of an array.
using System;
using System.Reactive.Subjects;

public class WriteOnceSubject<T> : ISubject<T>
{
    private readonly AsyncSubject<T> subject = new();

    // Completing right after the first value makes it the value that AsyncSubject replays.
    public void OnNext(T value) { subject.OnNext(value); subject.OnCompleted(); }
    public void OnError(Exception error) => subject.OnError(error);
    public void OnCompleted() => subject.OnCompleted();
    public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);
}
So in this sequence of events:
writeOnceSubject.OnNext(1);
writeOnceSubject.OnNext(2);
writeOnceSubject.OnNext(3);
writeOnceSubject.OnCompleted();
writeOnceSubject.OnError(new Exception());
...all calls except the first will be no-ops. When the writeOnceSubject is subscribed to later, it will emit the stored value 1, followed by an OnCompleted notification.
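Since the question asked about a composition of Rx operators, the same first-notification-wins behavior could probably also be put together without a custom class. The following is only a sketch under that assumption: it uses Take(1) to forward the first notification into an AsyncSubject and glues the two ends together with Subject.Create. The PromiseSubjectSketch class and its Create and Run methods are hypothetical names, and nothing here is made thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PromiseSubjectSketch
{
    // Hypothetical helper: the observer side is a plain Subject, the
    // observable side is an AsyncSubject fed only by the first notification.
    public static ISubject<T> Create<T>()
    {
        var input = new Subject<T>();
        var output = new AsyncSubject<T>();
        input.Take(1).Subscribe(output);   // first value (or error/completion) wins
        return Subject.Create<T>(input, output);
    }

    // Replays the scenario from the question and records what both
    // subscribers receive.
    public static List<int> Run()
    {
        var seen = new List<int>();
        var subj = Create<int>();
        subj.Subscribe(i => seen.Add(i));  // early subscriber
        subj.OnNext(1);
        subj.OnNext(2);
        subj.OnNext(3);
        subj.OnCompleted();
        subj.Subscribe(i => seen.Add(i));  // late subscriber
        return seen;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", Run())); // 1, 1
}
```

Whether this is preferable to the small wrapper class is a matter of taste; the wrapper is arguably easier to read and allocates one subject instead of two.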