I want to display partial results of an analysis as the data comes in. It would be very inefficient to recompute for each new value (as with 'scan'). However, in this case, I can do the analysis on chunks of the data and combine those results. So I've been using 'window' to break up the data and then 'scan' to combine the results of each window calculation. The result is itself an observable, so it would be very natural to emit that as a nested observable. Also, the next step in the process works really well when consuming observables.
However, I couldn't get this to work as I expected. (I did make it work with an awkward step of turning the inner observables into arrays and later back into observables.) It seems there is something I don't understand about "window" and/or "scan".
Here are two examples that differ in how I produce the nested observable. I'd have expected the following two examples to give the same result, but they do not.
In the first, I create the nested observable directly. In the second, I create it with the window operation. Then, in both cases, I apply the same scan to the nested observable.
This behaves as I expected:
const ops = rxjs.operators // alias assumed by both examples
rxjs.from([rxjs.from([1, 2]), rxjs.from([3, 4])])
  .pipe(
    ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
  ).subscribe(
    win => win.subscribe(
      x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
    e => console.log("outer error"), () => console.log("outer |"))
With each emitted observable, I see the accumulation of the values of the previous one followed by the new ones.
1 2 | 1 2 3 4 |
I expected this next one to produce the same result, but it doesn't:
rxjs.from([1, 2, 3, 4])
  .pipe(
    ops.windowCount(2),
    ops.scan((acc, curr) => rxjs.merge(acc, curr), rxjs.from([]))
  ).subscribe(
    win => win.subscribe(
      x => console.log(JSON.stringify(x)), e => console.log("error"), () => console.log("|")),
    e => console.log("outer error"), () => console.log("outer|"))
It seems to effectively ignore the scan operation and emit the original windows:
1 2 | 3 4 |
What am I missing? What would a conventional solution to this look like? Thanks!
windowCount uses a Subject internally. It creates and emits a Subject and then sends 1 and 2 to it for the first window. In the first scan iteration you subscribe to this Subject before 1 and 2 are sent, so you receive those values. In later iterations you subscribe after 1 and 2 have already been emitted, so you won't receive them again.
Kind of like:
const { Subject, merge, from } = rxjs
const window1 = new Subject()
const scanResult1 = merge(from([]), window1)
scanResult1.subscribe(console.log)
window1.next(1)
window1.next(2)
console.log('|')
const window2 = new Subject()
const scanResult2 = merge(scanResult1, window2)
scanResult2.subscribe(console.log)
window2.next(3)
window2.next(4)
<script src="https://unpkg.com/@reactivex/rxjs@6.5.5/dist/global/rxjs.umd.js"></script>
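To isolate the timing issue from RxJS itself, here is a dependency-free sketch. ToySubject and ToyReplaySubject are hypothetical stand-ins written for this illustration, not RxJS classes, and they ignore completion, errors, and unsubscription. The second one only approximates what replaying a window's values to late subscribers does.

```javascript
// Toy stand-ins for illustration only; these are NOT RxJS classes.
class ToySubject {
  constructor() { this.observers = [] }
  subscribe(next) { this.observers.push(next) }
  next(value) { this.observers.forEach(o => o(value)) }
}

// Same as above, but remembers every value and replays it to late subscribers.
class ToyReplaySubject extends ToySubject {
  constructor() { super(); this.history = [] }
  subscribe(next) {
    this.history.forEach(v => next(v)) // replay what was already sent
    super.subscribe(next)
  }
  next(value) {
    this.history.push(value)
    super.next(value)
  }
}

// Subscribes once before and once after the values 1 and 2 are sent.
function lateSubscriberSees(subject) {
  const early = []
  const late = []
  subject.subscribe(v => early.push(v)) // like the first scan iteration
  subject.next(1)
  subject.next(2)
  subject.subscribe(v => late.push(v))  // like a later scan iteration
  return { early, late }
}

const plain = lateSubscriberSees(new ToySubject())
const replayed = lateSubscriberSees(new ToyReplaySubject())
console.log(plain)    // { early: [ 1, 2 ], late: [] }
console.log(replayed) // { early: [ 1, 2 ], late: [ 1, 2 ] }
```

The plain subject gives the late subscriber nothing, which matches the missing 1 and 2 in the second window's output. The replaying variant hands the late subscriber the earlier values as soon as it subscribes.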
Using bufferCount
You can simply replace windowCount with bufferCount to send an array to scan instead of a Subject. The code in scan can stay the same, as merge can also handle arrays, but you should use concat instead of merge if you want to guarantee that the values are emitted in the same order they come in.
rxjs.from([1, 2, 3, 4])
  .pipe(
    ops.bufferCount(2),
    ops.scan((acc, curr) => rxjs.concat(acc, curr), rxjs.from([]))
  ).subscribe(
    win => win.subscribe(
      x => console.log(JSON.stringify(x)),
      e => console.log("error"), () => console.log("|")
    ),
    e => console.log("outer error"),
    () => console.log("outer|")
  )
Using windowCount
You can add shareReplay to your windows to replay their values to future subscribers. Because windowCount emits an empty window at the end if the number of source values is divisible by the window size, you have to map to your merged observable only when the current window isn't empty. Otherwise you'll get the final result twice.
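const { from, merge, EMPTY } = rxjs
const { windowCount, scan, shareReplay, isEmpty, switchMap } = rxjs.operators

from([1, 2, 3, 4]).pipe(
  windowCount(2),
  scan((acc, curr) => {
    // replay this window's values to every later subscriber
    const shared = curr.pipe(shareReplay())
    return shared.pipe(
      isEmpty(),
      // emit nothing for the trailing empty window, otherwise merge it in
      switchMap(empty => empty ? EMPTY : merge(acc, shared))
    )
  }, from([]))
)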
Or
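const { from, merge } = rxjs
const { windowCount, map, shareReplay, concatMap, isEmpty, filter, mapTo, scan } = rxjs.operators

from([1, 2, 3, 4]).pipe(
  windowCount(2),
  map(w => w.pipe(shareReplay())),
  // drop empty windows, pass non-empty ones through unchanged
  concatMap(w => w.pipe(isEmpty(), filter(e => !e), mapTo(w))),
  scan((acc, curr) => merge(acc, curr), from([]))
)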