Tags: javascript, http, asynchronous, fetch-api, http-streaming

Use Fetch Streams API to consume chunked data asynchronously without using recursion


I'm using the JavaScript fetch streams API to consume chunked JSON asynchronously, as in this answer.

My application may receive up to 25 small JSON objects per second (one for each frame of a video) over the span of an hour, i.e. up to 90,000 objects in total.

When the incoming chunks are large (1,000+ JSON objects per chunk), my code works well: it is fast, uses little memory, and can reliably receive 1,000,000 JSON objects.

When the incoming chunks are small (5 JSON objects per chunk), my code performs poorly: it is slow and consumes a lot of memory. The browser dies at about 50,000 JSON objects.

After a lot of debugging in the browser's developer tools, I believe the problem lies in the recursive nature of the code.

I tried to remove the recursion, but it seems to be required, because the API relies on my code returning a promise to chain?!

How do I remove this recursion, or should I use something other than fetch?


Code with recursion (works)

String.prototype.replaceAll = function(search, replacement) {
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
};

results = []

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";

    reader.read().then(function processText({ done, value }) {
        if (done) {
          console.log("Stream done.");
          return;
        }

        try {
            decoded = td.decode(value);
            buffer += decoded;
            if (decoded.length != 65536){
                toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                result = JSON.parse(toParse);
                results.push(...result);
                console.log("Received " + results.length.toString() + " objects")
                buffer = "";
            }
        }
        catch(e){
            // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
            //console.log("EXCEPTION:"+e);
        }

        return reader.read().then(processText);
    })
});

Code without recursion (doesn't work)

String.prototype.replaceAll = function(search, replacement) {
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
};

results = []
finished = false

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";
    lastResultSize = -1

    while (!finished)
        if (lastResultSize < results.length)
        {
            lastResultSize = results.length;
            reader.read().then(function processText({ done, value }) {

                if (done) {
                  console.log("Stream done.");
                  finished = true;
                  return;
                }
                else
                    try {
                        decoded = td.decode(value);
                        //console.log("Received chunk " + decoded.length.toString() + " in length");
                        buffer += decoded;
                        if (decoded.length != 65536){
                            toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                            result = JSON.parse(toParse);
                            results.push(...result);
                            console.log("Received " + results.length.toString() + " objects")
                            buffer = "";
                            //console.log("Parsed chunk " + toParse.length.toString() + " in length");
                        }
                    }
                    catch(e) {
                        // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
                        //console.log("EXCEPTION:"+e);
                    }
            })
        }
});

For completeness, here is the Python code I'm using for the test server. Note the line containing sleep(), which changes the chunking behavior:

import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from time import sleep


class TestServer(BaseHTTPRequestHandler):

    def do_GET(self):
        args = urllib.parse.parse_qs(self.path[2:])
        args = {i:args[i][0] for i in args}
        response = ''

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()

        for i in range (1000000):
            self.wfile.write(bytes(f'{{"x":{i}, "text":"fred!"}}\n','utf-8'))
            sleep(0.001)  # Comment this out for bigger chunks sent to the client!

def main(server_port:"Port to serve on."=9999,server_address:"Local server name."=''):
    httpd = HTTPServer((server_address, server_port), TestServer)
    print(f'Serving on http://{httpd.server_name}:{httpd.server_port} ...')
    httpd.serve_forever()


if __name__ == '__main__':
    main()

Solution

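  • The part you're missing is that the function passed to .then() is always called asynchronously, i.e. as a microtask with an empty stack. By the time processText runs again, the previous call has already returned, so there is no actual recursion here and the stack does not grow. This is also why your 'without recursion' version doesn't work: the while loop runs synchronously and never yields to the event loop, so none of the .then() callbacks get a chance to run. results.length never changes and finished never becomes true, so the loop spins forever and freezes the page.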
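
    To check that the stack really does not grow, here is a minimal sketch that runs the question's processText pattern over 100,000 chunks and records how many processText frames are ever on the stack at the same time. makeFakeReader and consumeWithThenChain are hypothetical names; the fake reader is only a stand-in for response.body.getReader() so that the sketch runs without a server.

    ```javascript
    // Hypothetical stand-in for response.body.getReader(): yields `n` chunks, then done.
    function makeFakeReader(n) {
      let i = 0;
      return {
        read() {
          return Promise.resolve(
            i < n ? { done: false, value: i++ } : { done: true, value: undefined }
          );
        },
      };
    }

    function consumeWithThenChain(n) {
      const reader = makeFakeReader(n);
      let chunks = 0;
      let depth = 0;    // processText frames currently on the call stack
      let maxDepth = 0; // deepest the stack ever got

      function processText({ done }) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        try {
          if (done) return { chunks, maxDepth };
          chunks++;
          // Same shape as the question's code. processText is only scheduled
          // here; it runs later as a microtask, after this call has returned.
          return reader.read().then(processText);
        } finally {
          depth--;
        }
      }

      return reader.read().then(processText);
    }

    consumeWithThenChain(100000).then((r) => console.log(r));
    // logs chunks: 100000, maxDepth: 1
    ```

    maxDepth stays at 1 because every processText call has returned before the next one starts. A synchronous recursion 100,000 levels deep would most likely overflow the stack instead.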

    The simple solution is to use an async function and the await operator. If you call read() like this:

    const {value, done} = await reader.read();
    

    ...then you can call it in an ordinary loop inside an async function, and it will work as you would expect: each await suspends the function until the next chunk arrives.
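
    A minimal sketch of that loop, assuming you just want to hand each raw chunk (a Uint8Array) to a callback. readAll and onChunk are hypothetical names, not part of the Streams API.

    ```javascript
    // Reads every chunk of a ReadableStream with a plain loop: no .then() chaining.
    async function readAll(stream, onChunk) {
      const reader = stream.getReader();
      try {
        while (true) {
          // Suspends this function (not the whole page) until the next chunk arrives.
          const { value, done } = await reader.read();
          if (done) break;
          onChunk(value);
        }
      } finally {
        reader.releaseLock(); // release the reader's lock on the stream when finished
      }
    }

    // Usage against the question's test server (assumed to be running on port 9999):
    // const response = await fetch("http://localhost:9999/");
    // await readAll(response.body, (bytes) => console.log("chunk of", bytes.length, "bytes"));
    ```

    Unlike the synchronous while loop in the question, each await gives control back to the event loop, so the browser stays responsive while it waits for data.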

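    I don't know specifically where your memory leak is, but your use of implicit global variables (results, td, buffer, decoded, toParse, result) looks like a problem. I recommend you always put 'use strict'; at the top of your code: in strict mode, assigning to an undeclared variable throws a ReferenceError instead of silently creating a global, so these mistakes are caught immediately. Then declare every variable with let or const.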
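
    A small sketch of what strict mode catches. The directive is placed inside each function here so the example behaves the same whether or not the surrounding file is already strict; in your own script, put it at the top as described above. The function names are hypothetical.

    ```javascript
    function assignUndeclared() {
      "use strict";
      // Like `buffer = "";` in the question: no var/let/const, so strict mode throws.
      undeclaredBuffer = "";
    }

    function assignDeclared() {
      "use strict";
      let buffer = ""; // block-scoped local, never a global
      buffer += "chunk";
      return buffer;
    }

    function throwsReferenceError(fn) {
      try {
        fn();
        return false;
      } catch (e) {
        return e instanceof ReferenceError;
      }
    }

    console.log(throwsReferenceError(assignUndeclared)); // true
    console.log(throwsReferenceError(assignDeclared));   // false
    ```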

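    I recommend you use TextDecoderStream to avoid problems when a multi-byte character is split between chunks: calling td.decode(value) on each chunk separately, as your code does, can corrupt such characters. You will also have issues when a JSON object is split between chunks, so keep any incomplete trailing line in a buffer until the rest of it arrives.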
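
    One way to handle both problems, sketched under the assumption that the server sends newline-delimited JSON (one object per line, as the test server above does): decode with TextDecoderStream, then keep the incomplete last line in a buffer until the rest of it arrives. readNdjson and onObject are hypothetical names. Unlike the question's code, this does not need to compare the chunk length with 65536 or swallow JSON.parse exceptions to find out whether the buffer is complete.

    ```javascript
    // Parses newline-delimited JSON from a byte stream (e.g. response.body).
    // TextDecoderStream handles characters split across chunks; `pending`
    // handles JSON objects split across chunks.
    async function readNdjson(byteStream, onObject) {
      const reader = byteStream.pipeThrough(new TextDecoderStream("utf-8")).getReader();
      let pending = "";
      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        pending += value;
        const lines = pending.split("\n");
        pending = lines.pop(); // may be an incomplete line: keep it for the next chunk
        for (const line of lines) {
          if (line.trim() !== "") onObject(JSON.parse(line));
        }
      }
      // The stream may end with a final line that has no trailing "\n".
      if (pending.trim() !== "") onObject(JSON.parse(pending));
    }
    ```

    With the test server above, readNdjson(response.body, (obj) => results.push(obj)) would push each {"x": ..., "text": "fred!"} object as soon as its line is complete, however the bytes happen to be chunked.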

    See the "Append child writable stream" demo for how to do this safely (but note that you need TextDecoderStream where that demo uses TextDecoder).

    Note also the use of a WritableStream in that demo. AFAIK Firefox doesn't support it yet (at the time of writing), but WritableStream lets you consume chunks with much simpler syntax, without having to loop or recurse explicitly. You can find the web streams polyfill here.
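
    For comparison, here is a sketch of the WritableStream style: pipeTo() drives the reading, so there is no explicit loop or recursion in your own code. The line-splitting TransformStream and the names ndjsonParser and consumeNdjson are hypothetical, not taken from the demo mentioned above, and the sketch assumes an environment (or the polyfill) that provides TextDecoderStream, TransformStream and WritableStream.

    ```javascript
    // Turns decoded text chunks into one parsed JSON object per line (NDJSON).
    function ndjsonParser() {
      let pending = ""; // incomplete trailing line carried over between chunks
      return new TransformStream({
        transform(text, controller) {
          pending += text;
          const lines = pending.split("\n");
          pending = lines.pop();
          for (const line of lines) {
            if (line.trim() !== "") controller.enqueue(JSON.parse(line));
          }
        },
        flush(controller) {
          // Stream ended: parse a final line that had no trailing "\n".
          if (pending.trim() !== "") controller.enqueue(JSON.parse(pending));
        },
      });
    }

    // pipeTo() pulls every chunk through the pipeline; the returned promise
    // resolves once the whole stream has been consumed.
    function consumeNdjson(byteStream, onObject) {
      return byteStream
        .pipeThrough(new TextDecoderStream("utf-8"))
        .pipeThrough(ndjsonParser())
        .pipeTo(new WritableStream({
          write(obj) {
            onObject(obj);
          },
        }));
    }

    // Usage against the question's test server (assumed to be running on port 9999):
    // const results = [];
    // fetch("http://localhost:9999/")
    //   .then((response) => consumeNdjson(response.body, (obj) => results.push(obj)))
    //   .then(() => console.log("Stream done."));
    ```

    Backpressure is handled by the pipe itself: a slow write() makes pipeTo() wait before pulling more data from the network.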