I was trying out a "type-to-search" Reactive-Extensions sample which takes a string from a textbox (WPF app if it matters) and does a potentially-lengthy server-side search (simulated in my case with a Thread.Sleep) and displays the result(s) in a listbox.
The key features of this particular "module" would be :
the issue is that after a single exception from the heavy method (the one doing the 'server-side search') the subscription is terminated and cannot be used
So far I only found out a workaround by resubscriping to the IObservable object but this feels wrong. I have also tried .Retry() but although I get to reuse the subscription my OnError
handler does not get called anymore.
The code looks like this :
private IObservable<object> _resultsFromTypeToSearch;
private void SetupObserver()
{
var throttledUserInput =
(from evt in Observable.FromEventPattern<TextChangedEventArgs>(TxtSearch, "TextChanged")
select ((TextBox)evt.Sender).Text)
.Throttle(TimeSpan.FromSeconds(0.6)) // this ensures that only after 0.6 seconds the user input is taken into consideration
.DistinctUntilChanged(); // this ensures only changed input is taken into consideration
var getDataFunc = new Func<string, object>(GetExpensiveData);
var searchAsync = Observable.FromAsyncPattern<string, object>(getDataFunc.BeginInvoke, getDataFunc.EndInvoke);
var z = from text in throttledUserInput
from word in searchAsync(text).TakeUntil(throttledUserInput)
select word; // TakeUntil will drop an ongoing search if a new search is requested in the meantime
_resultsFromTypeToSearch = z.ObserveOn(TxtSearch); // this ensures that we'll get notified on the UI thread
_resultsFromTypeToSearch.Subscribe(PresentResults, OnError);
}
private void OnError(Exception obj)
{
ClearUI();
MessageBox.Show("Error");
_resultsFromTypeToSearch.Subscribe(PresentResults, OnError); // THIS IS MY WORKAROUND WHICH FEELS BAD
}
private void ClearUI()
{
IsBusy = false;
Results.Clear();
}
private void PresentResults(object result)
{
ClearUI();
Results.Add(result.ToString());
}
private object GetExpensiveData(string searchString)
{
IsBusy = true;
if (DateTime.Now.Millisecond % 3 == 0) throw new ServerException();
Thread.Sleep(2000);
return "Data for " + searchString;
}
Any better way to do this?
I was misunderstanding Rx. A sequence once it gets faulted it can not be reused. The simple solution is to recreate the subscription(s), i.e.: call again SetupObserver()