Search code examples
javascriptrxjsreactive-extensions-js

Turning paginated requests into an Observable stream with RxJs


I have a service which returns data in pages. The response to one page contains details on how to query for the next page.

My approach is to return the response data and then immediately concat a deferred call to the same observable sequence if there are more pages available.

function getPageFromServer(index) {
  // return dummy data for testcase
  return {nextpage:index+1, data:[1,2,3]};
}

function getPagedItems(index) {
  return Observable.return(getPageFromServer(index))
    .flatMap(function(response) {
      if (response.nextpage !== null) {
        return Observable.fromArray(response.data)
          .concat(Observable.defer(function() {return getPagedItems(response.nextpage);}));
      }

      return Observable.fromArray(response.data);
    });
}

getPagedItems(0).subscribe(
  function(item) {
    console.log(new Date(), item);
  },
  function(error) {
    console.log(error);
  }
)

This must be the wrong approach, because within 2 seconds you get:

      throw e;
            ^
RangeError: Maximum call stack size exceeded
    at CompositeDisposablePrototype.dispose (/Users/me/node_modules/rx/dist/rx.all.js:654:51)

What is the correct approach to pagination?


Solution

  • EDIT Ah! I see the problem you're facing. A bit of tail call optimization should fix you up:

    function mockGetPageAjaxCall(index) {
      // return dummy data for testcase
      return Promise.resolve({nextpage:index+1, data:[1,2,3]});
    }
    
    function getPageFromServer(index) {
      return Observable.create(function(obs) {
        mockGetPageAjaxCall(index).then(function(page) {
          obs.onNext(page);
        }).catch(function(err) {
          obs.onError(err)
        }).finally(function() {
          obs.onCompleted();
        });
      });
    }
    
    function getPagedItems(index) {
        return Observable.create(function(obs) {
            // create a delegate to do the work
            var disposable = new SerialDisposable();
            var recur = function(index) {
                disposable.setDisposable(getPageFromServer(index).retry().subscribe(function(page) {
                    obs.onNext(page.items);
                    if(page.nextpage === null) {
                      obs.onCompleted();   
                    }
    
                    // call the delegate recursively
                    recur(page.nextpage);
                }));
            };
    
            // call the delegate to start it
            recur(0);
    
            return disposable;
        });
    }
    
    getPagedItems(0).subscribe(
      function(item) {
        console.log(new Date(), item);
      },
      function(error) {
        console.log(error);
      }
    )