Tags: javascript, recursion, rxjs, observable, reactive-programming

Recursion and Observable RxJs


I am performing pagination inside an Observable stream. The pagination is implemented recursively, using a cursor and a total count.

I am able to emit every page with observer.next(searches);, but I would like to use only observables and no promises, and I cannot work out how to express this recursion with RxJS operators.

Any suggestions?

    import { Observable } from 'rxjs';
    import { mergeMap, concatAll } from 'rxjs/operators';

    const search = id =>
      new Observable(observer => { recursePages(id, observer, 0) }) // start with 0 processed

    const recursePages = (id, observer, processed, searchAfter) => {
      httpService.post(
        "http://service.com/search",
        {
          size: 50,
          ...searchAfter ? { search_after: searchAfter } : null,
          id,
        })
        .toPromise() // httpService.post returns an Observable<AxiosResponse>
        .then(res => {
          const body = res.data;
          const searches = body.data.hits.map(hit => ({ data: hit.data, cursor: hit.id }));
          observer.next(searches);
          const totalProcessed = processed + searches.length;
          if (totalProcessed < body.data.total) {
            return recursePages(id, observer, totalProcessed, searches[searches.length - 1].cursor);
          }
          observer.complete();
        })
        .catch(err => observer.error(err)) // forward request failures to the subscriber
    }

    // General Observer
    incomingMessages.pipe(
        mergeMap(msg => search(JSON.parse(msg.content.toString()))),
        concatAll(), // flatten each page array into individual results
    ).subscribe(console.log);



Solution

  • These functions recursively gather all the pages and emit them together as a single array. The pages can then be streamed one at a time with from, as shown:

    import { of, from } from 'rxjs';
    import { switchMap, map } from 'rxjs/operators';

    // break this out to clean up functions: request one page, optionally after a cursor
    const performSearch = (id, searchAfter) => {
      return httpService.post(
        "http://service.com/search",
        {
          size: 50,
          ...searchAfter ? { search_after: searchAfter } : null,
          id,
        });
    }
    
    // main recursion
    const _search = (id, processed, searchAfter) => {
      return performSearch(id, searchAfter).pipe( // get page
        switchMap(res => {
          const body = res.data;
          const searches = body.data.hits.map(hit => ({ data: hit.data, cursor: hit.id }));
          const totalProcessed = processed + searches.length;
          if (totalProcessed < body.data.total) {
            // if not done, recurse and get next page
            return _search(id, totalProcessed, searches[searches.length - 1].cursor).pipe(
              // attach recursed pages
              map(nextPages => [searches].concat(nextPages))
            );
          }
          // if we're done just return the page
          return of([searches]);
        })
      )
    }
    
    // entry point
    // once every page is collected, emit them one by one with from()
    const search = id => _search(id, 0).pipe(switchMap(pages => from(pages)))
    

If what you actually need is for each page to be emitted as soon as it is fetched, rather than after all of them have been fetched (for instance, so you can show page 1 without waiting on pages 2+), that can be done with some tweaking. Let me know.

EDIT: this version emits the pages one by one, as each one is fetched:

    import { concat, of } from 'rxjs';
    import { switchMap } from 'rxjs/operators';

    // performSearch is the same helper as in the first version
    const _search = (id, processed, searchAfter) => {
      return performSearch(id, searchAfter).pipe( // get page
        switchMap(res => {
          const body = res.data;
          const searches = body.data.hits.map(hit => ({ data: hit.data, cursor: hit.id }));
          const totalProcessed = processed + searches.length;
          if (totalProcessed < body.data.total) {
            // if not done, concat current page with recursive call for next page
            return concat(
              of(searches),
              _search(id, totalProcessed, searches[searches.length - 1].cursor)
            );
          }
          // if we're done just return the page
          return of(searches);
        })
      )
    }
    const search = id => _search(id, 0)
    

You end up with an observable structure like:

    concat(
      post$(page1),
      concat(
        post$(page2),
        concat(
          post$(page3),
          post$(page4)
        )
      )
    )
    

Because nested concat() calls flatten into a single sequence, this structure reduces to:

    concat(post$(page1), post$(page2), post$(page3), post$(page4))
    

which is what you're after: each page is emitted as soon as its request completes, and the requests run sequentially.

It also seems like expand might do the trick, as per @NickL's comment; something like:

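    import { defer, EMPTY } from 'rxjs';
    import { expand, map } from 'rxjs/operators';

    // turn one response into a page of { data, cursor } items
    const toPage = res => res.data.data.hits.map(hit => ({ data: hit.data, cursor: hit.id }));

    const search = id => defer(() => {
      let totalProcessed = 0; // fresh counter for each subscription
      return performSearch(id).pipe(
        expand(res => {
          const searches = toPage(res);
          totalProcessed += searches.length;
          if (totalProcessed < res.data.data.total) {
            // not done, keep expanding
            return performSearch(id, searches[searches.length - 1].cursor);
          }
          return EMPTY; // break with EMPTY
        }),
        map(toPage) // expand emits the raw responses; convert each to a page
      );
    });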
    

I've never used expand before and this is based on some very limited testing, but I am fairly certain it works.

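Both of these methods can be combined with the reduce (or scan) operator if you ever want to gather the results: reduce emits a single array once every page has arrived, while scan emits the accumulated array after each page.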

    search(id).pipe(reduce((all, page) => all.concat(page), []))