We have a Java application that sends messages over a STOMP connection on top of WebSocket, using Spring Boot's messaging support. The data should be sent to specific users once they connect and subscribe to the topic, but when we reload the page the WebSocket connection breaks and never sends any messages again.
We listen for the SessionSubscribeEvent here (so we can send an initial message after the subscription is made):
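import lombok.AllArgsConstructor;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;

@Component
@AllArgsConstructor
public class TransactionSubscriptionListener implements ApplicationListener<SessionSubscribeEvent> {
    private static final String DESTINATION_HEADER = "simpDestination";
    private final RegionTransactionSender regionTransactionSender;

    @Override
    public void onApplicationEvent(SessionSubscribeEvent subscribeEvent) {
        // The SUBSCRIBE message carries its destination in the "simpDestination" header.
        Object simpDestination = subscribeEvent.getMessage().getHeaders().get(DESTINATION_HEADER);
        if (simpDestination == null) {
            return;
        }
        String destination = String.valueOf(simpDestination);
        // Only react to subscriptions on the region transactions destination.
        if (destination.matches(RegionTransactionSender.REGEXP)) {
            regionTransactionSender.send();
        }
    }
}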
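To make the matching step concrete, here is a small stand-alone sketch of how String.matches behaves with a pattern shaped like REGEXP. The prefix value /user/queue/transactions/region is a made-up stand-in for ApiVersionConstants.TRANSACTIONS_FOR_REGION_DESTINATION_WITH_SUBSCRIBER, whose real value is not shown here, and DestinationMatchSketch and isRegionSubscription are hypothetical names.

```java
import java.util.Map;

class DestinationMatchSketch {
    // Hypothetical value; the real constant lives in ApiVersionConstants.
    static final String PREFIX = "/user/queue/transactions/region";
    // Same shape as RegionTransactionSender.REGEXP: prefix, a slash, then any run of non-whitespace.
    static final String REGEXP = PREFIX + "/\\S*";

    /** Mirrors the listener's check: read the header, bail out on null, then match the whole string. */
    static boolean isRegionSubscription(Map<String, Object> headers) {
        Object simpDestination = headers.get("simpDestination");
        if (simpDestination == null) {
            return false;
        }
        // String.matches only succeeds if the entire destination matches the pattern.
        return String.valueOf(simpDestination).matches(REGEXP);
    }

    public static void main(String[] args) {
        System.out.println(isRegionSubscription(Map.of("simpDestination", PREFIX + "/EU")));              // true
        System.out.println(isRegionSubscription(Map.of("simpDestination", "/topic/systemBalanceSummary"))); // false
        System.out.println(isRegionSubscription(Map.of()));                                                // false
    }
}
```

Because matches() anchors at both ends, a destination with extra whitespace after the region would be rejected, while an empty region (the prefix followed by just a slash) would still pass.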
Region transaction sender implementation:
import java.util.Optional;
import java.util.Set;
import lombok.AllArgsConstructor;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.user.SimpSubscription;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class RegionTransactionSender {
    public static final String REGEXP =
            ApiVersionConstants.TRANSACTIONS_FOR_REGION_DESTINATION_WITH_SUBSCRIBER + "/\\S*";
    private static final String TOPIC_URL_PREFIX = ApiVersionConstants.TRANSACTIONS_FOR_REGION_DESTINATION + "/";

    private final SimpMessageSendingOperations sendingOperations;
    private final TransactionService transactionService;
    private final SimpUserRegistry simpUserRegistry;

    // Walks every registered user that has at least one open session.
    public void send() {
        Set<SimpUser> users = simpUserRegistry.getUsers();
        users.stream()
                .filter(SimpUser::hasSessions)
                .forEach(this::sendToSubscriptions);
    }

    private void sendToSubscriptions(SimpUser user) {
        user.getSessions().forEach(session -> session.getSubscriptions()
                .forEach(subscription -> sendToTopics(user, subscription)));
    }

    // Sends only for subscriptions whose destination is a region transactions destination.
    private void sendToTopics(final SimpUser user, final SimpSubscription subscription) {
        String destination = subscription.getDestination();
        if (destination.matches(REGEXP)) {
            Optional<String> regionOptional = WebsocketUtils.retrieveOrganizationRegionFromDestination(destination);
            regionOptional.ifPresent(region -> sendForRegionTopic(user, region));
        }
    }

    private void sendForRegionTopic(final SimpUser user, final String region) {
        Set<TransactionResponse> transactionsForRegion = transactionService
                .getTransactionsForRegion(AbstractWebsocketSender.TRANSACTIONS_COUNT, region);
        sendingOperations.convertAndSendToUser(user.getName(), TOPIC_URL_PREFIX + region, transactionsForRegion);
    }
}
The send() method is called later on but no messages are sent.
[Screenshot: messages visible in Chrome's network debugging tool]
As you can see, our other WebSocket destination (systemBalanceSummary) works fine. The difference is that on systemBalanceSummary we send messages to a destination that is not user-specific. It's also worth mentioning that everything works fine when we open the website for the first time.
Why does that websocket break when we reload the page?
EDIT
After some debugging we've found out that, even though the subscription event is fired, there are no users in SimpUserRegistry. We do not know what causes that.
I have found a solution for this.
First, you need to implement SimpUserRegistry yourself instead of using DefaultSimpUserRegistry. The reason is that DefaultSimpUserRegistry seems to add the user only after SessionConnectedEvent is triggered, and the session is not always registered as connected at that point. I changed that so the user is added on SessionConnectEvent instead.
This only resolves the problem of not having users in the user registry after a reload, though. If that is not a problem for you, you can probably skip it.
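The timing idea can be illustrated with a framework-free sketch. It is not Spring code: ConnectTimeUserRegistry, onConnect, onDisconnect, getUserNames and getSessionIds are hypothetical names, and a real implementation would implement SimpUserRegistry and react to SessionConnectEvent and SessionDisconnectEvent instead. The point is only that the user is registered as soon as the connect request arrives, so a subscription that follows immediately can already find it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ConnectTimeUserRegistry {
    // user name -> ids of that user's open sessions
    private final Map<String, Set<String>> sessionsByUser = new ConcurrentHashMap<>();

    /** Called when the connect request arrives (the SessionConnectEvent stage in Spring). */
    void onConnect(String userName, String sessionId) {
        sessionsByUser
                .computeIfAbsent(userName, name -> ConcurrentHashMap.newKeySet())
                .add(sessionId);
    }

    /** Called when a session closes; drops the user once the last session is gone. */
    void onDisconnect(String userName, String sessionId) {
        sessionsByUser.computeIfPresent(userName, (name, sessions) -> {
            sessions.remove(sessionId);
            // Returning null removes the mapping for this user.
            return sessions.isEmpty() ? null : sessions;
        });
    }

    Set<String> getUserNames() {
        return Collections.unmodifiableSet(sessionsByUser.keySet());
    }

    Set<String> getSessionIds(String userName) {
        return Collections.unmodifiableSet(
                sessionsByUser.getOrDefault(userName, Collections.emptySet()));
    }
}
```

Keeping a set of session ids per user matters for the reload case: the new session may connect before the old one has been reported as disconnected, and removing the old id must not remove the user.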
After that, I changed how the convertAndSendToUser method is used. In the code provided in the question, data is sent to the user name. I changed that so the data is sent to the session ID, and I also added some headers. Here is the code for that:
private void sendForRegionTopic(final String region, final String sessionId) {
    Set<TransactionResponse> transactionsForRegion = transactionService
            .getTransactionsForRegion(AbstractWebsocketSender.TRANSACTIONS_COUNT, region);
    sendingOperations.convertAndSendToUser(sessionId,
            TOPIC_URL_PREFIX + region,
            transactionsForRegion,
            createHeaders(sessionId));
}

private MessageHeaders createHeaders(final String sessionId) {
    SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    accessor.setSessionId(sessionId);
    accessor.setLeaveMutable(true);
    return accessor.getMessageHeaders();
}
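To call sendForRegionTopic(region, sessionId), the traversal from the question has to carry the session id instead of the user. The sketch below models that step with plain Java stand-ins: Session, Target, collectTargets and the /user/queue/transactions/region/ prefix are all hypothetical, and taking the region as whatever follows the prefix is an assumption, since WebsocketUtils.retrieveOrganizationRegionFromDestination is not shown. In the real sender the same loop would presumably run over SimpSession and SimpSubscription.

```java
import java.util.ArrayList;
import java.util.List;

class SessionTargetSketch {
    // Hypothetical prefix standing in for the constant behind RegionTransactionSender.REGEXP.
    static final String PREFIX = "/user/queue/transactions/region/";
    static final String REGEXP = PREFIX + "\\S*";

    /** Stand-in for a session: an id plus the destinations it is subscribed to. */
    static final class Session {
        final String id;
        final List<String> destinations;

        Session(String id, List<String> destinations) {
            this.id = id;
            this.destinations = destinations;
        }
    }

    /** One (region, sessionId) pair, i.e. the arguments of sendForRegionTopic. */
    static final class Target {
        final String region;
        final String sessionId;

        Target(String region, String sessionId) {
            this.region = region;
            this.sessionId = sessionId;
        }
    }

    /** Collects one target per matching subscription, keyed by session rather than by user. */
    static List<Target> collectTargets(List<Session> sessions) {
        List<Target> targets = new ArrayList<>();
        for (Session session : sessions) {
            for (String destination : session.destinations) {
                if (!destination.matches(REGEXP)) {
                    continue;
                }
                // Assumption: the region is the part of the destination after the prefix.
                String region = destination.substring(PREFIX.length());
                if (!region.isEmpty()) {
                    targets.add(new Target(region, session.id));
                }
            }
        }
        return targets;
    }
}
```

Each resulting pair would then be passed to sendForRegionTopic, so that every open session receives its own initial message even when several sessions belong to the same user.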