Search code examples
rxjsobservable

Proper way to complete an rxjs observable interval?


My scenario is that I add a record set to a host zone via aws sdk. When adding a record set, the aws sdk has a GetChange call that can be used to get that status. Here is the code I am currently doing:

  this._adminService.registerDomain(caseWebsiteUrl.Url).
    subscribe(id => {
      return Observable.interval(5000).flatMap(() => {
        return this._adminService.getChange(id);
      }).
        takeWhile((s) => s.ChangeInfo.Status.Value !== 'INSYNC').subscribe(
        () => {

        },
        () => {

        },
        () => this.urlStatus = 'fa fa-check');

    });

In the above code, I want to call registerDomain and after that has been successful, I want to call getChange every 5 seconds until the Status.Value !== 'INSYNC'

A few questions:

  1. What is flatMap doing?
  2. Is it possible to do this without 2 subscribe calls?
  3. If I don't need the next or error callbacks, but I need the complete, is it necessary to declare empty bodies?

Solution

  • Flatmap aka MergeMap will flatten higher order observables. Thus Observable<Observable<T>> => Observable<T>.

    The subscribe inside subscribe is a code smell and can and should be refactored. If you do not need the error/complete handlers you do not need to pass those. For instance:

    function registerDomain(caseWebsiteUrl) {
      return this._adminService.registerDomain(caseWebsiteUrl.Url)
          .concatMap(registerId => Observable.interval(5000)
              .mergeMap(() => this._adminService.getChange(registerId))
              .takeWhile((info) => info.ChangeInfo.Status.Value !== 'INSYNC'))
    }
    registerDomain.subscribe(res => console.log('res:'+res));
    

    This works based on the assumption and limitations that:

    • registerDomain() returns an Observable which completes
    • getChange() will eventually return 'INSYNC'
    • No error handling has been added (for instance a timeout after 30 seconds? Retry if registerDomain() fails?)