Search code examples
angularrxjsangular-http

Server reconnection using RxJS and angular 2 HTTP service


What is the best way to handle a server reconnection using RxJS and Angular 2?

This is what I have so far:

import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';

import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/observable/throw';

@Injectable()
export class ConnectionService {

  // Stores the current status of the connection, false = not connected
  serviceOnline: BehaviorSubject<boolean> = new BehaviorSubject(false)

  constructor(
    private http: Http
  ) {
    this.serviceOnline.asObservable().subscribe((online) => {
      console.log("Service online: " + JSON.stringify(online))
    })
    setTimeout( () => this.setServiceOffline(), 2000 );
  }

  setServiceOffline() {
    console.log("Setting service offline");
    this.serviceOnline.next(false);
    this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
  }

  testConnection(): Observable<boolean> {
    console.log("Testing connection");
    return new Observable(observer => {
      this.http.get(`http://endpoint/ping`)
        .timeout(5000)
        .catch((error: any) => Observable.throw(error))
        .subscribe(() => {
          console.log("Connection successful");
          if(this.serviceOnline.getValue() == false)
            this.serviceOnline.next(true);
          observer.next(true);
          observer.complete();
        }, (error) => {
          console.log("Connection failed");
          if(this.serviceOnline.getValue() == true)
            this.serviceOnline.next(false);
          observer.next(false);
          observer.complete();
        });
    })
  }
}

Result in the console:

Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed

If I replace this line..

this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();

with

this.testConnection().delay(2000).repeat().take(5).subscribe();

I do get the repeats:

Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
Connection failed
Connection failed
Connection failed
Connection failed

From the documentation:

public takeUntil(notifier: Observable): Observable

Emits the values emitted by the source Observable until a notifier Observable emits a value.

The serviceOnline BehaviorSubject is only updated if there is a change in the server status. After this line...

this.serviceOnline.next(false);

...serviceOnline only emits another value when the http service has returned a successful response and if the current online status is false (disconnected):

if(this.serviceOnline.getValue() == false)
   this.serviceOnline.next(true);

So takeUntil should work. What am I misunderstanding/missing here?

I am new to RxJS - any other pointers on improving the implementation in this code would also be greatly appreciated...

Here is a Plnkr: http://plnkr.co/edit/WUKxH6nFxc8LMKcxDPql?p=info

RxJS version 5.4. Angular version 4.


Returning serviceOnline as observable doesn't fix the issue: this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();


Solution

  • I think the problem occurs because this.serviceOnline always emits the last value when subscribed to, as it is an instance of BehaviorSubject. When you pass it to the takeUntil operator as a notifier observable, takeUntil subscribes to it and immediately receives a notification which leads to immediate completion of the resulting observable. Note that it doesn't matter whether the notifier observable emits true or false, takeUntil completes whenever it receives any value.

    Try .takeUntil(this.serviceOnline.filter(online => online === true)) instead.