Search code examples
rxjsreactivex

RxJs: Pattern for observable search results


My scenario is a classic web page with a search form and a result list. I want to encapsulate the behavior of loading the results in an Observable.

Here's what I'm currently doing in TypeScript:

function loadResults(query): Observable<T[]> {}

const results = new Subject<ResultEvent<T[]>>();

const results: Observable<ResultEvent<T[]>> =
  form.valueChanges
    .distinctUntilChanged()
    .do(() => results.next(ResultEvent.pending()))
    .switchMap(query => loadResults(query))
    .subscribe({
      next: (data: T[]) => results.next(ResultEvent.present(data)),
      error: err => results.next(ResultEvent.failed(err)),
    });

The idea is that results always contains the current state of the search: either pending, present or failed. When the query changes, the result is set to pending, and when the service returns data, the result is set to present.

What I don't like about this solution is the explicit call to subscribe(). I'd rather have a simple Observable that can be subscribed an unsubscribed from (eg. in Angular with the async pipe), without creating an explicit subscribtion. The side-effects in do also seem rather hacky.

const results: Obserbable<ResultEvent<T[]>> = 
  form.valueChanges.distinctUntilChanged()
  . /* here be dragons */;

Thanks for any advice and ideas!


Solution

  • I think you want something along these lines:

    const results$ = form.valueChanges
      // This is up to you, but with user input it might make sense to
      // give it just a little bit of time before we hit the server since
      // most user input will be more than a single character.
      //.debounceTime(100)
    
      .distinctUntilChanged()
    
      // Using switchMap guarantees that the inner observable will be
      // cancelled if the input changed while we are still waiting for
      // a result. Newer is always better!
      .switchMap(query => loadResults(query)
        // If we get data, we use it.
        .map(results => ResultEvent.present(results))
    
        // We catch errors and turn them into a failure event.
        .catch(err => Observable.of(ResultEvent.failed(err)))
    
        // Whatever happens, first things first.
        .startWith(ResultEvent.pending())
      );
    

    I would also think about adding a debounceTime in there, by the way.

    Here's a snippet you can copy-paste into https://rxviz.com to see it in action (unfortunately their share link feature doesn't work anymore). Make sure to set the time window to something like 10 seconds.

    const ResultEvent = {
      pending: () => 'Pending',
      failed: err => 'Error: ' + err,
      present: data => 'Data: ' + data,
    };
    
    const loadResults = query => query === 2
      ? Rx.Observable.of(null).delay(500).switchMap(() => Rx.Observable.throw('Oops'))
      : Rx.Observable.of(42).delay(500)
    
    const input$ = Rx.Observable.timer(0, 2000).take(4);
    
    input$.switchMap(query => loadResults(query)
      .map(data => ResultEvent.present(data))
      .catch(err => Rx.Observable.of(ResultEvent.failed(err)))
      .startWith(ResultEvent.pending())
    )