
How to get streamed data from fetch() inside of page.evaluate() in puppeteer? (node.js)


Here is a simplified version of my code:

var page;
var launched = false;

app.post("/test", async(req, res) => {

    if ( launched == false ) {
        const browser = await puppeteer.launch({
            headless: true, /* I've tried with "new" and false too */
        });

        page = await browser.newPage();

        var desiredUrl = "url here";
        await page.goto(desiredUrl);

        /* Stream data from the page */
        await page.exposeFunction('writeData', (data) => {
            console.log("Writing data");
            res.write(data);
        });

        /* End stream */
        await page.exposeFunction('endStream', () => {
                console.log("End stream");
                res.end();
        });

        launched = true;
    }

    await page.evaluate(async ()=>{
        var output = await fetch("/endpoint_here", {
            "headers": {
                /* headers here */
            },
        });

        var reader = output.body.getReader();

        while (true) {
            var { done, value } = await reader.read();
            if (done) {
                window.endStream();
                return;
            }
            
            var decoder = new TextDecoder();
            var decodedData = decoder.decode(value);
            window.writeData(decodedData);
        }
    });

})

However, this doesn't work. What I've tried is listed below:

res doesn't work inside of page.evaluate(). I've tried sending res into the page.evaluate(), but it breaks the code.

I've tried using page.exposeFunction() and calling res.write() (and res.end()) there, and it worked, but only the first time. The second time I sent the POST request, and every time after that, the code still ran (the console.log() calls in those functions fired), but res.write() and res.end() didn't do anything.

I've even tried having an exposed function update a global variable from inside page.evaluate(), detecting changes to that variable with a Proxy, and calling res.write() to send the data. That also broke after the first POST request.

The only fix to this strange problem of it only working the first time is restarting the program, which obviously isn't a solution.

I've also tried logging the stream data to the console in the page and using page.on('console') to res.write() the data back to the client. This worked perfectly with one request at a time. However, when there were multiple simultaneous requests to the "/test" endpoint, it would write the response to both clients instead of just the one that initiated the request.

The only thing that DID work was just returning the response from the fetch after it ended without streaming it. However, I want it to be streamed.

I'm stuck and have no idea what to do, so any help would be greatly appreciated.


Solution

  • I'm unable to reproduce the problem. The issue seems to be something to do with the endpoint you're hitting, and/or your server configuration. I suggest sharing that information or trying to build a repro of your own.

    Here's my replication attempt in case it helps. You can see that the code works by running:

    $ node -v
    v20.11.1
    $ npm i
    $ node sse-endpoint &
    $ node server &
    $ curl localhost:3001/stream
    data: {"chunk":0}
    
    data: {"chunk":1}
    
    data: {"chunk":2}
    
    data: {"chunk":3}
    
    # ... and so on, streamed every second ...
    

    package.json:

    {
      "dependencies": {
        "express": "^4.19.2",
        "puppeteer": "^22.6.0"
      }
    }
    

    sse-endpoint.js (this is a mock of the remote API you're intercepting):

    const express = require("express");
    const app = express();
    
    app.use((req, res, next) => {
      res.setHeader("Access-Control-Allow-Origin", "*");
      next();
    });
    
    app.get("/stream", (req, res) => {
      res.writeHead(200, {
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
        "Content-Type": "text/event-stream",
      });
    
      let counter = 0;
      const interval = setInterval(() => {
        const chunk = JSON.stringify({chunk: counter++});
        res.write(`data: ${chunk}\n\n`);
      }, 1000);
    
      res.on("close", () => {
        clearInterval(interval);
        res.end();
      });
    });
    
    const listener = app.listen(process.env.PORT || 3000, () =>
      console.log(`SSE endpoint is listening on port ${listener.address().port}`)
    );
    

    server.js (this is your API):

    const express = require("express");
    const puppeteer = require("puppeteer");
    const app = express();
    
    app.use(express.static("public"));
    const browserReady = puppeteer.launch();
    
    app.get("/stream", async (req, res) => {
      res.writeHead(200, {
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
        "Content-Type": "text/event-stream",
      });
      let page;
    
      try {
        page = await (await browserReady).newPage();
        await page.goto("about:blank");
        await page.exposeFunction("writeData", data => {
          res.write(data);
        });
        await page.exposeFunction("endStream", () => {
          res.end();
        });
        // Register before evaluate(): evaluate() only resolves once the stream
        // ends, so a listener added afterwards never sees a client disconnect.
        res.on("close", () => {
          // The page may already be closing; ignore a failed evaluate().
          page.evaluate("window._aborted = true").catch(() => {});
        });
        await page.evaluate(async () => {
          const output = await fetch(
            "http://localhost:3000/stream"
          );
          const reader = output.body.getReader();
          // One decoder for the whole stream, so multi-byte characters split
          // across chunks decode correctly.
          const decoder = new TextDecoder();
    
          while (!window._aborted) {
            const {done, value} = await reader.read();
    
            if (done) {
              return window.endStream();
            }
    
            window.writeData(decoder.decode(value, {stream: true}));
          }
        });
      } catch (err) {
        console.error(err);
        res.end();
      } finally {
        await page?.close();
      }
    });
    
    const listener = app.listen(process.env.PORT || 3001, () =>
      console.log(
        `Proxy server is listening on port ${listener.address().port}`
      )
    );
    

    Note: this code is a proof of concept for demonstration and doesn't necessarily follow best practices.
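
    The read loop above decodes each chunk as it arrives. Creating a new TextDecoder for every chunk can garble text when a multi-byte UTF-8 character is split across two network chunks. Reusing one decoder and passing {stream: true} holds the incomplete bytes until the next chunk arrives. Below is a minimal Node-only sketch (Node 18+, where ReadableStream and TextDecoder are globals; no Puppeteer). Two hand-made chunks stand in for network reads, and makeStream and readAll are illustrative names:

```javascript
// Sketch: decoding a byte stream chunk by chunk (Node 18+, no dependencies).
// The two chunks are made up; they split the 3-byte UTF-8 sequence for "€".
const bytes = new TextEncoder().encode("price: 5€\n");
const splitAt = bytes.length - 2; // cut inside the "€" sequence

function makeStream() {
  return new ReadableStream({
    start(controller) {
      controller.enqueue(bytes.slice(0, splitAt));
      controller.enqueue(bytes.slice(splitAt));
      controller.close();
    },
  });
}

// Same shape as the loop passed to page.evaluate(): one reader, read until done.
async function readAll(stream, decodeChunk) {
  const reader = stream.getReader();
  let text = "";
  while (true) {
    const {done, value} = await reader.read();
    if (done) return text;
    text += decodeChunk(value);
  }
}

async function main() {
  // A new decoder per chunk: each half of "€" is decoded on its own.
  const perChunk = await readAll(makeStream(), value => new TextDecoder().decode(value));

  // One decoder with {stream: true}: incomplete bytes wait for the next chunk.
  // (A real loop would also call decoder.decode() once after `done` to flush.)
  const decoder = new TextDecoder();
  const streamed = await readAll(makeStream(), value => decoder.decode(value, {stream: true}));

  return {perChunk, streamed};
}

main().then(({perChunk, streamed}) => {
  console.log(JSON.stringify(perChunk)); // contains U+FFFD replacement characters
  console.log(JSON.stringify(streamed)); // "price: 5€\n"
});
```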

    If you want to reuse the page, the following should get you started, although you'll want to be prepared to add some page restart logic in case it crashes, which can happen pretty easily.

    I generally recommend avoiding premature optimization and creating a new page per request if possible; pages are fairly lightweight. Also, you will likely run into trouble with multiple clients manipulating the same page simultaneously. Each client needs its own request, result, and fetch-handling context, and there are a lot of edge cases to handle.

    const express = require("express");
    const puppeteer = require("puppeteer");
    const app = express();
    
    app.use(express.static("public"));
    const browserReady = puppeteer.launch();
    const pageReady = (async () => {
      const browser = await browserReady;
      const [page] = await browser.pages();
      await page.exposeFunction("writeData", data => {
        writeData(data);
      });
      await page.exposeFunction("endStream", () => {
        endStream();
      });
      return page;
    })();
    let writeData = () => {};
    let endStream = () => {};
    
    app.get("/stream", async (req, res) => {
      res.writeHead(200, {
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
        "Content-Type": "text/event-stream",
      });
    
      try {
        const page = await pageReady;
        await page.goto("about:blank");
        writeData = data => res.write(data);
        endStream = () => res.end();
        res.on("close", () => {
          // The shared page may be navigating by now; ignore a failed evaluate().
          page.evaluate("window._aborted = true").catch(() => {});
        });
        await page.evaluate(async () => {
          try {
            const output = await fetch(
              "http://localhost:3000/stream"
            );
            const reader = output.body.getReader();
            // One decoder per stream keeps split multi-byte characters intact.
            const decoder = new TextDecoder();
    
            while (!window._aborted) {
              const {done, value} = await reader.read();
    
              if (done) {
                return window.endStream();
              }
    
              window.writeData(decoder.decode(value, {stream: true}));
            }
          } catch (err) {
            // Fetch or read failed: close this client's response.
            window.endStream();
          }
        });
      } catch (err) {
        console.error(err);
        res.end();
      }
    });
    
    const listener = app.listen(process.env.PORT || 3001, () =>
      console.log(
        `Proxy server is listening on port ${listener.address().port}`
      )
    );
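
    The module-level writeData/endStream variables in this version do real work. exposeFunction() registers a callback once per page. A callback that closes over one request's res therefore keeps writing to that first, already-finished response on every later request. The question's code has this shape, because the functions are only exposed on the first request, and it may be what the question describes. Reassigning a single pair of variables still lets simultaneous requests overwrite each other, though. A hypothetical refinement is to key handlers by a per-request id that the in-page code passes back. The sketch below models only the Node side, using fake response objects instead of Express. createRouter, register, fakeResponse, and the id argument are illustrative names, not Puppeteer APIs:

```javascript
// Sketch (hypothetical names): route chunks from once-registered callbacks to the
// right response by request id. Fake response objects stand in for Express's res.
function createRouter() {
  const handlers = new Map(); // request id -> {write, end}
  let nextId = 0;

  return {
    // Called once per HTTP request; returns the id the in-page code should echo back.
    register(res) {
      const id = String(nextId++);
      handlers.set(id, {
        write: data => res.write(data),
        end: () => {
          handlers.delete(id);
          res.end();
        },
      });
      return id;
    },
    // These two would be passed to page.exposeFunction() once, at startup.
    // Chunks for an unknown or finished id are silently dropped.
    writeData(id, data) {
      handlers.get(id)?.write(data);
    },
    endStream(id) {
      handlers.get(id)?.end();
    },
    size() {
      return handlers.size;
    },
  };
}

// Minimal stand-in for a response: records writes and whether it has ended.
function fakeResponse() {
  return {
    chunks: [],
    ended: false,
    write(data) {
      if (!this.ended) this.chunks.push(data);
    },
    end() {
      this.ended = true;
    },
  };
}

const router = createRouter();
const first = fakeResponse();
const second = fakeResponse();
const idA = router.register(first);
const idB = router.register(second);

// Interleaved chunks, as two concurrent page.evaluate() loops might produce them.
router.writeData(idA, "a1");
router.writeData(idB, "b1");
router.endStream(idA);
router.writeData(idB, "b2");
router.endStream(idB);

console.log(first.chunks, second.chunks); // [ 'a1' ] [ 'b1', 'b2' ]
```

    In the page, the loop would call window.writeData(id, chunk) and window.endStream(id), with id passed as an extra argument to page.evaluate(). This only separates the output. The caveats above about several clients driving the same page still apply, including shared navigation and the shared window._aborted flag.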
    

    If the SSE endpoint is normally fetched from another page, that shouldn't affect this repro. You can have sse-endpoint serve an HTML file and run page.goto("http://localhost:3000") before running the evaluate()/fetch(); it shouldn't make a difference. Make sure you're intercepting or consuming your actual endpoint correctly. The problem most likely lies there, and the details matter at that stage.
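
    One way to check the endpoint side in isolation is to consume it from plain Node (18+, which has a global fetch) and timestamp each chunk. If chunks arrive spaced out over time, the endpoint streams and the problem is further along. If they all arrive together at the end, something between you and the server is likely buffering. The sketch below starts a throwaway local server as a stand-in for sse-endpoint.js so that it runs on its own. To probe your real endpoint, point url at it and send the same headers your page's fetch() does. startMockEndpoint, consume, the random port, and the 100 ms interval are all illustrative:

```javascript
// Sketch: consume a streaming endpoint from plain Node 18+ and timestamp each chunk.
// The local mock stands in for sse-endpoint.js; point `url` at your real endpoint instead.
const http = require("node:http");

function startMockEndpoint() {
  const server = http.createServer((req, res) => {
    res.writeHead(200, {
      "Connection": "close",
      "Cache-Control": "no-cache",
      "Content-Type": "text/event-stream",
    });
    let counter = 0;
    const interval = setInterval(() => {
      res.write(`data: ${JSON.stringify({chunk: counter})}\n\n`);
      counter += 1;
      if (counter === 3) {
        clearInterval(interval);
        res.end(); // finite stream so the sketch terminates
      }
    }, 100);
    res.on("close", () => clearInterval(interval));
  });
  // Port 0 lets the OS pick a free port.
  return new Promise(resolve => server.listen(0, "127.0.0.1", () => resolve(server)));
}

async function consume(url) {
  const started = Date.now();
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const arrivals = [];

  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    // Chunk boundaries need not line up with SSE event boundaries.
    arrivals.push({ms: Date.now() - started, text: decoder.decode(value, {stream: true})});
  }
  return arrivals;
}

async function main() {
  const server = await startMockEndpoint();
  try {
    const url = `http://127.0.0.1:${server.address().port}/stream`;
    const arrivals = await consume(url);
    for (const {ms, text} of arrivals) console.log(`${ms} ms`, JSON.stringify(text));
    return arrivals;
  } finally {
    server.close();
  }
}

const result = main();
```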

    Depending on what you're trying to achieve (I'm guessing something like proxying a GPT chat feed?), there may be a much simpler way to reach the underlying goal, which is another reason why full context is important.