Preventing premature completion of an async pipeable operator in RxJS


I'm creating pipeable operators using RxJS 6, and am unclear about how to complete() the observer when the operation is asynchronous.

For a synchronous operation, the logic is simple. In the example below, all values from the source Observable will be passed to observer.next(), and after that observer.complete() is called.

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(e),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

For an asynchronous operation, however, I'm a bit at a loss. In the example below, the asynchronous operation is represented by a call to setTimeout(). Because the source emits and completes synchronously, observer.complete() is called before any of the values are passed to observer.next().

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(e),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

So the question is: what is the idiomatic RxJS approach to make it so that the call to observer.complete() is only made after all values are asynchronously passed to observer.next()? Should I be manually keeping track of pending calls or is there a more "reactive" solution?

(Note that the example above is a simplification of my actual code, and that the call to setTimeout() is meant to represent "any asynchronous operation". I'm looking for a general approach to dealing with async operations in pipeable operators, not advice on how to deal with delays or timeouts in RxJS.)


Solution

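  • One approach is to restructure your asyncOp so that it is built from existing operators such as mergeMap. mergeMap completes only after the source has completed and every inner Observable has completed too, so you don't need to keep track of pending calls yourself.

    The following code reproduces your example using this approach: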

    import { from, of } from 'rxjs';
    import { mergeMap, delay } from 'rxjs/operators';
    const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
    from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
    

    Whether this is worth considering depends on what your asyncOp does. If it is asynchronous because it relies on a callback, as with HTTP calls or reads from the file system, then I think this approach can work, since you can turn a callback-based function into an Observable.