rxjs, rxjs-pipeable-operators

Unexpected behavior with rxjs nested observables, window, and scan


I want to display partial results of an analysis as the data comes in. It would be very inefficient to recompute for each new value (as with 'scan'). However, in this case, I can do the analysis on chunks of the data and combine those results. So I've been using 'window' to break up the data and then 'scan' to combine the results of each window calculation. The result is itself an observable, so it would be very natural to emit that as a nested observable. Also, the next step in the process works really well when consuming observables.

However, I couldn't get this to work as I expected. (I did make it work with an awkward step of turning the inner observables into arrays and later back into observables.) It seems there is something I don't understand about "window" and/or "scan".

Here are two examples that differ in how I produce the nested observable. I'd have expected the following two examples to give the same result, but they do not.

In the first, I create the nested observable directly. In the second, I create it with the window operation. Then, in both cases, I apply the same scan to the nested observable.

This behaves as I expected:

rxjs.from([rxjs.from([1, 2]), rxjs.from([3, 4])])
    .pipe(
        ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
    ).subscribe(win => win.subscribe(
        x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
        e => console.log("outer error"), () => console.log("outer |"))

With each emitted observable, I see the accumulation of the values of the previous one followed by the new ones. 1 2 | 1 2 3 4 |

I expected this next one to produce the same result, but it doesn't:

rxjs.from([1, 2, 3, 4])
    .pipe(
        ops.windowCount(2),
        ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
    ).subscribe(win => win.subscribe(x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
        e => console.log("outer error"), () => console.log("outer|"))

It seems to effectively ignore the scan operation and emits the original windows, 1 2 | 3 4 |

What am I missing? What would a conventional solution to this look like? Thanks!


Solution

  • windowCount uses a Subject internally. For each window it creates and emits a Subject, then pushes that window's values into it (1 and 2 for the first window). In the first scan iteration you subscribe to this Subject before 1 and 2 are sent, so you receive them. In later iterations you subscribe again after 1 and 2 have already been emitted, and because a Subject does not replay past values to late subscribers, you won't receive them again.

    Kind of like:

    const { Subject, merge, from } = rxjs
    
    const window1 = new Subject()
    const scanResult1 = merge(from([]), window1)
    scanResult1.subscribe(console.log)
    window1.next(1)
    window1.next(2)
    
    console.log('|')
    
    const window2 = new Subject()
    const scanResult2 = merge(scanResult1, window2)
    scanResult2.subscribe(console.log)
    window2.next(3)
    window2.next(4)
    <script src="https://unpkg.com/@reactivex/rxjs@6.5.5/dist/global/rxjs.umd.js"></script>
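    To isolate the late-subscriber effect without the library, here is a minimal sketch. TinySubject is a hypothetical stand-in written for this illustration, not the RxJS Subject class; it only models "forward each value to whoever is subscribed right now".

```javascript
// Hypothetical stand-in for a hot subject (NOT the RxJS Subject class).
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    // Only observers registered at this moment receive the value.
    for (const observer of this.observers) observer(value);
  }
}

const early = [];
const late = [];

const windowSubject = new TinySubject();
windowSubject.subscribe(x => early.push(x)); // subscribed before the values arrive
windowSubject.next(1);
windowSubject.next(2);
windowSubject.subscribe(x => late.push(x));  // subscribed after the values were sent

console.log(early); // [ 1, 2 ]
console.log(late);  // []
```

    The late subscriber plays the role of the second scan iteration: it subscribes to the first window again, but that window has nothing left to deliver.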

    Using bufferCount

    You can simply replace windowCount with bufferCount so that scan receives an array instead of a Subject. The code in scan can stay the same, since merge also accepts arrays, but you should use concat instead of merge if you want to guarantee that the values are emitted in the order they came in.

    rxjs.from([1, 2, 3, 4])
      .pipe(
        ops.bufferCount(2),
        ops.scan((acc, curr) => rxjs.concat(acc, curr), rxjs.from([]))
      ).subscribe(
        win => win.subscribe(
          x => console.log(JSON.stringify(x)), 
          e => console.log("error"), () => console.log("|")
        ),
        e => console.log("outer error"),
        () => console.log("outer|")
      )
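    For comparison, the same chunk-then-accumulate idea can be sketched on plain arrays. The helpers chunk and runningConcat are hypothetical names introduced for this illustration; they are not part of RxJS and only mirror what bufferCount followed by scan with concat produces for a finite input.

```javascript
// Split a list into consecutive chunks of `size` items (the last chunk may be shorter).
function chunk(values, size) {
  const chunks = [];
  for (let i = 0; i < values.length; i += size) {
    chunks.push(values.slice(i, i + size));
  }
  return chunks;
}

// Running accumulation: each result is all previous chunks followed by the current one.
function runningConcat(chunks) {
  const results = [];
  let acc = [];
  for (const current of chunks) {
    acc = acc.concat(current);
    results.push(acc);
  }
  return results;
}

const partials = runningConcat(chunk([1, 2, 3, 4], 2));
console.log(JSON.stringify(partials)); // [[1,2],[1,2,3,4]]
```

    Each inner array corresponds to one emitted inner observable, matching the 1 2 | 1 2 3 4 | output from the question.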
    

    Using windowCount

    You can add shareReplay to your windows so that their values are replayed to future subscribers. Because windowCount emits an empty window at the end when the number of source values is divisible by the window size, you should map to your merged observable only when the current window isn't empty. Otherwise you'll get the final result twice.

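    const { from, merge, EMPTY } = rxjs
    const { windowCount, scan, shareReplay, isEmpty, switchMap } = rxjs.operators

    from([1, 2, 3, 4]).pipe(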
      windowCount(2),
      scan((acc, curr) => {
        const shared = curr.pipe(shareReplay())
        return shared.pipe(
          isEmpty(),
          switchMap(empty => empty ? EMPTY : merge(acc, shared))
        )
      }, from([]))
    )
    

    Or, alternatively, filter out the empty window before it reaches scan:

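    const { from, merge } = rxjs
    const { windowCount, map, concatMap, isEmpty, filter, mapTo, scan, shareReplay } = rxjs.operators

    from([1, 2, 3, 4]).pipe(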
      windowCount(2),
      map(w => w.pipe(shareReplay())),
      concatMap(w => w.pipe(isEmpty(), filter(e => !e), mapTo(w))),
      scan((acc, curr) => merge(acc, curr), from([]))
    )
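
    Why replaying helps can be shown with a small library-free sketch. TinyReplaySubject is a hypothetical class written for this illustration, not the RxJS ReplaySubject or shareReplay implementation; it only assumes that a replaying source hands its buffered values to every new subscriber before forwarding live ones.

```javascript
// Hypothetical replaying subject (NOT the RxJS implementation).
class TinyReplaySubject {
  constructor() {
    this.buffer = [];
    this.observers = [];
  }
  subscribe(observer) {
    // Replay everything seen so far, then register for live values.
    for (const value of this.buffer) observer(value);
    this.observers.push(observer);
  }
  next(value) {
    this.buffer.push(value);
    for (const observer of this.observers) observer(value);
  }
}

// The first window has already delivered its values.
const window1 = new TinyReplaySubject();
window1.next(1);
window1.next(2);

// Second accumulation step: subscribe to the old window and the new one,
// roughly what merge(acc, curr) does when it is subscribed.
const window2 = new TinyReplaySubject();
const seen = [];
window1.subscribe(x => seen.push(x)); // late, but the buffer is replayed
window2.subscribe(x => seen.push(x));
window2.next(3);
window2.next(4);

console.log(seen); // [ 1, 2, 3, 4 ]
```

    With replay in place, the late subscription to the first window still yields 1 and 2, so the second accumulated result contains all four values.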