node.js, streaming, server-sent-events

How to consume Server-Sent Events (SSE) in Node for CustomGPT


I'm creating a Node back-end that interacts with a third-party service (CustomGPT), consumes its streaming response, and forwards it to a client. This approach keeps the credentials hidden from the clients. The CustomGPT API docs for the stream parameter mention:

Whether to stream the response or not, if stream the responses will be sent as data-only server-sent events as they become available, with the stream terminated by a status: "finish" message.

The following example is given for Node:

import sdk from '@api/customgpt';

sdk.auth('sdk-token-value');
sdk.postApiV1ProjectsProjectidConversationsSessionidMessages({
  response_source: 'own_content',
  prompt: "Tell me how to consume your SSE's in Node"
}, {
  stream: 'true',
  lang: 'en',
  projectId: '1234',
  sessionId: '1'
})
  .then(({ data }) => console.log(data))
  .catch(err => console.error(err));

The docs say nothing about how to handle the streaming response beyond a console.log. When using this example code, the response takes a long time and then seems to be delivered in one go rather than in chunks of a stream. The format is that of SSE as described in the MDN Web Docs.
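For reference, a data-only SSE stream of the kind the docs describe would look roughly like this on the wire, with each event separated by a blank line (the payloads here are hypothetical, not taken from the CustomGPT API):

    data: {"status": "progress", "message": "Hello"}

    data: {"status": "progress", "message": " world"}

    data: {"status": "finish"}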

What I've tried

Consuming Server-Sent Events seems to be primarily done using EventSource, as in this example (dev.to):

const evtSource = new EventSource("http://localhost:4000/event-source");
evtSource.onmessage = (event) => {
  if (event.data) {
    setData(JSON.parse(event.data));
  }
};

This, however, is a front-end implementation. It also requests a GET endpoint instead of a POST endpoint and does not allow for setting authentication headers.

I've tried parsing the data received from the CustomGPT endpoint myself by splitting on double-newline (\n\n) characters and using res.write(JSON.stringify(chunk)). This works for sending the data to the clients, but instead of a stream, all the messages/events are sent in one go, defeating the whole streaming effect.
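For what it's worth, splitting on \n\n only works reliably if you buffer across chunk boundaries, since a network chunk can end in the middle of an event. A minimal sketch of such a buffered parser (the function and field names are my own, not from any SDK):

```javascript
// Hypothetical buffered SSE parser: takes the leftover text from the
// previous call plus a new raw chunk, and returns any complete events
// along with the unparsed remainder to carry into the next call.
function parseSSEChunk(buffer, chunk) {
  const text = buffer + chunk;
  const parts = text.split('\n\n');
  // The last part may be an incomplete event; keep it for the next chunk.
  const remainder = parts.pop();
  const events = parts
    .map((block) =>
      block
        .split('\n')
        .filter((line) => line.startsWith('data:'))
        .map((line) => line.slice(5).trim())
        .join('\n')
    )
    .filter((data) => data.length > 0);
  return { events, remainder };
}
```

Called in a loop over incoming chunks, this yields complete events as soon as they arrive instead of waiting for the whole response.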

Question

How do I handle the streaming SSEs from CustomGPT in my Node back-end so I can forward them to the receiving client, thereby shielding the credentials and sensitive settings from the clients?


Solution

  • After a few days of experimenting I've found the solution. Posting it here for posterity and hopefully to save someone time in the future.

    The key is receiving the Server-Sent Events (SSEs) from the third-party service chunk by chunk and forwarding each chunk to the client as it arrives. This way the credentials stay hidden from the client and we can do some extra logging/processing.

    I've switched from the SDK to the standard fetch API to keep things simple.

    conversationStream: async (res: Response, prompt: string, sessionId: string) => {
      // Set headers for the streaming response
      res.set({
        'Cache-Control': 'no-cache',
        'Content-Type': 'text/event-stream; charset=utf-8',
        'Connection': 'keep-alive',
      });
      res.flushHeaders();

      // Send a correctly formatted connection event
      res.write(`event: connected\ndata: {"sessionId": "${sessionId}"}\n\n`);

      const url = `https://www.example.com/${sessionId}/messages?stream=true&lang=nl`;
      const options = {
        method: 'POST',
        headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': 'Bearer xxxxxxxx',
        },
        body: JSON.stringify({
          prompt,
        }),
      };

      fetch(url, options)
        // <-- This is the key part: turning the body into a reader of decoded text chunks
        .then((response) => response.body?.pipeThrough(new TextDecoderStream()).getReader())
        .then(async (reader) => {
          while (reader) {
            const { done, value } = await reader.read();
            if (done) {
              break;
            }
            // Forward each chunk to the client as soon as it arrives
            res.write(value);
          }
        })
        .catch((err) => res.write(`event: error\ndata: {"message": "${err}"}\n\n`))
        .finally(() => {
          // Close the stream
          res.end();
        });
    }