Search code examples
javascriptrxjsreactivex

Merged observable does not contain sequence even though mergees do


I've got a function that essentially takes a DOM element, looks for some identifying stuff, and (in theory) returns an Observable that contains the aforementioned identifying stuff withLatestFromed with some other stuff.

My problem is that the returned Observable doesn't emit any whatchamacallits, even though the primaries$ and highlights$ observables that it's made from do.

I'm terribly sorry if this is poorly explained, I'm new to ReactiveX/RxJS and I'm doing my best; if you need further information, ask.

function randomFunction(element) {
  // Create an observable sequence of text nodes from an array
  const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))

  // Get "highlights" by doing stuff with text nodes
  const highlights$ = textNodes$
    .map(doSomeStuff)
    .zip(textNodes$, (res, node) => ({res, node}))
    .filter(({res, node}) => res.length && node.isConnected)

  // Get "primaries" by doing stuff with "highlights"
  const primaries$ = highlights$
    .map(x => x.res)
    .flatMap(x => x.filter(y => y.matches && y.isPrimary))
    .map(x => x.id)
    .toSet()

  // Create return observable from highlights and primaries
  const ret$ = highlights$.withLatestFrom(primaries$)

  // These work
  primaries$.subscribe(x => { console.log("primaries:", x) })
  highlights$.subscribe(x => { console.log("highlights:", x) })

  // This gives me nothing
  ret$.subscribe(x => { console.log("return:", x) })

  return ret$
}

Thank you!


Solution

  • Building on what user3743222 said in their answer, turns out I also needed to pause/buffer highlights$ when creating ret$, and wait for primaries$ to emit something before continuing. Here's how I did that using Rx.Observable.prototype.pausableBuffered

    function randomFunction(element) {
      // Create an observable sequence of text nodes from an array
      const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))
    
      // Get "highlights" by doing stuff with text nodes
      const highlights$ = textNodes$
        .map(doSomeStuff)
        .zip(textNodes$, (res, node) => ({res, node}))
        .filter(({res, node}) => res.length && node.isConnected)
        .share()
        .tap(console.log.bind(console, 'highlights:'))
    
      // Get "primaries" by doing stuff with "highlights"
      const primaries$ = highlights$
        .map(x => x.res)
        .flatMap(x => x.filter(y => y.matches && y.isPrimary))
        .map(x => x.id)
        .toSet()
        .tap(console.log.bind(console, 'primaries:'))
    
      // Observable that outputs true when primaries$ outputs anything
      const primariesExist$ = primaries$.map(() => true)
    
      // Create return observable from highlights and primaries
      return highlights$
        .pausableBuffered(primariesExist$)
        .withLatestFrom(primaries$)
    }