Tags: nestjs, observable, server-sent-events

Server-Sent Events (SSE) with NestJS


I am new to NestJS and need to implement server-sent events (SSE). NestJS has a special decorator, @Sse, for establishing an SSE connection between clients and the server.

With the @Sse decorator, the handler has to return an Observable. Observables are like event streams: whenever a new event is emitted, the observer receives the emitted data.

notification.controller.ts

import { Public } from 'src/decorators';
import { Observable } from 'rxjs';
import { FastifyReply } from 'fastify';
import { NotificationService } from './notification.service';

import { Sse, Controller, Res } from '@nestjs/common';

@Public()
@Controller()
export class NotificationController {
  constructor(private notificationService: NotificationService) {}
  @Sse('notifications')
  async sendNotification(@Res() reply: FastifyReply): Promise<Observable<any>> {
    return await this.notificationService.handleConnection();
  }
}

notification.service.ts

import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';

@Injectable()
export class NotificationService {
  notificationEvent: Subject<any> = new Subject();
  async handleConnection() {
    setInterval(() => {
      this.notificationEvent.next({ data: { message: 'Hello World' } });
    }, 1000);
    return this.notificationEvent.asObservable();
  }
}

To demonstrate the problem, consider two users, A and B. User A connects to the SSE endpoint first. That triggers the setInterval call in the service, so every second user A receives the message { message: 'Hello World' } from the server. Now user B connects.

This triggers setInterval again, and each event the observable emits is received by both user A and user B. That is the problem for me.

My requirement is that users connect over SSE, but the server sends messages based on the user's role and similar criteria: some messages should go to specific users and other messages to other users. What do I need to implement to achieve this?

Is this possible with observables, or do I need a different approach? If you know the answer, kindly share it.


Solution

  • To my knowledge, this isn't really what Server-Sent Events are for. However, if it's what you really want to do, you could keep a map from each client to its own event stream, using something like:

    // needs Param and Sse from '@nestjs/common', Observable from 'rxjs'
    @Sse('notifications/:id')
    sendNotification(@Param() { id }: Record<string, string>): Observable<any> {
      // no need to make this async or return a Promise. Observables are handled just fine as they are
      return this.notificationService.handleConnection(id);
    }
    

    And have your NotificationService take in the id and maintain a map of Subjects, one per id:

    import { Injectable } from '@nestjs/common';
    import { Observable, Subject } from 'rxjs';
    
    @Injectable()
    export class NotificationService {
      // one Subject per client id
      notificationEvents: Record<string, Subject<any>> = {};
    
      // not async: the controller returns the Observable directly
      handleConnection(id: string): Observable<any> {
        if (!this.notificationEvents[id]) {
          this.notificationEvents[id] = new Subject();
          // demo emitter, started once per id rather than once per connection
          setInterval(() => {
            this.notificationEvents[id].next({ data: { message: 'Hello World' } });
          }, 1000);
        }
        return this.notificationEvents[id].asObservable();
      }
    }
    

    This way you keep track of which notifications belong to which user, based on the id passed to the SSE endpoint.

    Otherwise, to maintain events per user, sockets are probably a better fit.
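
For the role-based part of the question, the routing rule can be kept separate from the transport. The following is only a minimal sketch in plain TypeScript, so it runs without NestJS or RxJS. All names in it (NotificationRegistry, SseClient, SseTarget, connect, publish) are hypothetical and not part of any library, and it assumes the user id and role of each connection are already known, for example from an auth guard.

```typescript
// Hypothetical shapes for illustration; none of these names come from NestJS or RxJS.
interface SseClient { userId: string; role: string; }
interface SseTarget { userId?: string; role?: string; } // an omitted field means "no restriction"
interface SseMessage { data: { message: string }; }     // same { data: ... } shape as the answer above
type SseListener = (event: SseMessage) => void;
interface SseConnection { client: SseClient; listener: SseListener; }

class NotificationRegistry {
  private connections: SseConnection[] = [];

  // Register one open connection; the returned function removes it again.
  connect(client: SseClient, listener: SseListener): () => void {
    const connection: SseConnection = { client, listener };
    this.connections.push(connection);
    return () => {
      this.connections = this.connections.filter((c) => c !== connection);
    };
  }

  // Deliver only to connections matching every field set on the target.
  // Returns how many connections received the message.
  publish(target: SseTarget, message: string): number {
    const matching = this.connections.filter(
      ({ client }) =>
        (target.userId === undefined || target.userId === client.userId) &&
        (target.role === undefined || target.role === client.role),
    );
    matching.forEach(({ listener }) => listener({ data: { message } }));
    return matching.length;
  }
}

// Usage: two connected users with different roles.
const registry = new NotificationRegistry();
const inboxA: string[] = [];
const inboxB: string[] = [];
const disconnectA = registry.connect({ userId: 'A', role: 'admin' }, (e) => { inboxA.push(e.data.message); });
registry.connect({ userId: 'B', role: 'viewer' }, (e) => { inboxB.push(e.data.message); });

registry.publish({ role: 'admin' }, 'Admins only'); // reaches A
registry.publish({ userId: 'B' }, 'Just for B');    // reaches B
registry.publish({}, 'Everyone');                   // reaches A and B
disconnectA();
registry.publish({}, 'After A left');               // reaches B only

console.log(inboxA); // [ 'Admins only', 'Everyone' ]
console.log(inboxB); // [ 'Just for B', 'Everyone', 'After A left' ]
```

Inside an @Sse handler, one way to bridge this to RxJS would be to return new Observable((subscriber) => registry.connect(client, (e) => subscriber.next(e))). The function returned by connect then acts as the teardown, so the entry is removed when the client disconnects.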