Search code examples
rxjsreactive-streams

In rxjs how can I create groups of sequential values?


I have a stream of values in this form:

[1,2,3,1,2,1,1,1,2...]

I would like to convert it to a stream of groups in the form:

[[1,2,3],[1,2],[1],[1],[1,2]...]

New group should be created every time value becomes 1.


Solution

  • You can use the bufferWhen() operator to collect emitted values until an observable emits a value. In this example I use a subject to emit a shallow copy of the stream to the buffer.

    The buffer will emit when ever the number 1 is emitted. If the stream starts with 1 then an empty array is emitted. So I filter that away.

    const {from, Subject} = rxjs;
    const {filter, bufferWhen, tap, skip} = rxjs.operators;
    
    const stream = from([1,2,3,1,2,1,1,1,2]);
    const trigger = new Subject();
    
    stream.pipe(
       tap(v => trigger.next(v)),
       bufferWhen(() => trigger.pipe(filter(v => v === 1))),
       filter(v => v.length)
    ).subscribe(x => console.log(x));
    <script src="https://unpkg.com/@reactivex/rxjs@6.x/dist/global/rxjs.umd.js"></script>

    You can use the scan() if you wanted to emit the trigger when the value decreases from the previous. Which is maybe a little better in logic, but using 1 as the trigger fits with the question.