Tags: node.js, postgresql, pg-promise, node-streams

pg-promise: running a dependent query for each row in a query stream runs out of memory


In my application I need to run a dependent update for every row of a query that returns ~60k rows. The result set is too large to fit in memory, so the natural solution is to stream the results and run the dependent query for each row in turn.

No matter what I try, my solution runs out of memory, even though I expected streaming to keep the memory usage low.

After much reading and re-reading of SO and the various pg-promise wiki pages, and playing around with different ways to implement this, I have come to the following (a simplification of my code):

try {
    const startTime = new Date();
    await db.tx("test-tx", async tx => {
        const qs = new QueryStream(`SELECT s.a AS i FROM GENERATE_SERIES(1, 100000) AS s(a)`);
        const result = await tx.stream(qs, stream => {
            return pgp.spex.stream.read(
                stream,
                async (i, row) => {
                    // console.log(`handling ${i}: ${JSON.stringify(row)}`);
                    await innerQuery(tx, row.i, startTime);
                },
                { readChunks: true }
            )
            .then(r => console.log("read done", r));
        });
        console.log("stream done", result);
    });
    console.log(`transaction done: ${memUsage()}MB, ${duration(startTime)} seconds`);
} catch (error) {
    console.error(error);
} finally {
    db.client.$pool.end();
}

async function innerQuery(tx, count, startTime) {
    if (count % 10000 === 0) {
        console.log(`row ${count}: ${memUsage()}MB, ${duration(startTime)} seconds`);
    }
    await tx.one("SELECT 1");
    if (count % 10000 === 0) {
        console.log(`inner query ${count} done`);
    }
}

function duration(startTime) {
    return Math.round((new Date() - startTime) / 1000);
}

function memUsage() {
    return Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
}

This runs the queries as expected, but the memory usage just keeps going up:

row 10000: 66MB, 1 seconds
row 20000: 124MB, 2 seconds
row 30000: 181MB, 3 seconds
row 40000: 241MB, 4 seconds
row 50000: 298MB, 5 seconds
row 60000: 355MB, 6 seconds
row 70000: 415MB, 7 seconds
row 80000: 474MB, 8 seconds
row 90000: 532MB, 9 seconds
row 100000: 593MB, 10 seconds
read done { calls: 100000, reads: 100000, length: 100000, duration: 10054 }
stream done { processed: 100000, duration: 10054 }
inner query 10000 done
inner query 20000 done
inner query 30000 done
inner query 40000 done
inner query 50000 done
inner query 60000 done
inner query 70000 done
inner query 80000 done
inner query 90000 done
inner query 100000 done
transaction done: 641MB, 42 seconds

Now, something is up here: notice that the tx.stream call returns before all the inner queries resolve and print to the console. This explains the memory problem: all of those closures and promises (100k of them) are somehow held in memory, waiting for the stream to be done so they can resolve and be GCed.
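To illustrate what I think is going on, here is a minimal, generic Node.js sketch (nothing pg-promise specific; doWork is just a stand-in for any asynchronous unit of work): when pending work is produced faster than it can resolve, every promise and the closure it captures stays on the heap, whereas awaiting each unit before producing the next keeps the footprint flat.

async function producerDoesNotWait(doWork) {
    // nothing slows this loop down, so 100k pending promises (and the closures
    // they capture) pile up on the heap before any of them can be GCed
    const pending = [];
    for (let i = 0; i < 100000; i++) {
        pending.push(doWork(i));
    }
    await Promise.all(pending);
}

async function producerWaits(doWork) {
    // each unit of work finishes before the next starts, so only one promise
    // is alive at a time and memory stays flat
    for (let i = 0; i < 100000; i++) {
        await doWork(i);
    }
}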

Another data point: if I change from db.tx to db.task at the top level, only one or two of the inner queries run before the connection is closed, and any further queries fail with an error (Querying against a released or lost connection).

I have also tried tx.batch, and readChunks: false for the stream.read call, but that just halts after a single batch and locks up.

So what am I doing wrong? How can I get the inner queries to resolve as soon as they are done so the memory can progressively be reclaimed by the GC?


Solution

  • From what I can gather, there isn't an obvious way to slow the query stream down so that it waits for some of the dependent queries to finish. New inner queries are created as fast as results are streamed in, and so the memory consumption goes through the roof.

    I found a solution that doesn't use a QueryStream. It uses a server-side cursor instead, which means all the queries run in series. I haven't explored running chunks of these in parallel to increase throughput (there is a rough sketch of a chunked variation after the output below), but it definitely solved the memory issue.

    const startTime = new Date();
    await db.tx("test-tx", async tx => {
        await tx.none(`
            DECLARE test_cursor CURSOR FOR
            SELECT s.a AS i FROM GENERATE_SERIES(1, 100000) AS s(a)`);
        let row;
        while ((row = await tx.oneOrNone("FETCH NEXT FROM test_cursor"))) {
            await innerQuery(tx, row.i, startTime);
        }
        await tx.none("CLOSE test_cursor");
        console.log("outer query done");
    });
    console.log(`transaction done: ${memUsage()}MB, ${duration(startTime)} seconds`);
    

    This outputs something like

    row 10000: 9MB, 5 seconds
    inner query 10000 done
    row 20000: 8MB, 9 seconds
    inner query 20000 done
    row 30000: 10MB, 14 seconds
    inner query 30000 done
    row 40000: 9MB, 19 seconds
    inner query 40000 done
    row 50000: 11MB, 23 seconds
    inner query 50000 done
    row 60000: 10MB, 28 seconds
    inner query 60000 done
    row 70000: 8MB, 33 seconds
    inner query 70000 done
    row 80000: 11MB, 38 seconds
    inner query 80000 done
    row 90000: 9MB, 43 seconds
    inner query 90000 done
    row 100000: 12MB, 48 seconds
    inner query 100000 done
    outer query done
    transaction done: 12MB, 48 seconds
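
    As a possible follow-up (just a sketch, not benchmarked; it reuses innerQuery, startTime and the same cursor query from above), the cursor also allows fetching rows in chunks, e.g. FETCH 1000 FROM test_cursor, which cuts the number of round trips while still holding only one chunk of rows in memory at a time:

    const startTime = new Date();
    await db.tx("test-tx-chunked", async tx => {
        await tx.none(`
            DECLARE test_cursor CURSOR FOR
            SELECT s.a AS i FROM GENERATE_SERIES(1, 100000) AS s(a)`);
        let rows;
        // FETCH 1000 returns up to 1000 rows; an empty array means the cursor is exhausted
        while ((rows = await tx.manyOrNone("FETCH 1000 FROM test_cursor")).length > 0) {
            for (const row of rows) {
                await innerQuery(tx, row.i, startTime);
            }
        }
        await tx.none("CLOSE test_cursor");
    });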