Say I have a stream of numbers
---1-----1----1----0----0----1----1----0--->
And i want to get a new stream of arrays containing consecutive 1's like this
---[1,1,1]---[1,1]--->
I though of using the scan function but that only emits one value and I read about bufferToggle but documentation only uses it with timed observables. Is there any function that does this?
One possible way is to use scan
with the pairwise
operator.
Using pairwise you can compare the N-1
th emission to the N
th.
console.clear();
var source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
source
.concat(Rx.Observable.of(0)) // for the case when source completes with 1
.scan(function(acc, x) {
// reset accumulator to an empty array when a 0 is encountered
if (x === 0) {
return [];
} else {
return acc.concat([x]);
}
}, [])
.pairwise()
// filter when the accumulated value goes from a length greater than 0
// to 0 then you know you've hit 0
.filter((pair) => pair[0].length > 0 && pair[1].length === 0)
// take the first element of the pair since it has the result you're interested in
.map((pair) => pair[0])
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.min.js"></script>