
How to hook up a long-running process in NestJS with ws (WebSockets) and RxJS


I want to run code in a function and return its progress over a WebSocket as an Observable, effectively monitoring a long-running process. I cannot figure out how to return the values correctly through the WebSocket in this format.

My long-running process (obviously, this one does not actually take a long time):

import { Observable } from 'rxjs';

export function longRunningProcess (): Observable<unknown> {

    return new Observable(subscriber => {
        subscriber.next('End of step 1');
        subscriber.next('End of step 2');
        subscriber.next('End of step 3');
        setTimeout(() => {
            subscriber.next('End of Step 4');
            subscriber.complete();
        }, 1000);
    });
}

My NestJS gateway handler that returns the values over ws (WebSocket):

import { WsAdapter } from '@nestjs/platform-ws';
import {
    MessageBody,
    SubscribeMessage,
    WebSocketGateway,
    WebSocketServer,
    WsResponse,
} from '@nestjs/websockets';
import { from, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { Server } from 'ws';
import { longRunningProcess } from './test'

@WebSocketGateway()
export class EventsGateway {
    @WebSocketServer()
    server: Server;

    @SubscribeMessage('events')
    // send {"event":"events","data":"test"} in websockets
    findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
        return from(longRunningProcess) // Not really sure how to return this 
        //return from([1, 2, 3]).pipe(map(item => ({ event: 'events', data: item })));  //<< this works from the sample
    }

    @SubscribeMessage('identity')
    async identity (@MessageBody() data: number): Promise<number> {
        return data;
    }
}


Solution

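  • Call longRunningProcess() and map each emitted value, just as you did for the numbers array. from(longRunningProcess) passes the function itself rather than the Observable it returns; because the function already returns an Observable, from() is not needed at all.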

    @SubscribeMessage('events')
    findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
        return longRunningProcess().pipe(map(item => ({ event: 'events', data: item })));
    }
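
To see what the client should receive with this handler, here is a minimal, dependency-free sketch of the same projection applied to the four values from the question. WsResponseLike and toWsResponse are hypothetical names introduced only for this illustration (the interface mirrors the event/data shape of WsResponse), and the sketch assumes the ws adapter sends each mapped object as one JSON text frame.

```typescript
// Same event/data shape as WsResponse from '@nestjs/websockets',
// redeclared here (hypothetical name) so the sketch runs without dependencies.
interface WsResponseLike<T> {
    event: string;
    data: T;
}

// Hypothetical helper: the same projection that is passed to map() in the answer.
function toWsResponse<T>(event: string, data: T): WsResponseLike<T> {
    return { event, data };
}

// The four values emitted by longRunningProcess() in the question.
const steps: string[] = [
    'End of step 1',
    'End of step 2',
    'End of step 3',
    'End of Step 4',
];

// One frame per emitted value (assumption: the adapter serializes with JSON.stringify).
const frames: string[] = steps.map(step => JSON.stringify(toWsResponse('events', step)));

frames.forEach(frame => console.log(frame));
// first line printed: {"event":"events","data":"End of step 1"}
```

In the real gateway the first three frames should arrive immediately and the fourth about a second later, when the setTimeout callback fires and the Observable completes.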