node.js, websocket, rxjs, observable

rxjs WebSocket auto-reconnection on Node.js


How can I make an rxjs WebSocket Subject reconnect automatically when the socket closes (complete or error), in a hot Observable (multicast) way?

First of all, I know rxjs's WebSocket support is meant for the browser, but my project is a Node.js project.

So I supply the WebSocket implementation from the ws module on npm:

global.WebSocket = require("ws");

My WebSocket server closes each connection 3 seconds after it is established (to test rxjs reconnection).

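wss.on("connection", (ws) => {
  // Notify every connected client that a connection was made.
  wss.clients.forEach((client) => {
    client.send(JSON.stringify("You are connected"));
  });
  // Close the new connection after 3 seconds to test reconnection.
  setTimeout(() => {
    ws.close();
  }, 3000);
});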

Lastly, I connect to my own WebSocket server and try to reconnect with retry.

global.WebSocket = require("ws");
const { webSocket } = require("rxjs/webSocket");
const { retry } = require("rxjs/operators");
// Note the URL form: host and port are separated by a single colon.
const wsSubject = webSocket(`ws://localhost:${port}`); // declared globally as Hot Observable
wsSubject.pipe(retry({ delay: 1500 })).subscribe(console.log);

With this code the initial connection is established, but no reconnection is ever attempted.

I think the WebSocket Subject is simply finished once the connection is closed. How can I make the WebSocket reconnect in a hot Observable way?

I want to build other Observables from the multicast values emitted by my wsSubject variable, in an environment where it reconnects automatically.


Solution

  • A WebSocket connection can end in two ways.

    It either ends with an error or ends with complete.

    First, this is the description of the rxjs retry operator:

    Returns an Observable that mirrors the source Observable with the exception of an error.

    So retry only acts on error. If the connection is lost because of an error, retry resubscribes and the socket reconnects.

    Second, this is the description of the rxjs repeat operator:

    Repeats all values emitted on the source. It's like retry, but for non error cases.

    So if the WebSocket connection is closed normally, the stream ends with complete, not error. That is the case in the question: the server calls ws.close(), so retry alone never fires.

    If the connection should also be re-established after a complete, repeat is the right operator for that.

    Lastly, share is the operator for multicasting.

    The code should then look like this:

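    global.WebSocket = require("ws");
    const { webSocket } = require("rxjs/webSocket");
    const { retry, repeat, share } = require("rxjs/operators");
    // Note the URL form: host and port are separated by a single colon.
    const wsSubject = webSocket(`ws://localhost:${port}`); // declared globally as Hot Observable
    wsSubject.pipe(
      share(),                  // multicast the socket stream
      retry({ delay: 1500 }),   // reconnect 1.5s after an error
      repeat({ delay: 1500 })   // reconnect 1.5s after a normal close (complete)
    ).subscribe(console.log);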