Tags: angular, promise, rxjs, angular2-observables, reactivex

RxJS operator that waits for completion and then emits one value


Is there an RxJS operator that waits until the source completes and then emits a given value? If there is none, how could I provide it on my own?

This would work similarly to toArray(), which also waits for the source to complete. Instead of collecting all the emitted values, I want to ignore them and emit a different value.

Here is an equivalent implementation:

observable.pipe(
  ignoreElements(),
  endWith(myValue),
);

Alternatively:

observable.pipe(
  toArray(),
  map(ignore => myValue)
)

I often run into situations where I need this. I have concluded that it is dangerous to convert promise then-chains to observables with switchMap() or mergeMap(), because an inner observable can complete without emitting any value at all. Recently we had this problem:

return getEntitiesFromBackend().pipe(
  switchMap(entities => {
    return putEntitiesToObjectStore(entities);
  }),
  switchMap(() => {
    return storeSyncDate();
  })
);

In some situations the sync date wasn't stored, and it was hard to find out why. In the end, the reason was that the putEntities... method emits one value per "put" operation. In those cases the entities array was empty, so no value was emitted at all and the second switchMap() never ran.
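A minimal sketch of this effect, assuming the store method emits one value per put operation. The names putEach and countSyncCalls are hypothetical stand-ins, not the real methods from our code base:

```typescript
import { from, Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-in for the store method: one emission per "put",
// so an empty array completes without emitting anything.
function putEach(entities: string[]): Observable<string> {
  return from(entities);
}

// Runs the two-step chain and counts how often the second step is entered.
function countSyncCalls(entities: string[]): number {
  let syncCalls = 0;
  of(entities).pipe(
    switchMap(list => putEach(list)),
    switchMap(() => {
      syncCalls++; // stands in for storeSyncDate()
      return of('synced');
    }),
  ).subscribe();
  return syncCalls;
}

const callsForTwo = countSyncCalls(['a', 'b']); // 2: once per emitted put result
const callsForNone = countSyncCalls([]);        // 0: the second step never runs
```

Note that the non-empty case is not promise-like either: the second step runs once per emission rather than once after completion.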

This is what I actually wanted to do - translated to the promise world:

return getEntitiesFromBackend()
  .then(entities => {
    return putEntitiesToObjectStore(entities);
  })
  .then(() => {
    return storeSyncDate();
  });

Most of the code I have seen that uses switchMap / mergeMap doesn't have this problem, because most of the time it deals with HTTP requests that emit only once and then complete. See here for example. This got me used to converting typical promise patterns to RxJS with switchMap without thinking much about how it actually works and what it is for. Now that we work with IndexedDB, most of our methods return observables that emit a value for each DB operation. switchMap / mergeMap become a nightmare here.

This is why I'm asking for such an operator, and I wonder why I haven't found one yet, as it's such a common case in our application. I could easily solve this with the implementations above, but I don't want to repeat those two operators over and over again:

return getEntitiesFromBackend().pipe(
  switchMap(entities => {
    return putEntitiesToObjectStore(entities);
  }),
  ignoreElements(),
  endWith(null), // endWith() with no arguments would emit nothing
  switchMap(() => {
    return storeSyncDate();
  })
);

Of course I could use toArray() and just ignore the argument in the next operator. I don't like that because it collects every emitted value into an array that is then thrown away, which is unnecessary overhead.


Solution

  • I'd suggest two changes.

    1. Use concatMap instead of switchMap or mergeMap. concatMap ensures that each emission from the getEntitiesFromBackend() observable is processed sequentially, one after the other, rather than in parallel (mergeMap) or with the previous inner observable being cancelled (switchMap). See here for a brief intro to the different types of higher-order mapping operators.

    2. Instead of a combination of operators like ignoreElements + endWith (or toArray + map), you could use the last operator with a predicate that always returns false, together with a default value. Since no emission ever matches the predicate, the default value is emitted when the source observable completes.

    return getEntitiesFromBackend().pipe(
      concatMap(entities => {
        return putEntitiesToObjectStore(entities).pipe(
          last(() => false, myValue),
        );
      }),
      concatMap((myValue) => {
        return storeSyncDate();
      })
    );
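If repeating the operator combination is the main concern, it could also be wrapped in a small reusable operator. The following is only a sketch: emitOnComplete is a hypothetical name, not something RxJS ships, and it is written against the plain Observable constructor so that it does not depend on how a particular RxJS version types endWith or last.

```typescript
import { EMPTY, from, Observable, OperatorFunction } from 'rxjs';

// Hypothetical helper: ignores every value from the source and emits
// `value` exactly once when the source completes. Errors pass through.
function emitOnComplete<T>(value: T): OperatorFunction<unknown, T> {
  return (source: Observable<unknown>) =>
    new Observable<T>(subscriber =>
      // No `next` handler, so source values are dropped.
      // The returned subscription is used as teardown logic.
      source.subscribe({
        error: err => subscriber.error(err),
        complete: () => {
          subscriber.next(value);
          subscriber.complete();
        },
      }),
    );
}

// A source with several emissions still yields a single value.
const emittedForThree: string[] = [];
from([1, 2, 3]).pipe(emitOnComplete('done')).subscribe(v => emittedForThree.push(v));

// An empty source yields the same single value.
const emittedForEmpty: string[] = [];
EMPTY.pipe(emitOnComplete('done')).subscribe(v => emittedForEmpty.push(v));
```

With such a helper, the inner pipe above would become putEntitiesToObjectStore(entities).pipe(emitOnComplete(myValue)), so the next concatMap runs exactly once per inner observable regardless of how many values it emitted.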