Return EventEmitter as Observable in Nest.js


EventEmitter in Nest.js is a wrapper around the EventEmitter2 module. I want a Server-Sent Events endpoint to return an Observable created from the EventEmitter.

import { Controller, Post, Body, Sse } from '@nestjs/common';
import { fromEvent } from 'rxjs';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { OrdersService } from './orders.service';
import { CreateOrderDto } from './dto/create-order.dto';


@Controller('orders')
export class OrdersController {

  constructor(private ordersService: OrdersService,
              private eventEmitter2: EventEmitter2) {}

  @Post()
  createOrder(@Body() createOrderDto: CreateOrderDto) {
    //  save `Order` in Mongo
    const newOrder = this.ordersService.save(createOrderDto);
    //  emit event with new order
    this.eventEmitter2.emit('order.created', newOrder);
    return newOrder;
  }

  @Sse('newOrders')
  listenToTheNewOrders() {
    //  return Observable from EventEmitter2
    return fromEvent(this.eventEmitter2, 'order.created');
  }

}

But after subscribing to this source from the browser, I only get errors:

this.eventSource = new EventSource('http://localhost:3000/api/v1/orders/newOrders');
this.eventSource.addEventListener('open', (o) => {
  console.log("The connection has been established.");
});
this.eventSource.addEventListener('error', (e) => {
  console.log("Some error has happened");
  console.log(e);
});
this.eventSource.addEventListener('message', (m) => {
  const newOrder = JSON.parse(m.data);
  console.log(newOrder);
});

Solution

  • Most likely the events are not in the format that SSE expects.

    On the wire, each SSE chunk must be a string in the format data: <your_message>\n\n, and the whitespace matters: the blank line terminates the event. See the MDN reference.

    With Nest.js you don't need to build that string by hand; you just return an object with the right structure.

    So in your example:

      @Sse('newOrders')
      listenToTheNewOrders() {
        //  return Observable from EventEmitter2
        return fromEvent(this.eventEmitter2, 'order.created');
      }
    

    would have to be adjusted to, for example:

      // also needs: import { map } from 'rxjs/operators';
      @Sse('newOrders')
      listenToTheNewOrders() {
        //  wrap each emitted order in the { data: ... } shape that Nest.js expects
        return fromEvent(this.eventEmitter2, 'order.created')
               .pipe(map((newOrder) => ({ data: { newOrder } })));
      }
    

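    The { data: ... } structure is the key here. Nest.js serializes the value of data to JSON and writes it as the data: <json>\n\n chunk mentioned earlier, so with { data: { newOrder } } the client's JSON.parse(m.data) yields an object with a newOrder property.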
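
To make the framing concrete, here is a minimal sketch of how a message object could be turned into an SSE chunk. The SseMessage interface and the toSseFrame helper are hypothetical names used only for illustration; they are not Nest.js APIs, and Nest's real serializer may differ in details such as field order.

```typescript
// Hypothetical illustration only: not part of Nest.js or RxJS.
interface SseMessage {
  data: string | object;
  id?: string;
  type?: string;  // sent as the `event:` field
  retry?: number; // reconnection delay in milliseconds
}

function toSseFrame(message: SseMessage): string {
  // Objects are sent as JSON text; strings are sent as-is.
  const payload =
    typeof message.data === 'string'
      ? message.data
      : JSON.stringify(message.data);

  const lines: string[] = [];
  if (message.type !== undefined) lines.push(`event: ${message.type}`);
  if (message.id !== undefined) lines.push(`id: ${message.id}`);
  if (message.retry !== undefined) lines.push(`retry: ${message.retry}`);

  // Every line of the payload needs its own `data:` prefix.
  for (const line of payload.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }

  // The trailing blank line tells the browser that the event is complete.
  return lines.join('\n') + '\n\n';
}

// Sample order payload (made up for this example).
const frame = toSseFrame({ data: { newOrder: { id: 1, item: 'book' } } });
console.log(JSON.stringify(frame));
```

On the browser side, m.data for this chunk is the JSON text after the data: prefix, so JSON.parse(m.data).newOrder is the order object. An Observable that emits a bare order with no data property gives the framework nothing to put in the data field, which is why the original endpoint did not deliver usable messages.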