Search code examples
typescriptnestjsprotocol-buffersgrpcgrpc-web

How can I implement a gRPC ServerWritableStream in nest.js?


The nest.js documentation doesn't mention anything regarding the case of a unidirectional ServerWritableStream. I want to receive a normal request and use call.write to pass to the client streaming messages. This works fine in plain TypeScript using the below but it doesn't work from within a nest.js gRPC controller. I am also using Envoy which works fine with the unary calls on nest.js as well as with the simple server.

function doOnAdd(call) {
  setInterval(() => {
    const myTodo = JSON.stringify({
      id: 'b779cb10-72c8-416f-9399-273eab8e3421',
      title: 'Fix the server streaming',
      completed: false,
    });
    console.log('Sending streaming data', myTodo);
    call.write({message: myTodo});
  }, 5000);

  call.on('end', () => {
    console.log('end');
  });

  setTimeout(() => {
    call.end();
  }, 30000);
}

But this nest.js code does not work (unary gRPC calls work fine in the same controller).

@GrpcMethod('TodoService', 'OnAdded')
  async onAdded(
    request: todo.OnAddedTodoRequest,
    metadata: Metadata,
    call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
  ) {
    setInterval(() => {
      const myTodo = JSON.stringify({
        id: 'b779cb10-72c8-416f-9399-273eab8e3421',
        title: 'Fix the server streaming',
        completed: false,
      });
      console.log('Sending streaming data', myTodo);
      const message = new todo.ServerMessage({ message: myTodo });
      call.write(message);
    }, 5000);

    call.on('end', () => {
      console.log('end');
    });

    setTimeout(() => {
      call.end();
    }, 30000);
  }

Finally here is the simplified protobuf:

syntax = "proto3";

package todo;

service TodoService {
  rpc OnAdded (OnAddedTodoRequest) returns (stream ServerMessage);
}

message OnAddedTodoRequest {}

message ServerMessage {
  string message = 1;
}

Solution

  • I finally realised that nest.js doesn't wait for the interval to complete and returns immediately. My solution was to wrap it with a promise like this:

    @GrpcMethod('TodoService', 'OnAdded')
      async onAdded(
        request: todo.OnAddedTodoRequest,
        metadata: Metadata,
        call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
      ) {
      await new Promise((resolve) => {
        setInterval(() => {
          const myTodo = JSON.stringify({
            id: 'b779cb10-72c8-416f-9399-273eab8e3421',
            title: 'Fix the server streaming',
            completed: false,
          });
          console.log('Sending streaming data', myTodo);
          const message = new todo.ServerMessage({ message: myTodo });
          call.write(message);
        }, 5000);
    
        call.on('end', () => {
          console.log('end');
          resolve();
        });
    
        setTimeout(() => {
          call.end();
        }, 30000);
      });
    }