Search code examples
angulartypescriptrxjsrxjs6

Why does a second subscription change the outcome of my stream?


The following code works as expected (see output):

import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };

let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();

obs$ = sub$.pipe(
    scan((acc: ColumnFilter[], curr) => {
        const index = acc.findIndex(f => f.field === curr.field);
        if (index === -1) {
            acc.push({ field: curr.field, value: curr.value });
            return acc;
        }

        curr.value === '' ?
            acc.splice(index, 1) :
            acc[index] = { field: curr.field, value: curr.value };
        return acc;
    }, [])
);


obs$.subscribe(
    value => console.log(value)
);

sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })


// output:
//    [ { field: 'size', value: 'xl' } ]
//    [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
//    [ { field: 'size', value: 'xl' } ]

Now if I add a second Subscription to the obs$ observable, the output changes: (Check the last value of the stream. It now contains { field: 'color', value: '' })

import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };


let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();

obs$ = sub$.pipe(
    scan((acc: ColumnFilter[], curr) => {
        const index = acc.findIndex(f => f.field === curr.field);
        if (index === -1) {
            acc.push({ field: curr.field, value: curr.value });
            return acc;
        }

        curr.value === '' ?
            acc.splice(index, 1) :
            acc[index] = { field: curr.field, value: curr.value };
        return acc;
    }, [])
);

// vvv this was added
obs$.subscribe()
// ^^^ this was added

obs$.subscribe(
    value => console.log(value)
);

sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })

// output:
//    [ { field: 'size', value: 'xl' } ]
//    [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
//    [ { field: 'size', value: 'xl' }, { field: 'color', value: '' } ]

I don't understand why the second subscription is changing the outcome of the stream. I can only guess it has something to do with mutating the accumulator? Because if I use acc.filter instead of acc.splice, it works as expected.

Edit

Added a stackblitz: https://stackblitz.com/edit/rxjs-6-opeartors-uzojgw?file=index.ts


Solution

  • Here's what happens in your second sample code: When an event is triggered by sub$.next(), the pipe is executed once for each subscribed observable. Since the acc: ColumnFilter[] of the scan operator is shared between these executions, the described side effects occur.

    My suggestion is to use either the share or shareReplay operator so that the pipe is executed only once per event and the values emitted by the pipe are multicasted for the subscribers (= all subscribers get the same object reference).

    obs$ = sub$.pipe(
      scan((acc: ColumnFilter[], curr) => {
        const index = acc.findIndex((f) => f.field === curr.field);
        if (index === -1) {
          acc.push({ field: curr.field, value: curr.value });
          return acc;
        }
    
        curr.value === ''
          ? acc.splice(index, 1)
          : (acc[index] = { field: curr.field, value: curr.value });
        return acc;
      }, []),
      share() // you might use shareReplay(1) instead, e.g. if you have late subscribers
    );