functional-programming, rxjs, reactive-programming, reactivex

Create a stream of consecutive values that pass a filter


Say I have a stream of numbers:

---1-----1----1----0----0----1----1----0--->

And I want to get a new stream of arrays, each containing a run of consecutive 1s, like this:

---[1,1,1]---[1,1]--->

I thought of using the scan function, but that only emits one value per input. I also read about bufferToggle, but the documentation only shows it with timed observables. Is there an operator that does this?


Solution

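  • One possible way is to combine scan with the pairwise operator.

    scan accumulates the current run of 1s and resets to an empty array whenever a 0 arrives. With pairwise you can compare the (N-1)th emission to the Nth, which lets you detect the moment a non-empty run is reset.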

    console.clear();
    var source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
    
    source
      .concat(Rx.Observable.of(0)) // for the case when source completes with 1
      .scan(function(acc, x) {
        // reset accumulator to an empty array when a 0 is encountered
        if (x === 0) {
          return [];
        } else {
          return acc.concat([x]);
        }
      }, [])
      .pairwise()
      // filter when the accumulated value goes from a length greater than 0
      // to 0 then you know you've hit 0
      .filter((pair) => pair[0].length > 0 && pair[1].length === 0)
      // take the first element of the pair since it has the result you're interested in
      .map((pair) => pair[0])
      .subscribe(console.log)

    The snippet expects RxJS 5 to be loaded as the global Rx, for example with:

    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.min.js"></script>
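
To see why the filter picks out exactly the completed runs, it can help to trace the intermediate values with plain arrays. The following is only a sketch that emulates the scan, pairwise, filter, and map steps without RxJS; the helper names `scanRuns`, `pairwise`, and `consecutiveOnes` are hypothetical and introduced purely for illustration.

```javascript
// Plain-array emulation of the scan -> pairwise -> filter -> map pipeline.
// No RxJS involved; all helper names here are hypothetical.

// Emulates scan: returns the running accumulator after every input value.
function scanRuns(values) {
  var acc = [];
  return values.map(function (x) {
    // Reset on 0, otherwise extend the current run (concat returns a new array).
    acc = x === 0 ? [] : acc.concat([x]);
    return acc;
  });
}

// Emulates pairwise: returns [previous, current] for each adjacent pair.
function pairwise(items) {
  var pairs = [];
  for (var i = 1; i < items.length; i++) {
    pairs.push([items[i - 1], items[i]]);
  }
  return pairs;
}

function consecutiveOnes(values) {
  // Append a trailing 0 so a run that ends the input is still flushed.
  var accumulated = scanRuns(values.concat([0]));
  return pairwise(accumulated)
    // Keep only transitions from a non-empty run to the reset (empty) state.
    .filter(function (pair) { return pair[0].length > 0 && pair[1].length === 0; })
    // The first element of the pair holds the completed run.
    .map(function (pair) { return pair[0]; });
}

console.log(JSON.stringify(scanRuns([1, 1, 1, 0, 0, 1, 1, 0])));
// [[1],[1,1],[1,1,1],[],[],[1],[1,1],[]]
console.log(JSON.stringify(consecutiveOnes([1, 1, 1, 0, 0, 1, 1, 0])));
// [[1,1,1],[1,1]]
```

The first log line shows what scan produces on its own: every intermediate run, not just the finished ones. That is why the pairwise and filter steps are needed to keep only the value emitted right before each reset.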