I have an issue where I'm attempting to order multiple distinct streams from a time series database. Assuming that all the data in each stream is sorted by timestamp, and given the following code, how would I modify the streams dataA$ and dataB$ so that each of them emits values in timestamp order WITHOUT waiting until the entire stream has completed?
import { delayWhen, of, timer } from "rxjs";
const dataA = [{"data":"b","timestamp":6672},{"data":"c","timestamp":7404},{"data":"a","timestamp":7922},{"data":"b","timestamp":8885},{"data":"c","timestamp":9111},{"data":"a","timestamp":9245},{"data":"c","timestamp":10168},{"data":"b","timestamp":10778},{"data":"c","timestamp":11504},{"data":"a","timestamp":12398},{"data":"a","timestamp":12745},{"data":"a","timestamp":13648},{"data":"a","timestamp":14233},{"data":"a","timestamp":14943},{"data":"b","timestamp":15869},{"data":"c","timestamp":16043},{"data":"a","timestamp":16169},{"data":"a","timestamp":16242},{"data":"a","timestamp":17058},{"data":"b","timestamp":17885},{"data":"a","timestamp":18252},{"data":"a","timestamp":18711},{"data":"c","timestamp":18883},{"data":"b","timestamp":19618},{"data":"a","timestamp":20183}];
const dataB = [{"data":"b","timestamp":821},{"data":"b","timestamp":1357},{"data":"b","timestamp":2108},{"data":"b","timestamp":3001},{"data":"a","timestamp":3995},{"data":"b","timestamp":4475},{"data":"c","timestamp":5357},{"data":"c","timestamp":5373},{"data":"b","timestamp":6199},{"data":"c","timestamp":6207},{"data":"b","timestamp":6896},{"data":"b","timestamp":7410},{"data":"a","timestamp":8335},{"data":"a","timestamp":9191},{"data":"b","timestamp":10007},{"data":"b","timestamp":10703},{"data":"c","timestamp":11225},{"data":"c","timestamp":11453},{"data":"c","timestamp":12131},{"data":"c","timestamp":12599},{"data":"c","timestamp":13567},{"data":"a","timestamp":13726},{"data":"b","timestamp":14161},{"data":"b","timestamp":14224},{"data":"b","timestamp":14666}];
const dataA$ = of(dataA).pipe(
  delayWhen(() => timer(Math.random() * 5000)),
  ???
);
const dataB$ = of(dataB).pipe(
  delayWhen(() => timer(Math.random() * 5000)),
  ???
);
let lastTimestamp = -Infinity;
dataA$.subscribe(({ timestamp }) => {
  expect(timestamp > lastTimestamp).toBe(true);
  lastTimestamp = timestamp;
});
dataB$.subscribe(({ timestamp }) => {
  expect(timestamp > lastTimestamp).toBe(true);
  lastTimestamp = timestamp;
});
Follow up question: How can you extend that solution to dynamically support any number of data streams once a stream was created?
I am not sure whether an "RxJS-only" approach using only pipe operators exists. However, here is how I would do it.
To make this reproducible, I created the following setup. It just pushes the elements of the arrays into the Subjects with a delay.
import { Subject, timer } from "rxjs";

// Stand-ins for the real sources: each timer tick pushes the next array element.
const dataA$ = new Subject()
const dataB$ = new Subject()
timer(0, 4000).subscribe(t => {
  if (!dataA[0]) return
  dataA$.next(dataA[0])
  dataA.shift()
})
timer(0, 3900).subscribe(t => {
  if (!dataB[0]) return
  dataB$.next(dataB[0])
  dataB.shift()
})
Just replace dataA$ and dataB$ with your own Observables and remove the timers and subscriptions.
The resulting streams of data will be stored in two separate Observables, for which I created two Subjects.
const resultA = new Subject()
const resultB = new Subject()
It would be nice to just use dataA$ and dataB$ and control how they emit by referencing each other in their pipes, but this creates a circular dependency and I am not sure whether that is manageable.
Now we also need two buffers where the intermediate values are stored.
let aBuffer = []
let bBuffer = []
Let's subscribe to both data streams.
dataA$.subscribe(({ timestamp }) => {
  while (true) {
    // Nothing buffered from B (or the buffer was just drained): B could still
    // deliver an older value, so hold this one back.
    if (bBuffer.length === 0) {
      aBuffer.push(timestamp)
      return
    }
    let first = bBuffer[0]
    if (first > timestamp) {
      // B is already ahead, so this value is safe to emit.
      resultA.next(timestamp)
      break
    } else if (timestamp > first) {
      // B's oldest buffered value is older: emit it and check the next one.
      resultB.next(first)
      bBuffer.shift()
    } else {
      // Equal timestamps: emit both.
      resultA.next(timestamp)
      resultB.next(first)
      bBuffer.shift()
      break
    }
  }
});
dataB$.subscribe(({ timestamp }) => {
  while (true) {
    // Mirror image of the handler above, with a and b swapped.
    if (aBuffer.length === 0) {
      bBuffer.push(timestamp)
      return
    }
    let first = aBuffer[0]
    if (first > timestamp) {
      resultB.next(timestamp)
      break
    } else if (timestamp > first) {
      resultA.next(first)
      aBuffer.shift()
    } else {
      resultB.next(timestamp)
      resultA.next(first)
      aBuffer.shift()
      break
    }
  }
});
Both implementations are identical; only a and b are swapped. We basically wait for the other stream to have emitted a value. Until then, we push the values into the buffer.
When the a stream emits and the current timestamp is behind the first value of the other stream's buffer, we push the timestamp to the result. But if the buffer's first value lags behind, we use the while loop to "catch up" and emit all the lagging values in the process.
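The buffering rule described above can also be traced without RxJS. The following is only a sketch under assumptions: `createPair` and `arrive` are hypothetical names introduced for illustration, arrivals are simulated with plain function calls, and on equal timestamps the buffered value is released first.

```javascript
// Hypothetical stand-in for the two subscriptions: same buffering idea,
// driven by plain function calls so the order can be traced by hand.
function createPair(emit) {
  const buffers = { A: [], B: [] };
  return function arrive(source, timestamp) {
    const other = source === "A" ? "B" : "A";
    // "Catch up": release every buffered value of the other stream
    // that is not newer than the value that just arrived.
    while (buffers[other].length > 0 && buffers[other][0] <= timestamp) {
      emit(other, buffers[other].shift());
    }
    if (buffers[other].length > 0) {
      // The other stream is already ahead, so this value is safe to emit.
      emit(source, timestamp);
    } else {
      // The other stream might still deliver something older: hold the value.
      buffers[source].push(timestamp);
    }
  };
}

// Simulated arrivals, using timestamps from the question's data.
const arrive = createPair((source, ts) => console.log(source, ts));
arrive("A", 6672); // buffered, nothing known about B yet
arrive("B", 821);  // logs "B 821" (A's buffered 6672 is newer)
arrive("B", 6896); // logs "A 6672", then 6896 is buffered
arrive("A", 7404); // logs "B 6896", then 7404 is buffered
```

Note that the last arrival stays in its buffer until the other stream delivers a newer value, so a real implementation would also need to flush the remaining buffer once the other stream completes.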
The two subscriptions seem to work fine for the example in the question.
let lastTimestamp = -Infinity;
resultA.subscribe((r) => {
  console.log("A", r, r > lastTimestamp)
  lastTimestamp = r
})
resultB.subscribe((r) => {
  console.log("B", r, r > lastTimestamp)
  lastTimestamp = r
})
Output:
/* ... */
B 5373 true
B 6199 true
B 6207 true
A 6672 true
B 6896 true
A 7404 true
B 7410 true
/* ... */
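The follow-up question about an arbitrary number of streams is not covered by the two-buffer code. One possible generalization, offered only as a sketch, keeps one queue per source and releases the smallest head whenever every live source has something buffered. `OrderedMerger`, `addSource`, `push` and `complete` are hypothetical names, not RxJS APIs. The sketch assumes each source delivers its items sorted by timestamp, and that a source added later never delivers a timestamp older than what has already been emitted.

```javascript
// Hypothetical helper: merges any number of individually sorted sources
// into one globally ordered sequence, emitting as early as is safe.
class OrderedMerger {
  constructor(onEmit) {
    this.onEmit = onEmit;    // called with (sourceId, item) in timestamp order
    this.queues = new Map(); // sourceId -> buffered items, oldest first
    this.done = new Set();   // sources that have completed
  }

  // Register a source; may be called at any time.
  addSource(id) {
    if (!this.queues.has(id)) this.queues.set(id, []);
  }

  // Feed one item from a source.
  push(id, item) {
    this.addSource(id);
    this.queues.get(id).push(item);
    this.drain();
  }

  // Mark a source as finished so it no longer holds the others back.
  complete(id) {
    this.done.add(id);
    this.drain();
  }

  // Emit the smallest head for as long as no live source has an empty queue.
  drain() {
    for (;;) {
      let minId = null;
      for (const [id, queue] of this.queues) {
        if (queue.length === 0) {
          if (this.done.has(id)) continue; // finished and drained: ignore it
          return; // still live: it may yet deliver something older
        }
        if (minId === null || queue[0].timestamp < this.queues.get(minId)[0].timestamp) {
          minId = id;
        }
      }
      if (minId === null) return; // nothing buffered anywhere
      this.onEmit(minId, this.queues.get(minId).shift());
    }
  }
}
```

To connect this to Observables, each source's subscription would call `push(id, value)` from its `next` callback and `complete(id)` from its `complete` callback, while `onEmit` could forward to a single result Subject or to one Subject per source.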