rxjs behaviorsubject rxjs-pipeable-operators switchmap

Keep state while operating in switchMap


Suppose you have a function that returns an RxJS observable that emits a list of objects:

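// RxJS 6+ import paths used by the snippets in this question
import { of, interval, BehaviorSubject } from 'rxjs';
import { map, switchMap, tap } from 'rxjs/operators';

const getItems = () =>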
  of([
    {
      id: 1,
      value: 10
    },
    {
      id: 2,
      value: 20
    },
    {
      id: 3,
      value: 30
    }
  ]);

Suppose you also have a second function that returns an observable emitting a single object:

const getItem = id =>
  of({
    id,
    value: Math.floor(Math.random() * 30) + 1
  });

Now we want to create an observable that fetches the initial list and then, at a regular interval, updates one randomly chosen list item:

const source = getItems().pipe(
  switchMap(items =>
    interval(5000).pipe(
      switchMap(x => {
        // pick a random id between 1 and 3
        const rId = Math.floor(Math.random() * 3) + 1;

        return getItem(rId).pipe(
          map(item =>
            items.reduce(
              (acc, cur) =>
                cur.id === item.id ? [...acc, item] : [...acc, cur],
              []
            )
          )
        );
      })
    )
  )
);

source.subscribe(x => console.log(JSON.stringify(x)));

The problem with the above code is that each time the interval fires, the items updated in previous iterations reset to their initial values, e.g.:

[{"id":1,"value":10},{"id":2,"value":13},{"id":3,"value":30}]
[{"id":1,"value":10},{"id":2,"value":20},{"id":3,"value":18}]
[{"id":1,"value":10},{"id":2,"value":16},{"id":3,"value":30}]
[{"id":1,"value":21},{"id":2,"value":20},{"id":3,"value":30}]

As you can see, on each interval the code starts again from the original list and updates a single item (e.g. the value 13 from the first iteration is lost in the second, where it reverts to 20). This behaviour is reasonable, since the items argument of the outer switchMap is captured in a closure and never changes.
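The reset can be reproduced without RxJS at all. The following is a minimal plain-JavaScript sketch, not part of the original code: `replaceById` is a hypothetical helper standing in for the `reduce` call, and the fixed `updates` array is an assumed stand-in for two successive `getItem` results (fixed values instead of random ones).

```javascript
// Hypothetical helper: returns a new list where the entry with a matching id is replaced.
const replaceById = (list, item) =>
  list.map(cur => (cur.id === item.id ? item : cur));

const initialItems = [
  { id: 1, value: 10 },
  { id: 2, value: 20 },
  { id: 3, value: 30 }
];

// Assumed stand-ins for two successive getItem() results.
const updates = [
  { id: 2, value: 13 },
  { id: 3, value: 18 }
];

// Like the pipeline above: every tick starts again from the captured initial list.
const fromClosure = updates.map(update => replaceById(initialItems, update));

// What is wanted: every tick starts from the result of the previous tick.
let current = initialItems;
const carried = updates.map(update => (current = replaceById(current, update)));

console.log(JSON.stringify(fromClosure[1]));
// [{"id":1,"value":10},{"id":2,"value":20},{"id":3,"value":18}]
console.log(JSON.stringify(carried[1]));
// [{"id":1,"value":10},{"id":2,"value":13},{"id":3,"value":18}]
```

In the first variant the value 13 is lost on the second tick, which matches the output shown above. The second variant keeps it because the previous result is fed into the next update.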

I managed to work around the issue with a BehaviorSubject, but I think my solution is somewhat dirty:

const items$ = new BehaviorSubject([]);

const source = getItems().pipe(
  tap(items => items$.next(items)),
  switchMap(() =>
    interval(5000).pipe(
      switchMap(() => {
        const rId = Math.floor(Math.random() * 3) + 1;

        return getItem(rId).pipe(
          map(item =>
            items$
              .getValue()
              .reduce(
                (acc, cur) =>
                  cur.id === item.id ? [...acc, item] : [...acc, cur],
                []
              )
          ),
          tap(items => items$.next(items)),
          switchMap(() => items$)
        );
      })
    )
  )
);

Is there a better approach?

Example code can be found here


Solution

  • I believe this should do what you want:

    // imports needed: interval from 'rxjs'; switchMap and scan from 'rxjs/operators'
    const source = getItems().pipe(
      switchMap(items =>
        interval(1000).pipe(
          switchMap(() => {
            const rId = Math.floor(Math.random() * 3) + 1;
            return getItem(rId);
          }),
          scan((acc, item) => {
            acc[acc.findIndex(i => i.id === item.id)] = item;
            return acc;
          }, items),
        )
      )
    );
    

    It's basically what you're doing, but I'm using scan (seeded with the original items) to keep the output array in acc, so each new item is applied to the result of the previous update instead of to the original list.

    Live demo: https://stackblitz.com/edit/rxjs-kvygy1?file=index.ts
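One thing to be aware of in the `scan` step above: it writes into `acc` in place, so the original `items` array is modified and every emission is the same array reference. If that matters (for example when a consumer compares emissions by reference), a non-mutating reducer is a possible alternative. The sketch below is plain JavaScript so it can be run on its own; `applyUpdate` and `scanArray` are hypothetical names, and `scanArray` is only an array-based stand-in that imitates how `scan` feeds each result back in as the next accumulator.

```javascript
// Hypothetical reducer with the (accumulator, value) shape that scan expects.
// Returns a new array and leaves the accumulator untouched.
const applyUpdate = (acc, item) =>
  acc.map(cur => (cur.id === item.id ? item : cur));

// Array-based stand-in for scan: collects every intermediate accumulator.
const scanArray = (reducer, seed, values) => {
  const emitted = [];
  let acc = seed;
  for (const value of values) {
    acc = reducer(acc, value);
    emitted.push(acc);
  }
  return emitted;
};

const startItems = [
  { id: 1, value: 10 },
  { id: 2, value: 20 },
  { id: 3, value: 30 }
];

// Assumed stand-ins for three successive getItem() results.
const incoming = [
  { id: 2, value: 13 },
  { id: 3, value: 18 },
  { id: 2, value: 16 }
];

const states = scanArray(applyUpdate, startItems, incoming);
states.forEach(state => console.log(JSON.stringify(state)));
// [{"id":1,"value":10},{"id":2,"value":13},{"id":3,"value":30}]
// [{"id":1,"value":10},{"id":2,"value":13},{"id":3,"value":18}]
// [{"id":1,"value":10},{"id":2,"value":16},{"id":3,"value":18}]
```

In the RxJS pipeline the same reducer could be passed as `scan(applyUpdate, items)`. Unlike the `findIndex` version, it also ignores an item whose id is not in the list instead of writing to index -1.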