Search code examples
javascriptrxjsoperatorsexpandpagination

RxJs: Paginate through API recursively and find value from list


I am using rxjs v6.4.0. I am trying to paginate through an API searching for a very specific channel where name equals "development". I am using expand to recursively call the API and get new pages. The end result gives me a concatenated list of channels. Then I filter out all channels where name not equal to "development". However I am getting an error: TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

const Rx = require('rxjs')
const Rx2 = require('rxjs/operators')

const getChannel = (cursor) => {
  return this.service.getData(`${url}?cursor=${cursor || ''}`)
      .pipe(Rx2.map(resp => JSON.parse(resp.body)))
      .pipe(Rx2.expand(body => { // recurse the api until no more cursors
      return body.response_metadata && 
        body.response_metadata.next_cursor ? 
        getChannel(body.response_metadata.next_cursor) : Rx.EMPTY
    }))
    .pipe(Rx2.pluck('channels'))
    .pipe(Rx2.mergeAll()) // flattens array
    .pipe(Rx2.filter(c => {
      console.log('finding', c.name === 'development')
      return c.name === 'development'
    }))
}

Solution

  • Let me know if this is the functionality you are looking for https://codepen.io/jeremytru91/pen/wOQxbZ?editors=1111

    const {
      of,
      EMPTY
    } = rxjs;
    
    const {
      filter,
      tap,
      expand,
      take,
      pluck,
      concatAll,
      flatMap,
      first
    } = rxjs.operators;
    
    function* apiResponses() {
      yield of({channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1});
      yield of({channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor:3});
      yield of({channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2});
      yield of({channels:[{id: 4, name: 'react'}, {id: 6, name: 'devcap'}], cursor:4});
    }
    
    let gen = apiResponses();
    
    function getChannel() {
      return gen.next().value // simulate api call
        .pipe(
          expand(body => {
            return body.cursor ? gen.next().value : EMPTY
          }),
          pluck('channels'),
          flatMap(channels => {
            const filtered = channels.filter(channel => channel.name === 'devcap')
            if(filtered.length) {
              return filtered;
            }
            return EMPTY;
          }),
        first()
        );
    }
    
    
    getChannel().subscribe(data => {
      console.log('DATA!! ', data)
      }, e => {
      console.log('ERROR', e)
      throw e
    })