I created the Observable constructor below that works as described. Does anyone know if there is a more concise way of achieving the same behaviour using the operators that come with RxJs? I was looking at bufferToggle which is close to the required behaviour, but I need the emitted values to be passed through when the buffer is closed.
Function Description: Buffers the emitted source
values if the condition
emits true
, and passes through the emitted source
values if the condition
emits false
. If the condition emits false
after being true
, the buffer releases each value in the order that they were received. The buffer is initialised to pass through the emitted source
values until the condition
emits true
.
function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
return new Observable<T>(subscriber => {
const subscriptions: Subscription[] = [];
const buffer = [];
let isBufferOpen = false;
subscriptions.push(
// handle source events
source.subscribe(value => {
// if buffer is open, or closed but buffer is still being
// emptied from previously being closed.
if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
buffer.push(value);
} else {
subscriber.next(value);
}
}),
// handle condition events
condition.do(value => isBufferOpen = value)
.filter(value => !value)
.subscribe(value => {
while (buffer.length > 0 && !isBufferOpen) {
subscriber.next(buffer.shift());
}
})
);
// on unsubscribe
return () => {
subscriptions.forEach(sub => sub.unsubscribe());
};
});
}
In response to comment, the following is the same function as the one above but in the form of an RxJs Operator and updated to use RxJx 6+ pipeabale Operators:
function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => {
return new Observable<T>(subscriber => {
const subscriptions: Subscription[] = [];
const buffer: T[] = [];
let isBufferOpen = false;
subscriptions.push(
// handle source events
source.subscribe(value => {
// if buffer is open, or closed but buffer is still being
// emptied from previously being closed.
if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
buffer.push(value);
} else {
subscriber.next(value);
}
}),
// handle condition events
condition.pipe(
tap(con => isBufferOpen = con),
filter(() => !isBufferOpen)
).subscribe(() => {
while (buffer.length > 0 && !isBufferOpen) {
subscriber.next(buffer.shift());
}
})
);
// on unsubscribe
return () => subscriptions.forEach(sub => sub.unsubscribe());
});
}
}
I've found a solution based on operators rather than subscriptions, but hesitate to call it more concise.
Note, the endToken can be removed if it's possible to guarantee the buffer on/off stream always ends with an off (i.e an odd number of emits).
console.clear()
const Observable = Rx.Observable
// Source and buffering observables
const source$ = Observable.timer(0, 200).take(15)
const bufferIt$ = Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)
// Function to switch buffering
const endToken = 'end'
const bufferScanner = { buffering: false, value: null, buffer: [] }
const bufferSwitch = (scanner, [src, buffering]) => {
const onBufferClose = (scanner.buffering && !buffering) || (src === endToken)
const buffer = (buffering || onBufferClose) ? scanner.buffer.concat(src) : []
const value = onBufferClose ? buffer : buffering ? null : [src]
return { buffering, value, buffer }
}
// Operator chain
const output =
source$
.concat(Observable.of(endToken)) // signal last buffer to emit
.withLatestFrom(bufferIt$) // add buffering flag to stream
.scan(bufferSwitch, bufferScanner) // turn buffering on and off
.map(x => x.value) // deconsruct bufferScanner
.filter(x => x) // ignore null values
.mergeAll() // deconstruct buffer array
.filter(x => x !== endToken) // ignore endToken
// Proof
const start = new Date()
const outputDisplay = output.timestamp()
.map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
const bufferDisplay = bufferIt$.timestamp()
.map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
bufferDisplay.merge(outputDisplay)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
Footnote
I also found a solution based on buffer()
, but am not convinced it's stable with a high-frequency source. There seems to be something hokey with certain buffer configurations (i.e the declaration looks sound but tests show occasional delays which interfere with buffer operation).
In any case, for reference,
/*
Alternate with buffered and unbuffered streams
*/
const buffered =
source$.withLatestFrom(bufferIt$)
.filter(([x, bufferIsOn]) => bufferIsOn)
.map(x => x[0])
.buffer(bufferIt$.filter(x => !x))
.filter(x => x.length) // filter out empty buffers
.mergeAll() // unwind the buffer
const unbuffered =
source$.withLatestFrom(bufferIt$)
.filter(([x, bufferIsOn]) => !bufferIsOn)
.map(x => x[0])
const output = buffered.merge(unbuffered)