
RxJs Observable with infinite scroll OR how to combine Observables


I have a table that uses infinite scroll: when the user reaches the bottom of the page, more results are loaded and appended.

At the moment I have the following code:

var currentPage = 0;
var tableContent = Rx.Observable.empty();

function getHTTPDataPageObservable(pageNumber) {
    return Rx.Observable.fromPromise($http(...));
}

function init() {
    reset();
}

function reset() {
    currentPage = 0;
    tableContent = Rx.Observable.empty();
    appendNextPage();
}

function appendNextPage() {
    if(currentPage == 0) {
        tableContent = getHTTPDataPageObservable(++currentPage)
                .map(function(page) { return page.content; });
    } else {
        tableContent = tableContent.combineLatest(
            getHTTPDataPageObservable(++currentPage)
                    .map(function(page) { return page.content; }),
            function(o1, o2) {
                return o1.concat(o2);
            }
        )
    }
}

There's one major problem:

Every time appendNextPage is called, I get a completely new Observable, which then triggers all prior HTTP calls again.

A minor problem is that this code is ugly and seems like too much for such a simple use case.

Questions:

How can I solve this problem in a nice way?

Is it possible to combine those Observables in a different way, without triggering the whole stack again and again?


Solution

  • You didn't include it, but I'll assume you have some way of detecting when the user reaches the bottom of the page: an event that you can use to trigger new loads. For the sake of this answer, I'll assume you have defined it somewhere as:

    const nextPage = fromEvent(page, 'nextpage');
    

    What you really want is to model this as a stream with one-directional flow, rather than treating the stream as a mutable object that you keep reassigning. Thus:

    const pageStream = nextPage.pipe(
      // Always trigger the first page to load
      startWith(0),

      // Load these pages asynchronously, but keep them in order.
      // pageNum is the zero-based index that concatMap passes as its second argument.
      concatMap(
        (_, pageNum) => from($http(...)).pipe(pluck('content'))
      ),

      // One option for joining the pages together
      scan((pages, p) => [...pages, p], [])
    );

    If you need reset functionality, I would suggest wrapping that whole stream in an outer stream that triggers the reset. Here resetPages is assumed to be an Observable that emits whenever the table should start over:

    resetPages.pipe(
      // Used for the "first" reset when the page first loads
      startWith(0),

      // Any time there is a reset, restart the inner stream
      switchMapTo(
        nextPage.pipe(
          startWith(0),
          concatMap(
            (_, pageNum) => from($http(...)).pipe(pluck('content'))
          ),
          scan((pages, p) => [...pages, p], [])
        )
      )
    ).subscribe(x => { /* Render page content */ });
    

    As you can see, by nesting the logic into streams, we can remove the global state that was floating around before.
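
The scan step above collects each page as its own array element, so subscribers receive an array of pages. If a single flat list of rows is preferred, as with the question's o1.concat(o2), the accumulator could concatenate instead. The following is a minimal, dependency-free sketch of such an accumulator. The names appendRows and scanArray and the sample rows are hypothetical and only for illustration; scanArray merely imitates how scan emits every intermediate result, so that the accumulator can be checked without RxJS.

```javascript
// Hypothetical accumulator: merges one page of rows into the running list.
// It returns a new array, so earlier emissions are never mutated.
function appendRows(rows, page) {
  return rows.concat(page);
}

// Hypothetical stand-in for scan(): collects every intermediate accumulation,
// which corresponds to what a subscriber would receive after each page.
function scanArray(values, accumulator, seed) {
  const emissions = [];
  let state = seed;
  for (const value of values) {
    state = accumulator(state, value);
    emissions.push(state);
  }
  return emissions;
}

// Made-up sample pages, each already reduced to its `content` array.
const pages = [["row 1", "row 2"], ["row 3"], ["row 4", "row 5"]];

const emissions = scanArray(pages, appendRows, []);

// Log the flat list as it would look after the last page has loaded.
console.log(emissions[emissions.length - 1]);
```

In the pipeline itself, this accumulator would presumably be passed as scan(appendRows, []) in place of the spread-based version, assuming each page's content is an array.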