Search code examples
.netexceptionsystem.reactiveobserver-pattern

Continue using subscription after exception


I was trying out a "type-to-search" Reactive-Extensions sample which takes a string from a textbox (WPF app if it matters) and does a potentially-lengthy server-side search (simulated in my case with a Thread.Sleep) and displays the result(s) in a listbox.

The key features of this particular "module" would be :

  • async search; UI is not frozen while searching
  • throttled; typing fast would not generate a server-side search for each keystroke
  • distinct searching; after searching for "asd" and having the result(s) displayed typing fast [backspace], d (i.e. deleting the last character and retyping it quick) would not redo the server-side search
  • drop-intermediary-results; if I type "asd" and briefly wait (causing the server-side search to be launched) and then, BEFORE the results for asd are displayed finish the search string those intermediary/particular results are dropped

the issue is that after a single exception from the heavy method (the one doing the 'server-side search') the subscription is terminated and cannot be used

So far I only found out a workaround by resubscriping to the IObservable object but this feels wrong. I have also tried .Retry() but although I get to reuse the subscription my OnError handler does not get called anymore.

The code looks like this :

    private IObservable<object> _resultsFromTypeToSearch;

    private void SetupObserver()
    {
        var throttledUserInput =
            (from evt in Observable.FromEventPattern<TextChangedEventArgs>(TxtSearch, "TextChanged")
             select ((TextBox)evt.Sender).Text)
                .Throttle(TimeSpan.FromSeconds(0.6)) // this ensures that only after 0.6 seconds the user input is taken into consideration
                .DistinctUntilChanged();             // this ensures only changed input is taken into consideration

        var getDataFunc = new Func<string, object>(GetExpensiveData);
        var searchAsync = Observable.FromAsyncPattern<string, object>(getDataFunc.BeginInvoke, getDataFunc.EndInvoke);

        var z = from text in throttledUserInput
                from word in searchAsync(text).TakeUntil(throttledUserInput)
                select word;  // TakeUntil will drop an ongoing search if a new search is requested in the meantime
        _resultsFromTypeToSearch = z.ObserveOn(TxtSearch); // this ensures that we'll get notified on the UI thread
        _resultsFromTypeToSearch.Subscribe(PresentResults, OnError);
    }

    private void OnError(Exception obj)
    {
        ClearUI();
        MessageBox.Show("Error");

        _resultsFromTypeToSearch.Subscribe(PresentResults, OnError); // THIS IS MY WORKAROUND WHICH FEELS BAD
    }

    private void ClearUI()
    {
        IsBusy = false;
        Results.Clear();
    }

    private void PresentResults(object result)
    {
        ClearUI();
        Results.Add(result.ToString());
    }

    private object GetExpensiveData(string searchString)
    {
        IsBusy = true;
        if (DateTime.Now.Millisecond % 3 == 0) throw new ServerException();
        Thread.Sleep(2000);
        return "Data for " + searchString;
    }

Any better way to do this?


Solution

  • I was misunderstanding Rx. A sequence once it gets faulted it can not be reused. The simple solution is to recreate the subscription(s), i.e.: call again SetupObserver()