I am currently working on fixing a bug in the following method, which polls the stateChecker condition while the state is null, until it becomes true (or false due to a timeout):
    private static void WaitWithSubject(
        Func<bool> stateChecker,
        TimeSpan timeout,
        TimeSpan stepTime,
        string errorMessage,
        ILifetimeInfo lifetimeInfo)
    {
        (bool? IsOk, string Message) state = (IsOk: null, Message: string.Empty);
        var waitCancellation = (int)stepTime.TotalMilliseconds;
        using (var stateSubject = new Subject<(bool? IsOk, string Message)>())
        {
            using (Observable.Timer(timeout).Subscribe(it => stateSubject.OnNext((IsOk: false, Message: errorMessage))))
            using (Observable.Timer(TimeSpan.Zero, stepTime).
                Subscribe(it =>
                {
                    if (stateChecker())
                        stateSubject.OnNext((IsOk: true, Message: string.Empty));
                }))
            {
                using (stateSubject.Subscribe(it => state = it))
                {
                    while (state.IsOk == null)
                        lifetimeInfo.Canceler.ThrowIfCancellationRequested(waitCancellation);
                    if (state.IsOk != true)
                        throw new TimeoutException(state.Message);
                    stateSubject.OnCompleted();
                }
            }
        }
    }
This method occasionally throws an ObjectDisposedException at the following point in the code, when OnNext is called:

    if (stateChecker())
        stateSubject.OnNext((IsOk: true, Message: string.Empty));
Is there a way to avoid using Subject entirely in this case, in favor of something like Observable.Interval or Observable.Create?
It seems to me that this is what you're trying to do:
    private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo) =>
        Observable
            .Amb(
                Observable
                    .Timer(timeout)
                    .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
                Observable
                    .Timer(TimeSpan.Zero, stepTime)
                    .Where(_ => stateChecker())
                    .Select(_ => Unit.Default))
            .Take(1)
            .Wait();
The key here is the Amb operator, which subscribes to both sequences and forwards only the one that is first to produce a value or an error. The Take(1) ensures that the observable finishes as soon as a value is produced.
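To see both outcomes of the Amb race, here is a minimal console sketch. It assumes the System.Reactive NuGet package is referenced; AmbDemo and WaitFor are hypothetical names, and the ILifetimeInfo parameter is dropped because its definition isn't shown in the question.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class AmbDemo
{
    // Hypothetical helper: the same pipeline as above, without ILifetimeInfo.
    public static void WaitFor(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage) =>
        Observable
            .Amb(
                // Loses the race unless the timeout elapses first; then it signals an error.
                Observable
                    .Timer(timeout)
                    .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
                // Polls immediately and then every stepTime; emits once the check passes.
                Observable
                    .Timer(TimeSpan.Zero, stepTime)
                    .Where(_ => stateChecker())
                    .Select(_ => Unit.Default))
            .Take(1)
            .Wait(); // blocks; rethrows the TimeoutException if the timeout branch wins

    public static void Main()
    {
        var start = DateTime.UtcNow;

        // The condition turns true after roughly 300 ms, well inside the 2 s timeout.
        WaitFor(
            () => DateTime.UtcNow - start > TimeSpan.FromMilliseconds(300),
            TimeSpan.FromSeconds(2),
            TimeSpan.FromMilliseconds(50),
            "timed out");
        Console.WriteLine("condition met");

        try
        {
            // The condition never turns true, so the timeout branch wins.
            WaitFor(() => false, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(50), "timed out");
        }
        catch (TimeoutException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

Because no Subject is shared between the timers and the waiting code, there is nothing left to dispose while a timer callback is still calling OnNext.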
You can add the following line just before the Wait() to cancel if you have a CancellationToken:

    .TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))
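For context, this is roughly where that line sits in the full pipeline. It is only a sketch, assuming the System.Reactive package and a plain CancellationToken in place of lifetimeInfo.Canceler; CancellableWaitDemo and WaitFor are hypothetical names. One thing to be aware of: when the token fires, TakeUntil completes the sequence without a value, and Wait() is documented to throw InvalidOperationException for an empty sequence, so the caller sees that exception rather than an OperationCanceledException.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public static class CancellableWaitDemo
{
    // Hypothetical helper: the Amb pipeline plus the TakeUntil cancellation line.
    public static void WaitFor(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, CancellationToken ct) =>
        Observable
            .Amb(
                Observable
                    .Timer(timeout)
                    .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
                Observable
                    .Timer(TimeSpan.Zero, stepTime)
                    .Where(_ => stateChecker())
                    .Select(_ => Unit.Default))
            .Take(1)
            // Emits once the token is cancelled, which ends the sequence above.
            .TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))
            .Wait();

    public static void Main()
    {
        // Cancel after about 200 ms, long before the 5 s timeout.
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)))
        {
            try
            {
                WaitFor(() => false, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(50), "timed out", cts.Token);
            }
            catch (InvalidOperationException)
            {
                // Wait() found an empty sequence because cancellation ended it early.
                Console.WriteLine("cancelled before the condition was met");
            }
        }
    }
}
```

If the caller should see an OperationCanceledException instead, one option is to catch the InvalidOperationException and call ct.ThrowIfCancellationRequested() in the handler.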
After a bit of back and forth with Theodor, I've come up with this version, which is possibly the cleanest I can think of:
    private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo)
    {
        var good =
            Observable
                .Timer(TimeSpan.Zero, stepTime)
                .Where(_ => stateChecker())
                .Take(1);
        var fail =
            Observable
                .Timer(timeout)
                .SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));
        good.Merge(fail).RunAsync(lifetimeInfo.Canceler).Wait();
    }