javascript rxjs reactivex

In RxJS, how do I flatten or merge a stream containing both normal types and Observables?


By analogy with arrays: flatten([1, 2, [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6].

I would like to do the same with RxJS observables:

const test$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()

test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7

mergeAll doesn't work here and throws an error.

Here is a very dirty solution:

const inElegant$ = Rx.Observable.merge(
  test$.filter(x => x instanceof Rx.Observable).mergeAll(),
  test$.filter(x => !(x instanceof Rx.Observable))
)

inElegant$.subscribe(x => console.log(x));

Are there any better solutions to this?

Jsbin http://jsbin.com/vohizoqiza/1/edit?js,console


Solution

  • If we have a stream of the form

    const stream = Rx.Observable.from(
        [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])
    

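    there are a few ways to convert it to a stream of plain numbers without the filtering used in your solution. Each one first wraps every bare number in a single-item Observable, so that all elements have the same type, and then flattens the result. (select and selectMany are the RxJS 4 aliases of map and flatMap.)

    These are three possible solutions: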

    // prints 1, 2, 3, 4, 5, 6, 7, 8
    stream
        .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
        .concatAll()
        .subscribe(x => console.log(x));
    
    // prints 1, 2, 3, 4, 5, 6, 7, 8
    stream
        .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
        .subscribe(x => console.log(x));
    
    // prints 1, 2, 3, 4, 5, 6, 7, 8
    stream
        .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
        .mergeAll()
        .subscribe(x => console.log(x));
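
    The wrap-then-flatten idea can be shown on plain arrays, matching the analogy in the question. This is only a sketch of the principle and uses no RxJS; the variable names are made up for illustration.

```javascript
// Plain-array analogy of the wrap-then-flatten step; no RxJS involved.
const mixed = [1, 2, [3, 4], 5, [6, 7], 8];

// Step 1: wrap every bare number so that each element is an array
// (the counterpart of turning a number into a one-item Observable).
const wrapped = mixed.map(e => (typeof e === 'number' ? [e] : e));

// Step 2: flatten exactly one level (the counterpart of concatAll()).
const flat = [].concat(...wrapped);

console.log(flat); // [ 1, 2, 3, 4, 5, 6, 7, 8 ]
```

    Once every element has the same shape, the flattening step no longer needs to special-case anything, which is why no filtering is required.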
    

    All three variants work for this synchronous input. There is one thing to consider, however: the results differ once the source contains an asynchronous stream. Suppose we alter the source like this:

    const asyncStream = Rx.Observable.interval(1000)
        .select((val, idx) => idx + 8).take(5);
    
    const stream = Rx.Observable.from(
        [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
        asyncStream, 13, 14, 15])
    

    We get the following results using the same solutions as earlier:

    // prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
    stream
        .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
        .concatAll()
        .subscribe(x => console.log(x));
    
    // prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
    stream
        .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
        .subscribe(x => console.log(x));
    
    // prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
    stream
        .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
        .mergeAll()
        .subscribe(x => console.log(x));
    

    To sum up: selectMany, or select followed by mergeAll, produces a flattened stream of the correct type, but the order is not maintained. These solutions subscribe to all inner streams at once and emit a value as soon as any of them produces one.

    The concatAll solution behaves a little differently. It subscribes to each inner stream in order, moving on to the next value/stream only when the previous stream has completed.
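
    The ordering difference can be reproduced without RxJS in a small toy model. This is a sketch under simplifying assumptions: each source is a list of { at, value } events, merge is modeled as a stable sort by time, and concat as playing the sources back to back. The names sync, everySecond, mergeOrder and concatOrder are hypothetical helpers, not part of any library.

```javascript
// Toy model, not RxJS: `at` is the delay in ms after the source is subscribed.
const sync = (...values) => values.map(value => ({ at: 0, value }));
const everySecond = (...values) =>
  values.map((value, i) => ({ at: (i + 1) * 1000, value }));

// Mirrors the asynchronous example; bare numbers are already wrapped.
const sources = [
  sync(1), sync(2), sync(3, 4), sync(5), sync(6, 7),
  everySecond(8, 9, 10, 11, 12),
  sync(13), sync(14), sync(15),
];

// merge: every source is subscribed at time 0, so events are ordered by time.
// Array.prototype.sort is stable (ES2019), so ties keep their source order.
function mergeOrder(sources) {
  return [].concat(...sources)
    .sort((a, b) => a.at - b.at)
    .map(event => event.value);
}

// concat: each source is drained completely before the next one starts,
// so the values simply appear in source order.
function concatOrder(sources) {
  return [].concat(...sources).map(event => event.value);
}

console.log(concatOrder(sources).join(', '));
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
console.log(mergeOrder(sources).join(', '));
// 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
```

    The model ignores real scheduling, but it shows why 13, 14 and 15 overtake the interval values under merge and wait for them under concat.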

    Which of these solutions you want depends on your needs. All of them, however, get rid of the need to filter the stream.