I am implementing a WebSockets connection in my platform. I'm using Angular 10 in the frontend and StompJS 2.3.3 over SockJS Client 1.5.0 for establishing the connection to the backend.
I created a WebSockets service to manage the connection, and it connects and works properly.
This is the service I created:
export class WebSocketsService implements OnDestroy {
  /**
   * The authentication service to get the token from.
   */
  private authenticationService: AuthenticationService;
  private serverUrl = environment['ceNotificationsServer'];
  private socket;
  private state: BehaviorSubject<SocketClientState>;

  public constructor(authenticationService: AuthenticationService) {
    this.authenticationService = authenticationService;
    const client_id = environment['ceNotificationsClientId'];
    const token = this.authenticationService.getToken();
    const url = this.serverUrl + '/websocket' + '?clientId=' + client_id + '&Authorization=' + token;
    const ws = new SockJS(url);
    this.socket = StompJS.over(ws);
    this.socket.reconnect_delay = 5000;
    this.socket.debug = () => {
    };
    this.state = new BehaviorSubject<SocketClientState>(SocketClientState.ATTEMPTING);
    this.socket.connect({}, () => {
      this.state.next(SocketClientState.CONNECTED);
      console.log('Connection to the socket is open.');
    });
  }

  /**
   * Establish the connection to the websocket.
   */
  connect(cfg: { reconnect: boolean } = {reconnect: false}): Observable<any> {
    return new Observable(observer => {
      this.state.pipe(
        cfg.reconnect ? this.reconnect : o => o,
        filter(state => state === SocketClientState.CONNECTED))
        .subscribe(() => {
          observer.next(this.socket);
        });
    });
  }

  message(queue: String): Observable<any> {
    return this.connect({reconnect: true})
      .pipe(switchMap(client => {
          return new Observable<any>(observer => {
            client.subscribe(queue, message => {
              observer.next(JSON.parse(message.body));
            });
          });
        }),
        retryWhen((errors) => errors.pipe(delay(5))));
  }

  reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(
      retryWhen(errors => errors.pipe(
        tap(val => console.log('Trying to reconnect to the socket', val)),
        delayWhen(_ => timer(5000))
      ))
    );
  }

  /**
   * Close the connection to the websocket.
   */
  close() {
    if (this.socket) {
      this.socket.complete();
      this.state.next(SocketClientState.CLOSED);
      console.log('Connection to the socket is closed');
      this.socket = null;
    }
  }

  ngOnDestroy() {
    this.close();
  }
}

export enum SocketClientState {
  ATTEMPTING, CONNECTED, CLOSED
}
And in my Angular component, I added this code to subscribe to the WebSockets queue and obtain some notifications to populate my notifications tray:
const subscription1 = this.websocketsService.message('/messages')
  .subscribe(outdatedProfiles => {
    const notification: Notification = outdatedProfiles;
    notification.message = 'notificationNotSyncedProviders';
    this.notifications.addNotification(notification);
  });
this.subscriptionsManager.add(subscription1);
My issue is that when I lose the connection (for example, when the Wi-Fi is disconnected), it does not reconnect. It catches the errors, but not the connection closed event.
I tried this approach:
public constructor(authenticationService: AuthenticationService) {
  this.socket.connect({}, () => {...});
  this.socket.onclose = function(event) {
    console.log("WebSocket is closed now.");
    this.connect({reconnect: true});
  };
}
But it's not working. I've read the documentation and I cannot find the answer to the reconnection issue when the connection is closed. Any ideas?
As @BizzyBob suggested, I think a better approach is to use @stomp/ng2-stompjs, which depends on rx-stomp and a newer version of stompjs and makes the task easier.
Install @stomp/ng2-stompjs
npm i @stomp/ng2-stompjs
Create a configuration file for the rx-stomp service (my-rxstomp.config.ts):
import { InjectableRxStompConfig } from '@stomp/ng2-stompjs';

export const myRxStompConfig: InjectableRxStompConfig = {
  // Which server?
  brokerURL: 'ws://127.0.0.1:8080/',
  // Wait in milliseconds before attempting auto reconnect
  // Set to 0 to disable
  // Typical value 500 (500 milliseconds)
  reconnectDelay: 200
};
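Since the question connects through SockJS rather than a plain WebSocket URL, the configuration could supply a webSocketFactory instead of brokerURL. As far as I know, the factory is called again for every reconnection attempt, so each attempt gets a fresh socket. The sketch below is only an illustration: SockJsStompConfigSketch, buildSocketUrl and buildSockJsConfig are hypothetical names, the local interface stands in for the two fields of InjectableRxStompConfig that are used, and createSocket is injected so the code runs without sockjs-client installed.

```typescript
// Local stand-in for the two config fields used here. In the app the real type
// would be InjectableRxStompConfig from '@stomp/ng2-stompjs' (assumption).
interface SockJsStompConfigSketch {
  webSocketFactory: () => unknown;
  reconnectDelay: number;
}

// Hypothetical helper: builds the same URL as the service in the question,
// but URL-encodes the query values.
function buildSocketUrl(serverUrl: string, clientId: string, token: string): string {
  return `${serverUrl}/websocket?clientId=${encodeURIComponent(clientId)}` +
    `&Authorization=${encodeURIComponent(token)}`;
}

// Hypothetical helper: createSocket is injected for testability.
// In the app it would be something like (url) => new SockJS(url).
function buildSockJsConfig(
  getUrl: () => string,
  createSocket: (url: string) => unknown,
  reconnectDelay: number = 5000
): SockJsStompConfigSketch {
  return {
    // Evaluated on every call, so a reconnect can pick up a renewed token.
    webSocketFactory: () => createSocket(getUrl()),
    reconnectDelay,
  };
}
```

Passing getUrl as a function rather than a fixed string is a design choice: the token in the question is read once in the constructor, and a reconnect hours later might need a newer one.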
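Register the configuration and the RxStomp service in the providers section of app.module.ts:
// Imported from '@stomp/ng2-stompjs':
// InjectableRxStompConfig, RxStompService, rxStompServiceFactory
providers: [
  {
    provide: InjectableRxStompConfig,
    useValue: myRxStompConfig,
  },
  {
    provide: RxStompService,
    useFactory: rxStompServiceFactory,
    deps: [InjectableRxStompConfig],
  },
],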
Inject the RxStomp service into your component and subscribe to connection state changes if you want to do something when the connection is closed. If you only want to reconnect when the connection is closed, you do not need to subscribe to the connection state; the reconnectDelay: 200 setting in the configuration will handle the reconnection for you.
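import { RxStompService } from '@stomp/ng2-stompjs';
import { RxStompState } from '@stomp/rx-stomp';

// Inside the component class:
constructor(private rxStompService: RxStompService) { }

ngOnInit() {
  this.rxStompService.connectionState$.subscribe(next => {
    console.log('Connection State', RxStompState[next]);
    if (next === RxStompState.CLOSED) {
      // Do something
    }
  });
}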
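For intuition about what reconnectDelay does for you, here is a library-free sketch of the reconnect-on-close idea. It is not how rx-stomp is implemented; SocketLike, Scheduler and ReconnectingSocket are hypothetical names invented for this illustration. The two points it tries to show are that a closed socket is not reopened (every attempt creates a new one) and that the close handler is an arrow function, so `this` still refers to the wrapper rather than to the socket.

```typescript
// Minimal socket shape for this sketch (assumption): only the two handlers used.
interface SocketLike {
  onopen: (() => void) | null;
  onclose: (() => void) | null;
}

// Injected so tests can run without real timers.
type Scheduler = (fn: () => void, delayMs: number) => void;

// Hypothetical helper, not part of any library.
class ReconnectingSocket {
  attempts = 0;
  private stopped = false;
  private createSocket: () => SocketLike;
  private onConnected: (socket: SocketLike) => void;
  private delayMs: number;
  private schedule: Scheduler;

  constructor(
    createSocket: () => SocketLike,
    onConnected: (socket: SocketLike) => void,
    delayMs: number = 5000,
    schedule: Scheduler = (fn, ms) => { setTimeout(fn, ms); }
  ) {
    this.createSocket = createSocket;
    this.onConnected = onConnected;
    this.delayMs = delayMs;
    this.schedule = schedule;
  }

  start(): void {
    if (this.stopped) { return; }
    this.attempts++;
    // Always a brand-new socket: a closed one is never reused.
    const socket = this.createSocket();
    socket.onopen = () => this.onConnected(socket);
    // Arrow function keeps `this` bound to the ReconnectingSocket instance.
    socket.onclose = () => this.schedule(() => this.start(), this.delayMs);
  }

  // After stop(), pending and future reconnect attempts do nothing.
  stop(): void {
    this.stopped = true;
  }
}
```

With the old StompJS 2.3.3 client from the question, the same idea would presumably mean building a new SockJS and a new STOMP client on every attempt, and re-subscribing to the queues inside onConnected.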