I have a PostgreSQL table that receives thousands of time-series rows every day, and an app that lets users retrieve that data. Queries can take anywhere from 200 ms to 30 s depending on the time range, so they must be cancelable to avoid useless load on production.
As there are billions of rows, streaming them is unavoidable.
So I managed to get a working streaming endpoint as shown in the pg-promise docs, and made it cancelable by closing the cursor within pg-query-stream.
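pg-query-stream is built on a server-side cursor (pg-cursor) that fetches rows in batches, which is why closing the cursor stops the work. The following is a simplified model of that idea using only core Node streams and a fake in-memory cursor; `makeFakeCursor`, `CursorStream` and `readFirst` are hypothetical names, and this is an illustration, not pg-query-stream's actual source.

```javascript
const { Readable } = require("stream");

// Hypothetical stand-in for a server-side cursor: fetch(n) returns up to n
// rows (think `FETCH n FROM cur`) and close() releases it (think `CLOSE cur`).
function makeFakeCursor(totalRows) {
    let next = 0;
    return {
        fetches: 0,
        closed: false,
        async fetch(n) {
            this.fetches++;
            const rows = [];
            while (rows.length < n && next < totalRows) rows.push({ id: next++ });
            return rows;
        },
        async close() {
            this.closed = true;
        },
    };
}

// Readable that pulls one batch per _read() call and closes the cursor when
// destroyed, so cancelling the stream stops further fetching.
class CursorStream extends Readable {
    constructor(cursor, batchSize = 100) {
        super({ objectMode: true, highWaterMark: batchSize });
        this.cursor = cursor;
        this.batchSize = batchSize;
    }

    _read() {
        this.cursor.fetch(this.batchSize).then(
            (rows) => {
                if (this.destroyed) return; // cancelled while the fetch was in flight
                if (rows.length === 0) {
                    this.push(null); // cursor exhausted: end the stream
                    return;
                }
                for (const row of rows) {
                    if (this.destroyed) return;
                    this.push(row);
                }
            },
            (err) => this.destroy(err),
        );
    }

    _destroy(err, callback) {
        // Release the cursor whether we finished, failed or were cancelled.
        this.cursor.close().then(() => callback(err), callback);
    }
}

// Consume only the first n rows; breaking out of `for await` destroys the stream.
async function readFirst(stream, n) {
    const rows = [];
    for await (const row of stream) {
        rows.push(row);
        if (rows.length >= n) break;
    }
    return rows;
}

const cursor = makeFakeCursor(1000000);
readFirst(new CursorStream(cursor, 100), 5).then((rows) => {
    console.log(rows.length, cursor.fetches, cursor.closed);
});
```

Stopping after five rows destroys the stream, so the fake cursor is closed after only a batch or two instead of walking through a million rows. The `highWaterMark` option passed to `QueryStream` in the solution presumably plays a comparable role, bounding how many rows are buffered at a time.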
Here is a sample of what this endpoint does (dataStream() is called once the query has been built):
```javascript
const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");
const JSONStream = require("JSONStream");

const db = pgp({
    host: "1.2.3.4",
    port: 5432,
    database: "db",
    user: "user",
    password: "password",
    max: 2,
});

// query is an SQL string; handleError is the app's own error handler (not shown)
function dataStream(query, req, res, next) {
    const qs = new QueryStream(query);
    // "close" event is triggered on client request cancelation
    req.on("close", () => {
        qs.destroy();
    });
    return db.stream(qs, s => {
        s.pipe(JSONStream.stringify()).pipe(res);
        s.on("error", error => handleError(error));
    })
        .catch(error => handleError(error, query));
}
```
It works great for a few calls, but at some point (after performing 8 to 10 calls quickly to check cancelability) the app crashes with this stack trace:
```
\node_modules\pg-promise\node_modules\pg\lib\client.js:346
if (self.activeQuery.name) {
^
TypeError: Cannot read property 'name' of null
    at Connection.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\client.js:346:26)
    at Connection.emit (events.js:311:20)
    at Socket.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\connection.js:120:12)
    at Socket.emit (events.js:311:20)
    at addChunk (_stream_readable.js:294:12)
    at readableAddChunk (_stream_readable.js:275:11)
    at Socket.Readable.push (_stream_readable.js:209:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23)
```
So I suspect that calling qs.destroy() to close the stream is not the right way to do this, even though the cursor is properly destroyed on the server side.
Thanks to the node-postgres and pg-promise developers for their work.
For those interested, I found a working solution after many attempts. It also solves another problem I had: by spamming the request to check its cancelability, I noticed that some clients in the pool were hanging forever and never returned, which filled up the pool and left new requests pending forever.
I think this is because the query stream was piped into res: once the request had been canceled, the readable stream was never consumed and hung, so its client was never released back to the pool.
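That explanation matches how Node's `pipe()` behaves: when the destination closes, `pipe()` unpipes and pauses the source but does not end or destroy it, so the source just sits there (and, in the real app, keeps holding its pooled client). A minimal demonstration with core streams only, where a hypothetical endless `Readable` stands in for the query stream and a `Writable` stands in for `res`; `makeSource` and `simulateCancelledResponse` are illustrative names, not part of the original code.

```javascript
const { Readable, Writable } = require("stream");

// Stand-in for the query stream: produces rows asynchronously, forever.
function makeSource() {
    let i = 0;
    return new Readable({
        objectMode: true,
        highWaterMark: 4,
        read() {
            setImmediate(() => this.push({ id: i++ }));
        },
    });
}

// Pipe the source into a writable (standing in for `res`), destroy the
// writable as if the client had cancelled, then report the source's state.
function simulateCancelledResponse() {
    return new Promise((resolve) => {
        const source = makeSource();
        let written = 0;
        const sink = new Writable({
            objectMode: true,
            write(_row, _encoding, callback) {
                written++;
                callback();
            },
        });
        source.pipe(sink);
        setTimeout(() => sink.destroy(), 20); // the "client" goes away
        sink.on("close", () => {
            // Let pipe()'s own close handling settle, then inspect the source.
            setTimeout(() => {
                resolve({
                    written,
                    flowing: source.readableFlowing,
                    ended: source.readableEnded,
                    destroyed: source.destroyed,
                });
            }, 20);
        });
    });
}

simulateCancelledResponse().then((state) => console.log(state));
```

The source ends up paused (`readableFlowing` is `false`) but neither ended nor destroyed, so something else has to release it explicitly; in the solution, that is the job of the `onFinished` handler calling `connection.done()`.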
Another problem in my code was that the req.on("close", ...) handler was not always triggered.
To solve this, I found a module called on-finished, which invokes a callback when an HTTP response finishes, closes, or errors, exactly as I needed.
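For comparison, Node core has a similar hook, `stream.finished()`, which calls back once a stream has ended, errored, or closed, and reports an early close as an `ERR_STREAM_PREMATURE_CLOSE` error. This is a swapped-in alternative, not what the author used. The sketch runs on a plain `Writable` standing in for `res`, and whether it covers every edge case that on-finished handles for a real `http.ServerResponse` depends on the Node version. `watch`, `makeResponse` and `demo` are hypothetical names.

```javascript
const { Writable, finished } = require("stream");

// Resolve with "finished" if the writable was ended normally, "aborted" if it
// was destroyed first (e.g. the client dropped the connection), else "error".
function watch(res) {
    return new Promise((resolve) => {
        finished(res, (err) => {
            if (!err) resolve("finished");
            else if (err.code === "ERR_STREAM_PREMATURE_CLOSE") resolve("aborted");
            else resolve("error");
        });
    });
}

// Minimal writable standing in for an HTTP response.
function makeResponse() {
    return new Writable({
        write(_chunk, _encoding, callback) {
            callback();
        },
    });
}

async function demo() {
    const completed = makeResponse();
    const completedOutcome = watch(completed);
    completed.end("all rows sent");

    const cancelled = makeResponse();
    const cancelledOutcome = watch(cancelled);
    cancelled.write("some rows");
    cancelled.destroy(); // the client went away mid-response

    return { completed: await completedOutcome, cancelled: await cancelledOutcome };
}

demo().then((outcome) => console.log(outcome));
```

In a request handler, the equivalent call would be `finished(res, err => ...)` in place of `onFinished(res, ...)`, with the connection released in the callback either way.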
Also, calling qs.destroy() was not the right way to do it. After many hours of debugging, the most consistent way to do it without unhandled errors was to get the connection object from pg-promise's Database.connect() and end the query by calling connection.done().
So here is my solution:
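```javascript
const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");
const JSONStream = require("JSONStream");
const onFinished = require("on-finished");

const db = pgp({
    host: "1.2.3.4",
    port: 5432,
    database: "db",
    user: "user",
    password: "password",
    max: 2,
});

// query is an SQL string (or an object with a toString() method, e.g. a query builder)
async function dataStream(query, req, res, next) {
    try {
        if (query instanceof Object) {
            query = query.toString();
        }
        // Acquire a dedicated connection from the pool instead of using db.stream()
        const connection = await db.connect();
        const qs = new QueryStream(query, [], { highWaterMark: 4000 });
        const streamData = connection.client.query(qs);
        // on-finished fires when the response finishes, closes or errors,
        // including when the client cancels the request
        onFinished(res, () => {
            // Calling .done() to end the connection on request close.
            // Weirdly I sometimes get an error if I do not provide a callback.
            // (Possible explanation, not verified: recent pg-promise versions document
            // done(kill) with an optional boolean flag rather than a callback; a function
            // is truthy, so this may destroy the connection instead of returning it to the pool.)
            // `log` is the app's own logger (not shown).
            connection.done(error => {
                log.error(error);
            });
        });
        streamData.pipe(JSONStream.stringify()).pipe(res);
        streamData.on("error", error => {
            next(error);
        });
    } catch (error) {
        next(error);
    }
}
```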