I have a WebSocket connection established between a browser client and a Spring Boot backend using STOMP and SockJS. Every second, each client sends a payload to the server containing data that needs to be persisted to a Postgres database. There may be thousands of clients connected simultaneously, so I don't want to update the database every second for each one. To reduce the CPU load, I want to listen for the StompCommand.DISCONNECT
event and then persist the last message received from that client.
Is this possible, or is there another way around this problem?
In this case the question is really a matter of opinion - there are many possible implementations.
One of them can do the following:
When you receive a message from a connected client, maintain a map (in memory is enough, for the sake of the idea) from the identifier of the current client to its last data.
Every time you get a new message in the @MessageMapping
annotated class, update the entry in the map so that it always contains the last message.
The value of the map is the last message; the key can be a Principal, a session-id string - whatever you find useful.
@Component
public class LastMessageHolder {

    // ConcurrentHashMap: entries are updated concurrently from many client sessions
    private final Map<Principal, MyData> lastDataPerPrincipal = new ConcurrentHashMap<>();

    public void updateLastData(Principal principal, MyData data) {
        lastDataPerPrincipal.put(principal, data);
    }

    public MyData getLastDataForPrincipalAndClear(Principal principal) {
        return lastDataPerPrincipal.remove(principal);
    }
}
The message receiver gets the messages through the STOMP channel and updates the last-message holder:
@Component
public class MyMessageReceiver {

    @Autowired
    private LastMessageHolder lastMessageHolder;

    @MessageMapping(...)
    public void onDataReceived(Principal principal, MyData data) {
        // this gets called every second per client
        lastMessageHolder.updateLastData(principal, data);
    }
}
And when you see the DISCONNECT frame in a channel interceptor, remove the data for the principal that is disconnecting and store it in the database:
@Component
public class DbStoreChannelInterceptor implements ChannelInterceptor {

    @Autowired
    private LastMessageHolder lastMessageHolder;

    @Autowired // something that will store your stuff in the db
    private DbDao dbDao;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // populate a principal here, from headers, an authentication token, whatever
            Principal principal = ...
            accessor.setUser(principal);
        }

        if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            Principal principal = accessor.getUser();
            MyData data = lastMessageHolder.getLastDataForPrincipalAndClear(principal);
            dbDao.storeDataInDbForPrincipal(principal, data);
        }

        // preSend must return the message (or null to drop it)
        return message;
    }
}
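For the interceptor to see the CONNECT and DISCONNECT frames, it has to be registered on the client inbound channel. A minimal sketch of the broker configuration, assuming your setup - the "/ws" endpoint and "/app" prefix are placeholders:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private DbStoreChannelInterceptor dbStoreChannelInterceptor;

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // CONNECT and DISCONNECT frames pass through this channel,
        // so preSend() above is invoked for them
        registration.interceptors(dbStoreChannelInterceptor);
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // endpoint path is a placeholder; withSockJS() matches the SockJS client
        registry.addEndpoint("/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // prefix for @MessageMapping destinations; placeholder
        registry.setApplicationDestinationPrefixes("/app");
    }
}
```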
This is a basic idea.
From there you can take it further: instead of storing the data from the channel interceptor (in which case an actual INSERT
is issued per client), you can push it onto some in-memory or distributed queue - whatever suits you best - so that a consumer reads a batch of data objects and stores them all at once, which puts much less load on your RDBMS.
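The queue-and-batch idea can be sketched with a plain BlockingQueue; the PendingWrite type and the method names here are illustrative, not part of any framework:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// On DISCONNECT the interceptor would enqueue instead of calling the DAO directly;
// a single consumer drains the queue and issues one batched INSERT per drain.
class BatchWriter {

    // one entry per disconnected client waiting to be persisted (illustrative type)
    record PendingWrite(String principalName, String payload) {}

    private final BlockingQueue<PendingWrite> queue = new LinkedBlockingQueue<>();

    public void enqueue(PendingWrite write) {
        queue.offer(write);
    }

    // Drain up to maxBatch entries without blocking; a real consumer would hand
    // this list to a single JDBC batch INSERT instead of one INSERT per client.
    public List<PendingWrite> drainBatch(int maxBatch) {
        List<PendingWrite> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatch);
        return batch;
    }
}
```

The consumer can run on a scheduled executor, draining every few seconds, so the INSERT rate is bounded by the drain interval rather than by the number of disconnecting clients.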
In addition, I'll just mention that you should think about the situation where the client keeps sending data but the server goes down for some reason while the client still wants to keep sending. That is more in the area of distributed-system architecture, so it's beyond the scope of the question.