Tags: angular, typescript, websocket, rxjs, ngrx

Reconnecting a websocket in Angular and rxjs?


I have an ngrx/store (v2.2.2) and rxjs (v5.1.0) based application that listens to a web socket for incoming data using an observable. When I start the application, I receive incoming data flawlessly.

However, after a while (updates arrive quite infrequently) the connection seems to get lost and I no longer receive any incoming data. My code:

The service

import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => {
      console.log("WebService Connected to " + this.destination);
    }

    return Observable.create(observer => {
      this.websocket.onmessage = (evt) => {
        observer.next(evt);
      };
    })
      .map(res => res.data)
      .share();
  }
}

The subscriber

export class AppComponent implements OnInit {

  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}

  ngOnInit() {
    this.memberService.listenToTheSocket().subscribe((result: any) => {
      try {
        console.log(result);
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
      } catch (ex) {
        console.log(JSON.stringify(ex));
      }
    })
  }
}

What do I need to do to reconnect the web socket when it times out, so the observable continues to emit incoming values?

I had a look at some Q&A here, here and here, and they didn't seem to address this question (in a way I could comprehend).

Note: the websocket at wss://notessensei.mybluemix.net/ws/time is live and emits a time stamp once a minute (in case one wants to test that).

Advice is greatly appreciated!


Solution

  • Actually, there is now a WebSocketSubject in rxjs!

     import { webSocket } from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket
    
     let subject = webSocket('ws://localhost:8081');
     subject.subscribe(
        (msg) => console.log('message received: ' + msg),
        (err) => console.log(err),
        () => console.log('complete')
      );
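     // RxJS 6 serializes outgoing values with JSON.stringify by default, so pass the object itself;
     // on v5, which sends the value as-is, use subject.next(JSON.stringify({ op: 'hello' })) instead.
     subject.next({ op: 'hello' });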
    

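    It reconnects when you resubscribe after the connection has broken. For example, retry() resubscribes whenever the socket errors. Note that retry() reacts only to errors: if the connection is closed cleanly, the stream completes instead, and repeat() is the operator that resubscribes in that case.

    subject.retry().subscribe(...)          // RxJS 5
    subject.pipe(retry()).subscribe(...)    // RxJS 6+, with: import { retry } from 'rxjs/operators'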
    

    See the docs for more info. Unfortunately, the search box doesn't list the method, but you can find it here:

    http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

    The #-anchor navigation does not work in my browser, so search for "webSocket" on that page.

    Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15
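If switching to WebSocketSubject is not an option, the same resubscribe-on-failure idea can be sketched around a plain WebSocket like the one in the question's service. This is only a minimal sketch, not part of the original answer: the names SocketLike, backoffDelay and listenWithReconnect are hypothetical, and the 1 second base delay and 30 second cap are assumed values. It opens a socket, forwards each message's data, and schedules a fresh connection with a growing delay whenever the socket closes.

```typescript
// Minimal shape of the browser WebSocket that this sketch relies on (hypothetical name).
interface SocketLike {
  onopen: ((ev: any) => any) | null;
  onmessage: ((ev: any) => any) | null;
  onclose: ((ev: any) => any) | null;
  close(): void;
}

// Capped exponential backoff: baseMs, 2*baseMs, 4*baseMs, ... never more than maxMs.
// The default values are assumptions chosen for illustration.
function backoffDelay(attempt: number, baseMs: number = 1000, maxMs: number = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Opens a socket via createSocket, passes evt.data of every message to onData,
// and reconnects after each close. Returns a function that stops reconnecting.
function listenWithReconnect(
  createSocket: () => SocketLike,
  onData: (data: any) => void,
  schedule: (fn: () => void, ms: number) => void = (fn, ms) => { setTimeout(fn, ms); },
): () => void {
  let attempt = 0;
  let stopped = false;
  let socket: SocketLike | null = null;

  const connect = (): void => {
    if (stopped) {
      return;
    }
    const current = createSocket();
    socket = current;
    // A successful open resets the backoff.
    current.onopen = () => { attempt = 0; };
    current.onmessage = (evt) => { onData(evt.data); };
    current.onclose = () => {
      if (stopped) {
        return;
      }
      // Wait a little longer after each consecutive failure, then reconnect.
      schedule(connect, backoffDelay(attempt));
      attempt += 1;
    };
  };

  connect();

  return () => {
    stopped = true;
    if (socket) {
      socket.close();
    }
  };
}
```

In the service from the question, createSocket could be () => new WebSocket(this.destination), with the call wrapped in Observable.create so that onData forwards to observer.next and the returned stop function serves as the teardown. The schedule parameter exists only so the timing can be replaced in tests.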