Search code examples
rxjsnestjsserver-sent-events

Nestjs @Sse : return result of a promise in rxjs observable


I am trying to go a simple step beyond the nest doc example in implementing @Sse() in a controller but I never used rxjs untill now so Im a bit confused.

The flow is :

  1. client send a POST request with a file payload
  2. server (hopefully) sends back the newly created project with a prop status:UPLOADED
  3. client subscribe to sse route described below passing as param the projectId it just received from server
  4. in the meantime server is doingSomeStuff that could take from 10sec to a min. When doingSomeStuff is done, project status is updated in db from UPLOADED to PARSED

My need is for the @Sse decorated function to execute at x interval of time a "status-check" and return project.status (that may or may not have been updated at the time)

My present code :

  @Sse('sse/:projectId')
  sse(@Param('projectId') projectId: string): Observable<any> {
    const projId$ = from(this.projectService.find(projectId)).pipe(
      map((p) => ({
        data: {
          status: p.status,
        },
      })),
    );
    return interval(1000).pipe(switchMap(() => projId$));
  }

I don't put code of the service here as it a simple mongooseModel.findById wrapper.

My problem is the status returned remains UPLOADED and is never updated. It doesnt seem the promise is reexecuted at every tick. If I console.log inside my service I can see my log being printed only once with the initial project value while I expect to see a new log at each tick.


Solution

  • This is a two-step process.

    1. We create an observable out of the promise generated by this.service.findById() using the from operator in rxjs. We also use the map operator to set the format of the object we need when someone subscribes to this observable.

    2. We want to return this observable every x seconds. interval(x) creates an observable that emits a value after every x milliseconds. Hence, we use this and then switchMap to the projId$ whenever the interval emits a value. The switchMap operator switches to the inner observable whenever the outer observable emits a value.

    Please note: Since your server may take 10 sec, to min for doing the operation, you should set the intervalValue accordingly. In the code snippet below, I've set it to 10,000 milli seconds which is 10 seconds.

    const intervalValue = 10000;
    
    @Sse('sse/:projectId')
    sse(@Param('projectId') projectId: string): Observable < any > {
      return interval(intervalValue).pipe(
        switchMap(() => this.projectService.find(projectId)),
        map((p) => ({
          data: {
            status: p.status,
          }
        })));
    }
    
    
    // OR
    
    @Sse('sse/:projectId')
    sse(@Param('projectId') projectId: string): Observable < any > {
      const projId$ = defer(() => this.service.findById(projectId)).pipe(
        map(() => ({
          data: {
            _: projectId
          }
        }))
      );
    
      return interval(intervalValue).pipe(switchMap(() => projId$));
    }