Search code examples
reactjstypescriptspring-bootserver-sent-events

The front end not receiving server sent events


I'm trying to subscribe to events on BE trough a EventSource on the FronteEnd of my application. The problem seems to be reflect that eventlistener, because I'm reaching the BE endpoint and registering the subscriber.

I'm using React Typescript on FE, Java 17 with Spring Boot on BE and a RedisCache to save and recover the subscribed users so if a pod is down or we need to deploy a feature, we can recover the subscriber.

this is my FE function to subscribe to BE events, called on the first useEffect of the Component

/**
 * This function registers and receives BE events to update data
 * similar to a webSocket, but only unidirectional from BE to FE
 * For now, just UNANSWERED_BUCKET is used, but in the future
 * can be used for other event types
 * @returns
 */
function registerUnansweredSSeEvent() {
  const sourceUnanswered = SseEventService.subscribeToEventsEventSource(
    context?.myExternalId,
    SseSubscriptionEvents.UNANSWERED_BUCKET,
  );
  sourceUnanswered.addEventListener(
    SseSubscriptionEvents.UNANSWERED_BUCKET,
    (event) => {
      fillUnansweredAfterCategorization(event);
    },
  );
  sourceUnanswered.onopen = (event: any) => logger.info("Connection opened");
  sourceUnanswered.onerror = (event: any) => logger.info("Connection error");
  return sourceUnanswered;
}

this is the function susbscribeToEventsEventSurce:

subscribeToEventsEventSource(
  externalId: string | undefined,
  eventToSubscribe: string,
) {
  return new EventSource(
    `${this.baseUrl}/subscribe?personalExternalId=${externalId}&eventToSubscribe=${eventToSubscribe}`,
  );
}

My Controller:

@GetMapping(value = "/subscribe", produces = "text/event-stream")
    public SseEmitter subscribeSseEmitter(
            @RequestParam("personalExternalId") String personalExternalId,
            @RequestParam("eventToSubscribe") String eventToSubscribe){
        return emitterService.registerClient(personalExternalId, eventToSubscribe);
    }

My complete Service

@Service
public class SSEService {
    private final NewRelicLogger logger = NewRelicLogger.getLogger(SSEService.class);
    private static final AtomicInteger ID_COUNTER = new AtomicInteger(1);
    //2hs maximum time Timeout. Should we increase this?
    public static final long DEFAULT_TIMEOUT = 7200000L;
    private final SlackNotificationService slackNotificationService;
    private final CacheService cacheService;
    public SSEService(SlackNotificationService slackNotificationService, CacheService cacheService) {
        this.slackNotificationService = slackNotificationService;
        this.cacheService = cacheService;
    }
    
    /**
     * @param subscriberExternalId: Email of the registered user to subscribe to notifications
     * @return This method subscribes the user to the events sent from the BE
     */
    @AutomaticLogging
    public SseEmitter registerClient(String subscriberExternalId, String eventToSubscribe) {
        var emitter = new SseEmitter(DEFAULT_TIMEOUT);
        var sseClient = new SseClient(subscriberExternalId != null ? subscriberExternalId : UUID.randomUUID().toString(), emitter);
        addOrUpdateSseClient(sseClient, SseEventType.valueOf(eventToSubscribe));
        emitter.onCompletion(() -> removeFromCache(subscriberExternalId));
        emitter.onError(err -> removeAndLogError(sseClient, err.getMessage()));
        emitter.onTimeout(() -> removeAndLogError(sseClient, "TIMEOUT"));
        
        logger.info("New client registered {}", sseClient.getExternalId());
        slackNotificationService.logInfo("New client registered " + sseClient.getExternalId());
        return emitter;
    }
    
    private void removeFromCache(String subscriberExternalId){
        try {
            cacheService.removeCacheKey(SSE_SUBSCRIBER, subscriberExternalId);
        } catch (Exception e) {
            logger.warn("Couldn't remove cache for subscriber: " + subscriberExternalId);
        }
    }
   
    @AutomaticLogging
    public void unregisterClient(String subscriberExternalId) {
        removeFromCache(subscriberExternalId);
    }
    
    /**
     * @param newClient Here we add a new client removing the previous one
     *                  to not keep open connections if the user reloads the page
     */
    public void addOrUpdateSseClient(SseClient newClient, SseEventType newEventType) {
        String key = newClient.getExternalId();
        SseClient registeredClientsObject = cacheService.getSSEClientFromCache(SSE_SUBSCRIBER, key);
        newClient.getSubscribedEvents().add(newEventType);
        if (registeredClientsObject == null ){
            cacheService.putValueInCache(SSE_SUBSCRIBER, key, newClient);
        } else {
            if (!registeredClientsObject.getSubscribedEvents().contains(newEventType)){
                List<SseEventType> events = registeredClientsObject.getSubscribedEvents();
                events.add(newEventType);
                newClient.setSubscribedEvents(events);
                cacheService.putValueInCache(SSE_SUBSCRIBER, key, newClient);
            }
        }
    }
    
    
    /**
     * @param eventType The type of the event sent to the subscribers
     *                  So only subscribed to this eventType receives the notifications
     *                  Method used to Broadcast the messages to all subscribers
     *          Use this for generic updates
     */
    public void broadcastSseEmitterMessages(SseEventType eventType) {
        List<SseClient> clients = cacheService.getAllSSEClientFromCache()
                .stream()
                .filter(client -> client.getSubscribedEvents().contains(eventType))
                .toList();
        
        for (SseClient client : clients) {
            sendBroadcasterEmitterMessage(client, eventType);
        }
    }
    
