angular typescript rxjs polling rxjs5

RxJS Observable with Subject, polling via timer and combineLatest does not fire


I wrote a function that polls an API and also supports pagination. Pagination is driven by a Subject, and polling is done with the timer method (I also tried interval, with the same result).

Here is my code:

  getItems(pagination: Subject<Pagination>): Observable<ListResult<Item>> {
    let params: URLSearchParams = new URLSearchParams();

    return Observable
      .timer(0, 5000)
      .combineLatest(
        pagination,
        (timer, pagination) => pagination
      )
      .startWith({offset: 0, limit: 3})
      .switchMap(pagination => {
        params.set('skip', pagination.offset.toString());
        params.set('limit', pagination.limit.toString());
        return this.authHttp.get(`${environment.apiBase}/items`, {search: params})
      })
      .map(response => response.json() as ListResult<Item>)
      .catch(this.handleError);
  }

The expected behavior would be: HTTP request is fired every 5 seconds AND when the user changes the page.

This is what happens: First HTTP request is fired, but then no other request is sent to the server UNTIL pagination is used. After pagination is used the first time, the polling starts working too.

It's the first time I'm using Observables so I'm pretty sure I missed something, but I can't see what it might be.

I also tried this approach (maybe it was missing the timer counter in startWith), but it didn't change anything.

[...]
  .combineLatest(
    pagination
  )
  .startWith([0, {offset: 0, limit: 3}])
[...]

Solution

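  • The combineLatest() operator emits only after every source Observable has emitted at least one item.

    Your code makes exactly one request, and that request comes from .startWith(), not from combineLatest(). The combineLatest() emits nothing until pagination emits, and because pagination is a plain Subject, that only happens once the user changes the page. From then on every timer tick is combined with the latest pagination value, which is why polling appears to start working after the first page change.

    So one option is to move .startWith() onto pagination, so that it has an initial value as soon as you subscribe: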

    .combineLatest(
      pagination.startWith({offset: 0, limit: 3}),
      (timer, pagination) => pagination
    )
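
To make the difference visible without running a timer, here is a small hand-written model of the combineLatest() rule described above. It is only a sketch and not the RxJS implementation: `combineLatestModel`, `SourceEvent` and the sample timeline are names I made up for illustration, and the model simply replays a list of events in order.

```typescript
// Hand-rolled model of combineLatest semantics; NOT the RxJS implementation.
interface Pagination { offset: number; limit: number; }

// One event on a shared timeline: a timer tick or a page change.
type SourceEvent =
  | { source: 'timer'; value: number }
  | { source: 'pagination'; value: Pagination };

// Replays the events in order and returns what
// combineLatest(timer, pagination, (t, p) => p) would emit:
// nothing until both sources have produced at least one value.
function combineLatestModel(events: SourceEvent[]): Pagination[] {
  let lastTick: number | undefined;
  let lastPage: Pagination | undefined;
  const emitted: Pagination[] = [];
  for (const e of events) {
    if (e.source === 'timer') {
      lastTick = e.value;
    } else {
      lastPage = e.value;
    }
    if (lastTick !== undefined && lastPage !== undefined) {
      emitted.push(lastPage);
    }
  }
  return emitted;
}

const firstPage: Pagination = { offset: 0, limit: 3 };
const secondPage: Pagination = { offset: 3, limit: 3 };

// Two ticks, then the user paginates, then one more tick.
const timeline: SourceEvent[] = [
  { source: 'timer', value: 0 },
  { source: 'timer', value: 1 },
  { source: 'pagination', value: secondPage },
  { source: 'timer', value: 2 },
];

// Unseeded Subject: the first two ticks produce nothing.
const unseeded = combineLatestModel(timeline);

// Seeded via startWith: the initial page arrives before the first tick.
const seeded = combineLatestModel([
  { source: 'pagination', value: firstPage },
  ...timeline,
]);

console.log(unseeded.map(p => p.offset)); // offsets 3, 3
console.log(seeded.map(p => p.offset));   // offsets 0, 0, 3, 3
```

In the unseeded run the two ticks before the page change are lost, which matches what you observed. With the seed in place, every tick yields a value right from the start.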
    

    But this may not help you much, because you ignore every value coming from timer() and use only pagination. Another option is merge(), which refreshes the list whenever either source emits and does not wait for the other one. In the example below, timer() increases the offset on its own, independently of pagination.

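    Observable
      .timer(0, 1000)
      // Each tick advances the offset by one page of 3 items.
      .map(i => ({offset: i * 3, limit: 3}))
      // Emit whenever either the timer or the pagination Subject emits.
      .merge(pagination.startWith({offset: 0, limit: 3}))
      // Stand-in for the HTTP call; switchMap drops a pending inner Observable on each new value.
      .switchMap(pagination => {
        return Observable.of(pagination).delay(200)
      })
      .subscribe(val => console.log(val));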
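
One thing to be aware of with the merge() variant is that the timer-derived offset knows nothing about the page the user picked. The sketch below models only the merge() step (not the switchMap() or the delay) by replaying a list of events. `mergeModel`, `MergeEvent` and the sample events are hypothetical names of mine, not RxJS APIs.

```typescript
// Hand-rolled model of the merge() step only; NOT the RxJS implementation.
interface Pagination { offset: number; limit: number; }

// One event on a shared timeline: a timer tick or a value from pagination.
type MergeEvent =
  | { source: 'timer'; tick: number }
  | { source: 'pagination'; page: Pagination };

const LIMIT = 3;

// merge() forwards every value as soon as either source produces it.
// Timer ticks are first mapped to a pagination object, as in the snippet above.
function mergeModel(events: MergeEvent[]): Pagination[] {
  return events.map(e =>
    e.source === 'timer'
      ? { offset: e.tick * LIMIT, limit: LIMIT }
      : e.page
  );
}

const merged = mergeModel([
  { source: 'pagination', page: { offset: 0, limit: LIMIT } },  // startWith seed
  { source: 'timer', tick: 0 },
  { source: 'timer', tick: 1 },
  { source: 'pagination', page: { offset: 12, limit: LIMIT } }, // user jumps ahead
  { source: 'timer', tick: 2 },
]);

console.log(merged.map(p => p.offset)); // offsets 0, 0, 3, 12, 6
```

The last value shows the catch: after the user jumps to offset 12, the next tick moves the list to offset 6. If you want polling to re-request the page the user is currently on, the combineLatest() version with the moved .startWith() is probably the better fit.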