I want to run code in a function - and then return as a websocket Observable. Effectively monitoring a long running process. I can not figure out how to return the values correctly through the websockets in this format.
My long-running process: ( obviously not going to actually take a long time )
import { Observable } from 'rxjs';
export function longRunningProcess (): Observable<unknown> {
return new Observable(subscriber => {
subscriber.next('End of step 1');
subscriber.next('End of step 2');
subscriber.next('End of step 3');
setTimeout(() => {
subscriber.next('End of Step 4');
subscriber.complete();
}, 1000);
});
}
My NestJS endpoint that returns to the ws ( Websocket )
import { WsAdapter } from '@nestjs/platform-ws';
import {
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
WsResponse,
} from '@nestjs/websockets';
import { from, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { Server } from 'ws';
import { longRunningProcess } from './test'
@WebSocketGateway()
export class EventsGateway {
@WebSocketServer()
server: Server;
@SubscribeMessage('events')
// send {"event":"events","data":"test"} in websockets
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return from(longRunningProcess) // Not really sure how to return this
//return from([1, 2, 3]).pipe(map(item => ({ event: 'events', data: item }))); //<< this works from the sample
}
@SubscribeMessage('identity')
async identity (@MessageBody() data: number): Promise<number> {
return data;
}
}
just map
your result from the longRunningProcess
like you've did for the numbers array.
@SubscribeMessage('events')
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return longRunningProcess().pipe(map(item => ({ event: 'events', data: item })));
}