What is the best way to handle a server reconnection using RxJS and Angular 2?
This is what I have so far:
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
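// Each patched operator used below needs its own import in RxJS 5
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/repeat';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/operator/timeout';
import 'rxjs/add/observable/throw';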
@Injectable()
export class ConnectionService {
  // Stores the current status of the connection, false = not connected
  serviceOnline: BehaviorSubject<boolean> = new BehaviorSubject(false)

  constructor(
    private http: Http
  ) {
    this.serviceOnline.asObservable().subscribe((online) => {
      console.log("Service online: " + JSON.stringify(online))
    })
    setTimeout( () => this.setServiceOffline(), 2000 );
  }

  setServiceOffline() {
    console.log("Setting service offline");
    this.serviceOnline.next(false);
    this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
  }

  testConnection(): Observable<boolean> {
    console.log("Testing connection");
    return new Observable(observer => {
      this.http.get(`http://endpoint/ping`)
        .timeout(5000)
        .catch((error: any) => Observable.throw(error))
        .subscribe(() => {
          console.log("Connection successful");
          if(this.serviceOnline.getValue() == false)
            this.serviceOnline.next(true);
          observer.next(true);
          observer.complete();
        }, (error) => {
          console.log("Connection failed");
          if(this.serviceOnline.getValue() == true)
            this.serviceOnline.next(false);
          observer.next(false);
          observer.complete();
        });
    })
  }
}
Result in the console:
Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
If I replace this line...
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
with
this.testConnection().delay(2000).repeat().take(5).subscribe();
I do get the repeats:
Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
Connection failed
Connection failed
Connection failed
Connection failed
From the documentation:
public takeUntil(notifier: Observable): Observable
Emits the values emitted by the source Observable until a notifier Observable emits a value.
The serviceOnline BehaviorSubject is only updated if there is a change in the server status. After this line...
this.serviceOnline.next(false);
...serviceOnline only emits another value when the http service has returned a successful response and if the current online status is false (disconnected):
if(this.serviceOnline.getValue() == false)
this.serviceOnline.next(true);
So takeUntil should work. What am I misunderstanding/missing here?
I am new to RxJS - any other pointers on improving the implementation in this code would also be greatly appreciated...
Here is a Plnkr: http://plnkr.co/edit/WUKxH6nFxc8LMKcxDPql?p=info
RxJS version 5.4. Angular version 4.
Passing serviceOnline through asObservable() doesn't fix the issue either: this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();
I think the problem occurs because this.serviceOnline always emits the last value when subscribed to, as it is an instance of BehaviorSubject. When you pass it to the takeUntil operator as a notifier observable, takeUntil subscribes to it and immediately receives a notification, which leads to immediate completion of the resulting observable. Note that it doesn't matter whether the notifier observable emits true or false; takeUntil completes whenever it receives any value.
Try .takeUntil(this.serviceOnline.filter(online => online === true)) instead.
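To see the difference in isolation, here is a minimal sketch that runs synchronously and needs no HTTP call. The `online` subject and the `Observable.of(1, 2, 3)` source are hypothetical stand-ins for serviceOnline and the repeated ping, and the imports assume the same RxJS 5 patched-operator style as the question.

```typescript
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/takeUntil';

// Hypothetical stand-in for serviceOnline: starts "offline", as in the question.
const online = new BehaviorSubject<boolean>(false);

// Unfiltered notifier: the BehaviorSubject replays its current value (false)
// as soon as takeUntil subscribes to it, so the stream completes right away.
const seenUnfiltered: number[] = [];
Observable.of(1, 2, 3)
  .takeUntil(online)
  .subscribe(value => seenUnfiltered.push(value));

// Filtered notifier: the replayed false is dropped, so takeUntil only
// completes the stream once the subject actually emits true.
const seenFiltered: number[] = [];
Observable.of(1, 2, 3)
  .takeUntil(online.filter(isOnline => isOnline === true))
  .subscribe(value => seenFiltered.push(value));

console.log(seenUnfiltered); // []
console.log(seenFiltered);   // [1, 2, 3]
```

Note that the filtered version also needs import 'rxjs/add/operator/filter' in the service file; without it, .filter is not defined on the subject.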