Tags: javascript, long-polling, rxjs

How can I turn an observable into an observable of long polling observables which complete on a specific value?


I'm creating an interactive webpage with RxJS.

This is what I want to achieve:

I have an application that generates tokens. These tokens can be consumed by an external entity.

When a user creates a token, the page starts polling the webserver for its status (consumed or not). When the token is consumed, the page refreshes.

So, when the token is created, a request is sent to the server every 2 seconds asking whether the token is consumed yet.

I have an Observable of strings that represent my generatedTokens.

I actually already have a working implementation using the Rx.Scheduler.default class, which allows me to do things manually. However, I can't help but feel that there should be a much simpler, more elegant solution to this.

This is the current code:

class TokenStore {
  constructor(tokenService, scheduler) {
    // actual implementation omitted for clarity
    this.generatedTokens = Rx.Observable.from(["token1", "token2"]);

    this.consumedTokens = this.generatedTokens
      .flatMap(token => 
        Rx.Observable.create(function(observer) {
          var notify = function() {
            observer.onNext(token);
            observer.onCompleted();
          };
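          var poll = function() {
            // wait 2 seconds, ask the server, and poll again unless the token is consumed
            scheduler.scheduleWithRelative(2000, function() {
              // tokenService.isTokenConsumed returns a promise that resolves with a boolean
              tokenService.isTokenConsumed(token)
                .then(isConsumed => isConsumed ? notify() : poll(),
                      error => observer.onError(error)); // surface failed requests
            });
          };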
          poll();
        }));
  }
}

Is there something like a "repeatUntil" method? I'm looking for an implementation that does the same thing as the code above, but looks more like this:

class TokenStore {
  constructor(tokenService, scheduler) {
    // actual implementation omitted for clarity
    this.generatedTokens = Rx.Observable.from(["token1", "token2"]);

    this.consumedTokens = this.generatedTokens
      .flatMap(token =>
        Rx.Observable.fromPromise(tokenService.isTokenConsumed(token))
                     .delay(2000, scheduler)
                      // is this possible?
                     .repeatUntil(isConsumed => isConsumed === true));
  }
} 

Solution

  • Funnily enough, the answer struck me a few minutes after posting the question. I suppose rubber ducking might not be so silly after all.

    Anyway, the answer consisted of two parts:

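    • repeatUntil can be built from repeat(), filter() and first(): repeat() re-subscribes to the source every time it completes, filter() drops the "not consumed yet" results, and first() completes the sequence on the first result that passes, which also stops the repetition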

    • fromPromise wraps a promise that has already been created, so the AJAX request fires only once; when repeat() re-subscribes, it just receives the cached result of that same promise. To fire a new request on every subscription, I had to wrap the call in Rx.Observable.defer
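
The second point comes down to promises being eager: the request starts when the promise is created, not when something subscribes to it. A minimal sketch of that difference in plain JavaScript, without RxJS, is shown here. The `isTokenConsumed` function is a hypothetical stand-in for `tokenService.isTokenConsumed`, and `eagerSource`, `deferredSource` and `countRequests` are illustrative names only.

```javascript
// Hypothetical stand-in for tokenService.isTokenConsumed(token): it counts
// how many "requests" were made and reports the token as consumed on the third.
let requestCount = 0;
function isTokenConsumed(token) {
  requestCount += 1;
  return Promise.resolve(requestCount >= 3);
}

// Eager: the call happens once, right here. Every later "subscription"
// only gets the result of that single request.
const eagerPromise = isTokenConsumed("token1");
const eagerSource = () => eagerPromise;

// Deferred: the call is wrapped in a factory, so every "subscription"
// invokes it again and triggers a fresh request.
const deferredSource = () => isTokenConsumed("token1");

// Simulate several subscriptions and report how many new requests they caused.
function countRequests(source, subscriptions) {
  const before = requestCount;
  for (let i = 0; i < subscriptions; i++) {
    source();
  }
  return requestCount - before;
}

const eagerRequests = countRequests(eagerSource, 3);
const deferredRequests = countRequests(deferredSource, 3);
console.log(eagerRequests, deferredRequests); // prints "0 3"
```

In the solution, Rx.Observable.defer plays the role of `deferredSource`: its factory runs on every subscription, so each repetition issues a new request.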

    The solution:

    class TokenStore {
      constructor(tokenService, scheduler) {
        // actual implementation omitted for clarity
        this.generatedTokens = Rx.Observable.from(["token1", "token2"]);

        this.consumedTokens = this.generatedTokens
          .flatMap(token =>
            // defer is required: it calls the service again on every (re)subscription
            Rx.Observable
              .defer(() => tokenService.isTokenConsumed(token))
              .delay(2000, scheduler)
              .repeat()
              .filter(isConsumed => isConsumed === true)
              .first())
          .share();
      }
    }
    

    A minor sidenote: share() turns consumedTokens into a hot (multicast) observable, so all subscribers share a single polling pipeline. Without it, every subscriber would start its own series of AJAX requests.
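
For comparison, the same repeat-until-consumed loop can be sketched with plain promises instead of RxJS operators. This is not part of the solution above, only an illustration of the control flow that repeat(), filter() and first() express. `pollUntil`, `check`, `predicate` and `intervalMs` are made-up names, and `isTokenConsumed` is again a hypothetical stand-in for the real service.

```javascript
// Promise-based sketch of "repeat until": wait `intervalMs`, call `check`,
// and try again until `predicate` accepts the result.
function pollUntil(check, predicate, intervalMs) {
  const wait = ms => new Promise(resolve => setTimeout(resolve, ms));
  const attempt = () =>
    wait(intervalMs)
      .then(() => check())
      .then(result => (predicate(result) ? result : attempt()));
  return attempt();
}

// Hypothetical stand-in for tokenService.isTokenConsumed(token):
// reports the token as consumed on the third request.
let requests = 0;
const isTokenConsumed = token => {
  requests += 1;
  return Promise.resolve(requests >= 3);
};

const done = pollUntil(
  () => isTokenConsumed("token1"),
  isConsumed => isConsumed === true,
  10 // short interval for the demo; the question polls every 2000 ms
).then(() => {
  console.log(`consumed after ${requests} requests`);
  return requests;
});
```

Unlike the observable version, this sketch cannot be cancelled by unsubscribing and does not share one polling loop among several consumers, which is where the RxJS pipeline earns its keep.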