I have a Node.js/Express server and I'm trying to merge sorted results from multiple MongoDB collections in order to create a sorted CSV file. The way I achieve this requires that I keep the MongoDB cursors alive (no timeout) until I have read/exhausted all the data, or until an error occurs, in which case I have to close them manually. It seems to work when there aren't many data points. However, when the Mongo queries request data for one year, for example, at some point after almost half an hour I get the following Mongo error:
Cursor not found: cursor id: 59427962835
Promises are bluebird promises and the code is written in TypeScript.
import * as _ from 'lodash';
import * as moment from 'moment-timezone';

function findNative(db, collection, spec = {}) {
  const { query, fields, sort, limit, skip, hint, timeout = true } = spec;
  // ensureConnection is an internal function that gets a connection
  // from the connection pool and returns a promise with the connection
  return ensureConnection(db)
    .then(connection => {
      const cursor = connection.collection(collection).find(
        query || {},
        { fields, sort, limit, skip, hint, timeout });
      // For sorted queries we have to limit batchSize,
      // see https://jira.mongodb.org/browse/SERVER-14228
      if (connection.serverConfig.capabilities().maxWireVersion == 0 && sort && !limit) {
        cursor.batchSize(0);
      }
      return cursor;
    });
}
function getMongoStream(col, startdate, enddate) {
  return findNative('testDb', col, {
    query: { t: { $gte: startdate, $lte: enddate } },
    sort: { t: 1 },
    fields: { i: 0, _id: 0 },
    timeout: false
  });
}
async function fetchNextCursorData(cursor) {
  const hasMore = await cursor.hasNext();
  console.log(hasMore, cursor.cursorState.cursorId.toString());
  return hasMore ? cursor.next() : Promise.resolve(null);
}
function findEarliestDate(buffer: any[]): [string, number[]] {
  let earliestDateMS;
  const indices = _(buffer)
    .map(x => x && x.t.getTime())
    .forEach(t => {
      // make sure the timestamp is defined;
      // the buffer also contains null values
      if (t && (!earliestDateMS || t < earliestDateMS)) {
        earliestDateMS = t;
      }
    })
    .reduce((acc, t, i) => {
      // collect the indices of all docs sharing the earliest timestamp
      if (t === earliestDateMS) {
        acc.push(i);
      }
      return acc;
    }, []);
  return [moment(earliestDateMS).utc().format('YYYY-MM-DD HH:mm:ss.SSS'), indices];
}
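
To make the helper's behavior concrete, here is a hypothetical call (the sample docs and timestamps are made up): it returns the formatted earliest timestamp together with the indices of every buffered doc that shares it.

// Hypothetical sample: three cursor heads, two sharing the earliest timestamp
const buffer = [
  { t: new Date('2017-06-01T00:00:00.500Z'), data: { a: 1 } },
  null, // an exhausted cursor leaves a null in the buffer
  { t: new Date('2017-06-01T00:00:00.500Z'), data: { b: 2 } }
];
const [date, indices] = findEarliestDate(buffer);
// date    === '2017-06-01 00:00:00.500'
// indices === [0, 2] => both docs end up in the same CSV row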
function closeAllCursors(cursors: any[]) {
  const openCursors = cursors
    .filter(c => !c.isClosed());
  openCursors.forEach(c => c.close());
}
async function csvData(req, res) {
  const collections: string[] = req.swagger.params.collections.value.split(',').sort(),
    sources: string[] = req.swagger.params.sources.value.split(',').sort(),
    startdate = new Date(Number(req.swagger.params.startdate.value)),
    enddate = new Date(Number(req.swagger.params.enddate.value));
  const filename = `${moment.utc().format('YYYY-MM-DD_HH:mm')}.csv`;
  res.set({
    'Content-Type': 'text/csv',
    'Content-Disposition': `attachment; filename="${filename}"`
  });
  res.write('Date UTC,' + sources.join(',') + '\n');
  const colPromises = collections.map(col => getMongoStream(col, startdate, enddate));
  let cursorsMap: { [rec: string]: any; };
  try {
    let buffer = [], dateCSVBuffer: any[] = _.fill(Array(sources.length), '');
    // fetch the first doc from all cursors
    const cursors = await Promise.all(colPromises);
    cursorsMap = _.zipObject<any>(collections, cursors);
    let docs = await Promise.all(cursors.map(fetchNextCursorData));
    // the initial request is made for all collections
    let requestedIdx = _.range(0, collections.length);
    while (true) {
      docs.forEach((doc, i) => {
        buffer[requestedIdx[i]] = doc;
      });
      // null indicates that a cursor won't return more data =>
      // break when all cursors are exhausted
      if (buffer.every(d => d === null)) {
        break;
      }
      const [date, indices] = findEarliestDate(buffer);
      requestedIdx = indices;
      indices.forEach(idx => {
        // update the csv buffer
        const { data } = buffer[idx];
        Object.keys(data)
          .forEach(ch => {
            const sourceIndex = sources.indexOf(ch);
            if (sourceIndex > -1) {
              dateCSVBuffer[sourceIndex] = data[ch];
            }
          });
        // remove the doc from the buffer
        buffer[idx] = null;
      });
      // send the csv row
      dateCSVBuffer.unshift(date);
      res.write(dateCSVBuffer.join(',') + '\n');
      // empty the buffer
      dateCSVBuffer = dateCSVBuffer.map(() => '');
      // request a new entry only from the cursors that were just consumed
      const nextDocPromises = indices
        .map(idx => cursorsMap[collections[idx]])
        .map(fetchNextCursorData);
      docs = await Promise.all(nextDocPromises);
    }
    // end the data stream
    res.end();
  } catch (err) {
    // make sure to close all cursors;
    // this will catch all nested promise errors
    closeAllCursors(_.values(cursorsMap));
    console.error(err);
    res.status(500).json(err);
  }
}
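
For context, the stream written by this handler would look roughly like this (the source names and values below are made up; an empty cell means that source had no value at that timestamp):

Date UTC,source1,source2
2017-06-01 00:00:00.500,1.2,3.4
2017-06-01 00:00:01.000,,5.6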
The MongoDB connection is created with the following options:

{
  auto_reconnect: true,
  poolSize: 30,
  connectTimeoutMS: 90000
}
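
ensureConnection itself is internal to the codebase; a minimal stand-in, assuming the pool is opened once with MongoClient.connect from the 2.x driver (the URL and caching logic are placeholders), could look like this:

import { MongoClient } from 'mongodb';

// Hypothetical stand-in for the internal helper: open the pool once
// and cache the Db instance so later calls reuse it
let cachedDb;
function ensureConnection(db) {
  if (cachedDb) { return Promise.resolve(cachedDb); }
  return MongoClient.connect(`mongodb://localhost:27017/${db}`, {
    auto_reconnect: true,
    poolSize: 30,
    connectTimeoutMS: 90000
  }).then(connection => {
    cachedDb = connection;
    return connection;
  });
}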
Could the problem be that I keep the cursor references in the map and thus they are not updated, so that by the time I call cursor.hasNext() the cursor is already dead? I also tried checking cursor.isClosed(), but it always returns false.

The MongoDB driver is "mongodb": "2.2.15" and the queries are tested against a v3.0 database.
EDIT: I did a small count test to see how many docs had been processed at the time the program crashed. The three cursors (the test case requested data from only 3 collections) had the following counts and ids:

3097531 '59427962835'
31190333 '53750510295'
32007475 '101213786015'

The last document processed by the cursor with id '59427962835' was number 4101, so it was not even close to finishing.
Turns out that adding the timeout option to the find query doesn't work. I had to use the noCursorTimeout cursor flag like so:

const cursor = connection.collection(collection)
  .find(query || {}, { fields, sort, limit, skip, hint })
  .addCursorFlag('noCursorTimeout', !timeout);
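
For clarity, this is how the fix slots back into the findNative helper above (the SERVER-14228 batchSize workaround is omitted for brevity):

function findNative(db, collection, spec = {}) {
  const { query, fields, sort, limit, skip, hint, timeout = true } = spec;
  return ensureConnection(db)
    .then(connection => {
      // timeout is no longer passed to find(); the cursor flag is what
      // actually disables the server-side idle timeout for this cursor
      return connection.collection(collection)
        .find(query || {}, { fields, sort, limit, skip, hint })
        .addCursorFlag('noCursorTimeout', !timeout);
    });
}

Since the server never reaps noCursorTimeout cursors on its own, the manual cleanup in closeAllCursors becomes essential to avoid leaking them when an error occurs.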