Search code examples
c#system.reactiverx.net

Ways to replace Rx.NET Subject which representing state


I currently working on fixing bug in the following method which polls stateChecker condition while it null until it becomes true (or false due to timeout):

private static void WaitWithSubject(
   Func<bool> stateChecker,
   TimeSpan timeout,
   TimeSpan stepTime, 
   string errorMessage,
   ILifetimeInfo lifetimeInfo)
{
   (bool? IsOk, string Message) state = (IsOk: null, Message: string.Empty);
   var waitCancellation = (int)stepTime.TotalMilliseconds;
   using (var stateSubject = new Subject<(bool? IsOk, string Message)>())
   {
      using (Observable.Timer(timeout).Subscribe(it => stateSubject.OnNext((IsOk: false, Message: errorMessage))))
      using (Observable.Timer(TimeSpan.Zero, stepTime).
         Subscribe(it =>
         {
            if (stateChecker())
               stateSubject.OnNext((IsOk: true, Message: string.Empty));
         }))
      {
         using (stateSubject.Subscribe(it => state = it))
         {
            while (state.IsOk == null)
               lifetimeInfo.Canceler.ThrowIfCancellationRequested(waitCancellation);
            if (state.IsOk != true)
               throw new TimeoutException(state.Message);
            stateSubject.OnCompleted();
         }
      }
   }
}

This method occasionally generates ObjectDisposedException at following point in code on executing method OnNext :

if ( stateChecker() )
    stateSubject.OnNext( ( IsOk: true, Message: string.Empty ) );

Is there a way of totally avoid using Subject in this case in favor of something like Observable.Interval or Observable.Create?


Solution

  • It seems to me that this is what you're trying to do:

    private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo) =>
        Observable
            .Amb(
                Observable
                    .Timer(timeout)
                    .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
                Observable
                    .Timer(TimeSpan.Zero, stepTime)
                    .Where(_ => stateChecker())
                    .Select(_ => Unit.Default))
            .Take(1)
            .Wait();
    

    The key here is the Amb operator which starts the two sequences and only returns values from the first one to produce a value or an error. The Take(1) ensures that the observable finishes as soon as a value is produced.

    You can throw the following line in just before the Wait() to cancel if you have a CancellationToken:

    .TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))
    

    After a bit of back and forth with Theodor, I've come up with this version that I think is possibility the cleanest I can think of:

    private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo)
    {
        var good =
            Observable
                .Timer(TimeSpan.Zero, stepTime)
                .Where(_ => stateChecker())
                .Take(1);
    
        var fail =
            Observable
                .Timer(timeout)
                .SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));
        
        good.Merge(fail).RunAsync(lifetimeInfo.Canceler).Wait();
    }