I'm creating an interactive webpage with RxJs.
This is what I want to achieve:
I have an application that generates tokens. These tokens can be consumed by an external entity.
When a user creates a token, the page starts polling the webserver for its status (consumed or not). When the token is consumed, the page refreshes.
So, when the token is created, a request is sent to the server every 2 seconds asking whether the token is consumed yet.
I have an Observable
of strings that represent my generatedTokens
.
I actually already have a working implementation using the Rx.Scheduler.default class, which allows me to do things manually. However, I can't help but feel that there should be a much simpler, more elegant solution to this.
This is the current code:
class TokenStore {
constructor(tokenService, scheduler) {
// actual implementation omitted for clarity
this.generatedTokens = Rx.Observable.just(["token1", "token2"]);
this.consumedTokens = this.generatedTokens
.flatMap(token =>
Rx.Observable.create(function(observer) {
var notify = function() {
observer.onNext(token);
observer.onCompleted();
};
var poll = function() {
scheduler.scheduleWithRelative(2000, function() {
// tokenService.isTokenConsumed returns a promise that resolves with a boolean
tokenService.isTokenConsumed(token)
.then(isConsumed => isConsumed ? notify() : poll());
}
);
};
poll();
}));
}
}
Is there something like a "repeatUntil" method? I'm looking for an implementation that does the same thing as the code above, but looks more like this:
class TokenStore {
constructor(tokenService, scheduler) {
// actual implementation omitted for clarity
this.generatedTokens = Rx.Observable.just(["token1", "token2"]);
this.consumedTokens = this.generatedTokens
.flatMap(token =>
Rx.Observable.fromPromise(tokenService.isTokenConsumed(token))
.delay(2000, scheduler)
// is this possible?
.repeatUntil(isConsumed => isConsumed === true));
}
}
Funnily enough the answer struck me a few minutes after posting the question. I suppose rubberducking might not be so silly after all.
Anyway, the answer consisted of two parts:
repeatUntil can be achieved with a combination of repeat()
, filter()
and first()
fromPromise
has some internal lazy cache mechanism which causes subsequent subscriptions to NOT fire a new AJAX request. Therefore I had to resort back to using Rx.Observable.create
The solution:
class TokenStore {
constructor(tokenService, scheduler) {
// actual implementation omitted for clarity
this.generatedTokens = Rx.Observable.just(["token1", "token2"]);
this.consumedTokens = this.generatedTokens
.flatMap(token =>
// must use defer otherwise it doesnt retrigger call upon subscription
Rx.Observable
.defer(() => tokenService.isTokenConsumed(token))
.delay(2000, scheduler)
.repeat()
.filter(isConsumed => isConsumed === true)
.first())
.share();
}
}
A minor sidenote: the "share()" ensures that both observables are hot, which avoids the scenario where every subscriber would cause ajax request to start firing.