    /**
     * @param client Subscriber who will receive the event
     * @param event event to notify
     */
    @AutomaticLogging
    private void sendBroadcasterEmitterMessage(SseClient client, SseEventType event) {
        var sseEmitter = client.getSseEmitter();
        try {
            logger.info("Notify client {}", client.getExternalId());
            var eventId = ID_COUNTER.incrementAndGet();
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name(event.name())
                    .id(String.valueOf(eventId))
                    .data(event, MediaType.APPLICATION_JSON);
            sseEmitter.send(eventBuilder);
        } catch (IOException e) {
            sseEmitter.completeWithError(e);
        }
    }
    
    private void removeAndLogError(SseClient client, String error) {
        logger.error("Error during communication. Unregister client {}, error {}", client.getExternalId(), error);
        slackNotificationService.logError("Error during communication. Unregister client " + client.getExternalId() + " with error: " + error);
        removeFromCache(client.getExternalId());
    }
    
    public void broadcastSseEmitterMessagesChatBuckets(List<SmallChannelDTO> channels, SseEventType eventType) {
        List<SseClient> clients = cacheService.getAllSSEClientFromCache();
        for (SseClient client: clients) {
            sendBroadcastSseEmitterMessagesChatBuckets(client, channels, eventType);
        }
    }
    
    /**
     * @param client Subscriber who will receive the event for unanswered bucket on chat
     * @param channels the channels to update
     * @param eventType event to notify
     */
    @AutomaticLogging
    private void sendBroadcastSseEmitterMessagesChatBuckets(SseClient client, List<SmallChannelDTO> channels, SseEventType eventType) {
        var sseEmitter = client.getSseEmitter();
        try {
            logger.info("Notify client {} - sseEmitter {} -  the event {}", client.getExternalId(), sseEmitter, eventType.name());
            var eventId = ID_COUNTER.incrementAndGet();
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name(eventType.name())
                    .id(String.valueOf(eventId))
                    .data(channels, MediaType.APPLICATION_JSON);
            sseEmitter.send(eventBuilder);
        } catch (IOException e) {
            sseEmitter.completeWithError(e);
        }
    }
    
    
}

and the code that saves and return the cached info:

public SseClient getSSEClientFromCache(String cacheName, String key) {
        SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(cacheName);
        if (cache != null) {
            Cache.ValueWrapper valueWrapper = cache.get(key);
            if (valueWrapper != null) {
                return (SseClient) valueWrapper.get();
            }
        }
        return null;
    }
    
    public List<SseClient> getAllSSEClientFromCache() {
        SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(SSE_SUBSCRIBER);
        if (cache != null) {
            List<String> allKeys = cache.getAllKeys().stream()
                    .filter(c -> c.startsWith(SSE_SUBSCRIBER))
                    .map(c -> c.substring(SSE_SUBSCRIBER.length() + 1)) //we cut the key so only get the externalID
                    .toList();
            List<SseClient> clients = new ArrayList<>();
            for (String key : allKeys) {
                SseClient client = this.getSSEClientFromCache(SSE_SUBSCRIBER, key);
                if (client != null) {
                    clients.add(client);
                }
            }
            return clients;
        }
        return Collections.emptyList();
    }
    
    public void putValueInCache(String cacheName, Object key, Object value) {
        SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(cacheName);
        if (cache != null) {
            cache.put(key, value);
        }
    }

When I need to fire the event, it works, but the FE doesn't receive it.

Any help?


Solution

  • I find a solution and managed to make it work!

    I've changed my controller to look like this and it worked, but not fully until I added this

    cacheControl(CacheControl.noCache())

    @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
         public ResponseEntity<SseEmitter> subscribeSseEmitter(
                 @RequestParam("personalExternalId") String personalExternalId,
                 @RequestParam("eventToSubscribe") String eventToSubscribe) {
             return ResponseEntity
                     .ok()
                     .header("Connection", "keep-alive")
                     .cacheControl(CacheControl.noCache())
                     .contentType(MediaType.TEXT_EVENT_STREAM)
                     .body(emitterService.registerClientToSet(personalExternalId, eventToSubscribe));
         }
    

    But now I'm facing another issue What happens on timeout if I want to reconnect the client? Seems like the Fe keeps trying to reconnect and the Server says shows a Timeout. I thought would be a good practice to set a default timeout instead of just putting Long.maxValue or something like that.