How to make an rxjs WebSocketSubject auto-reconnect on socket close (complete, error) in a hot Observable (multicast) way?
First of all, I know that the rxjs webSocket is meant for the browser, but my project is a Node.js project.
So I replace the WebSocket that rxjs uses with the ws module from npm.
global.WebSocket = require("ws");
My WebSocket server closes the connection 3 seconds after the initial connection (to test rxjs reconnection).
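wss.on("connection", (ws) => {
  // Greet every connected client
  wss.clients.forEach((client) => {
    client.send(JSON.stringify("You are connected"));
  });
  // Close this connection after 3 seconds to exercise reconnection
  setTimeout(() => {
    ws.close();
  }, 3000);
});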
Lastly, I connect to my own WebSocket server and try to reconnect with retry.
global.WebSocket = require("ws");
const { webSocket } = require("rxjs/webSocket");
const { retry } = require("rxjs/operators");
const wsSubject = webSocket(`ws://localhost:${port}`); // declared globally as Hot Observable
wsSubject.pipe(retry({ delay: 1500 })).subscribe(console.log);
With my code, the initial connection is established, but no reconnection is attempted.
I think the WebSocketSubject is literally done once the connection is closed. How can I make the WebSocket reconnect in a hot Observable way?
I want to build other Observables from the multicast values emitted by my wsSubject variable in an auto-reconnecting environment.
There are two ways a WebSocket connection can end.
The first is ending with error, and the other is ending with complete.
First, this is the description of the rxjs retry operator:
"Returns an Observable that mirrors the source Observable with the exception of an error."
So retry works on error. If the connection is lost because of an error, retry resubscribes to the source, which opens a new connection.
Second, this is the description of the rxjs repeat operator:
"Repeats all values emitted on the source. It's like retry, but for non-error cases."
So if the WebSocket connection is closed normally, the Observable ends with complete, not with error. That is why retry alone never reconnects here.
If reconnection should be attempted even on complete, repeat is the apt operator.
Lastly, share is the operator for multicasting.
The code should then look like the following.
global.WebSocket = require("ws");
const { webSocket } = require("rxjs/webSocket");
const { retry, repeat, share } = require("rxjs/operators");
const wsSubject = webSocket(`ws://localhost:${port}`); // declared globally as Hot Observable
wsSubject.pipe(
share(),
retry({ delay: 1500 }),
repeat({ delay: 1500 })
).subscribe(console.log);