I am trying to go one simple step beyond the NestJS docs example for implementing @Sse() in a controller, but I had never used RxJS until now, so I'm a bit confused.
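The flow is:
The client sends a POST request with a file payload.
The server creates a project with a prop status: UPLOADED.
The client subscribes to the SSE route with the projectId it just received from the server.
The server starts doingSomeStuff, which can take from 10 seconds to a minute. When doingSomeStuff is done, the project status is updated in the db from UPLOADED to PARSED.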
What I need is for the @Sse-decorated function to run a status check every x interval of time and return project.status (which may or may not have been updated by then).
My present code:
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable<any> {
  const projId$ = from(this.projectService.find(projectId)).pipe(
    map((p) => ({
      data: {
        status: p.status,
      },
    })),
  );
  return interval(1000).pipe(switchMap(() => projId$));
}
I haven't included the service code here, as it is a simple wrapper around mongooseModel.findById.
My problem is that the returned status remains UPLOADED and is never updated. The promise does not seem to be re-executed at every tick: if I console.log inside my service, the log is printed only once, with the initial project value, while I expect to see a new log at each tick.
This is a two-step process.
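First, we need an observable that runs the query each time it is subscribed to. In your code, from(this.projectService.find(projectId)) calls find once, when sse() is invoked, and wraps the resulting promise. A promise settles only once, so every later subscription receives the same value and the service is never called again. To get a new promise per subscription, either call the service inside the function passed to switchMap, or wrap the call in defer. We also use the map operator to set the format of the object we need when someone subscribes to this observable.
Second, we want to emit the result every x milliseconds. interval(x) creates an observable that emits a value every x milliseconds. Hence, we use it and then switchMap to projId$ whenever the interval emits a value. The switchMap operator subscribes to a new inner observable whenever the outer observable emits a value, and unsubscribes from the previous one.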
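The difference between reusing one promise and creating a promise per tick can be seen without RxJS. The following is only a minimal sketch: find is a hypothetical stand-in for the service call, and threeTicks is a made-up helper that imitates three interval emissions.

```typescript
// Hypothetical stand-in for projectService.find(); it counts its invocations.
let findCalls = 0;
function find(projectId: string): Promise<{ id: string; status: string }> {
  findCalls += 1;
  return Promise.resolve({ id: projectId, status: 'UPLOADED' });
}

// Imitates three interval ticks by asking `source` for a promise three times.
function threeTicks<T>(source: () => T): T[] {
  return [source(), source(), source()];
}

// Eager: find() runs once, right here. Every tick reuses the same promise.
const eager = find('abc');
const eagerResults = threeTicks(() => eager);
const callsAfterEager = findCalls;

// Lazy: find() runs inside the callback, so each tick creates a new promise.
const lazyResults = threeTicks(() => find('abc'));
const callsDuringLazy = findCalls - callsAfterEager;

console.log(callsAfterEager, callsDuringLazy); // 1 3
console.log(eagerResults[0] === eagerResults[2]); // true: same promise every tick
console.log(lazyResults[0] === lazyResults[2]); // false: a new promise per tick
```

The eager case corresponds to from(promise) built outside the switchMap callback. The lazy case corresponds to calling the service inside the callback, or to wrapping the call in defer.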
Please note: since your server may take anywhere from 10 seconds to a minute to finish the operation, you should set intervalValue accordingly. In the code snippet below, I've set it to 10,000 milliseconds, which is 10 seconds.
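import { defer, interval, Observable } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

const intervalValue = 10000; // polling period in milliseconds

// Inside the controller class:
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable<any> {
  return interval(intervalValue).pipe(
    // find() is called inside the callback, so the query runs on every tick
    switchMap(() => this.projectService.find(projectId)),
    map((p) => ({
      data: {
        status: p.status,
      },
    })),
  );
}

// OR

@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable<any> {
  // defer() calls find() again for each new subscription
  const projId$ = defer(() => this.projectService.find(projectId)).pipe(
    map((p) => ({
      data: {
        status: p.status,
      },
    })),
  );
  return interval(intervalValue).pipe(switchMap(() => projId$));
}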