Search code examples
javascript, node.js, mongodb, node-mongodb-native

mongodb node cursor not found with timeout false


I have a nodejs/express server and I'm trying to merge and sort sorted results from multiple mongodb collections in order to create a sorted CSV file. The way I achieve this requires that I keep the mongodb cursors alive (no timeout) until I read/exhaust all data, or until an error occurs, in which case I have to close them manually. It seems to work when there aren't many data points. However, when the mongo queries request data for one year for example, at some point after almost half an hour, I get the following mongo error: Cursor not found: cursor id: 59427962835.

Promises are bluebird promises. Written in Typescript.

import * as _ from 'lodash';
import * as moment from 'moment-timezone';

/**
 * Builds a MongoDB cursor for `collection` using the options in `spec`.
 *
 * BUG FIX: passing `timeout: false` inside the find() options object does
 * NOT disable the server-side idle-cursor timeout with this driver version
 * (cursors were reaped mid-stream after ~10 minutes idle, producing
 * "Cursor not found" errors). The cursor must instead be created with the
 * `noCursorTimeout` cursor flag via addCursorFlag().
 *
 * @param db         connection identifier handed to ensureConnection
 * @param collection collection name to query
 * @param spec       { query, fields, sort, limit, skip, hint, timeout }
 *                   timeout defaults to true; pass false to keep the
 *                   server-side cursor alive until explicitly closed
 * @returns promise resolving to the (not yet iterated) cursor
 */
function findNative(db, collection, spec={}) {
    const {query, fields, sort, limit, skip, hint, timeout=true} = spec;

    // internal function that gets a connection from the connection pool
    // returns promise with connection
    return ensureConnection(db)
        .then(connection => {
            // Set noCursorTimeout as a cursor flag — the `timeout` find
            // option does not prevent the server from reaping idle cursors.
            const cursor = connection.collection(collection)
                .find(query || {}, {fields, sort, limit, skip, hint})
                .addCursorFlag('noCursorTimeout', !timeout);
            // For sorted queries we have to limit batchSize
            // see https://jira.mongodb.org/browse/SERVER-14228
            if (connection.serverConfig.capabilities().maxWireVersion == 0 && sort && !limit) {
                cursor.batchSize(0);
            }

            return cursor;
        });
}

/**
 * Opens a time-sorted cursor over collection `col`, restricted to documents
 * whose `t` falls in [startdate, enddate]. The `i` and `_id` fields are
 * projected out, and the cursor timeout is disabled because the caller may
 * take a long time to drain the stream.
 */
function getMongoStream(col, startdate, enddate) {
    const spec = {
        query: { t: { $gte: startdate, $lte: enddate } },
        sort: { t: 1 },
        fields: { i: 0, _id: 0 },
        timeout: false
    };
    return findNative('testDb', col, spec);
}

/**
 * Resolves with the next document from `cursor`, or with null once the
 * cursor is exhausted. Logs the hasNext result together with the cursor id
 * (read from driver-internal cursorState) for debugging long-lived streams.
 */
async function fetchNextCursorData(cursor) {
    const more = await cursor.hasNext();
    console.log(more, cursor.cursorState.cursorId.toString());
    if (!more) {
        return null;
    }
    return cursor.next();
}

/**
 * Scans `buffer` (a per-collection array of "head" documents that may
 * contain nulls) for the earliest `t` timestamp and returns a tuple of:
 *   - that timestamp formatted as a UTC 'YYYY-MM-DD HH:mm:ss.SSS' string
 *   - the buffer indices of every document sharing that earliest timestamp
 */
function findEarliestDate(buffer: any[]): [string, number[]] {
    // Millisecond timestamps aligned with buffer positions
    // (null buffer entries stay falsy and are skipped below).
    const times = buffer.map(doc => doc && doc.t.getTime());

    let earliestDateMS;
    for (const t of times) {
        // skip null/undefined slots; keep the minimum defined timestamp
        if (t && (earliestDateMS === undefined || t < earliestDateMS)) {
            earliestDateMS = t;
        }
    }

    const indices: number[] = [];
    times.forEach((t, i) => {
        if (t === earliestDateMS) {
            indices.push(i);
        }
    });

    return [moment(earliestDateMS).utc().format('YYYY-MM-DD HH:mm:ss.SSS'), indices];
}

/**
 * Closes every cursor in `cursors` that is still open. Already-closed
 * cursors are skipped so close() is never invoked twice on the same cursor.
 */
function closeAllCursors(cursors: any[]) {
    for (const cursor of cursors) {
        if (!cursor.isClosed()) {
            cursor.close();
        }
    }
}

/**
 * Streams a merged, time-sorted CSV built from several MongoDB collections.
 *
 * Performs a k-way merge: one cursor per collection (opened with the cursor
 * timeout disabled, see getMongoStream), one "head" document per cursor kept
 * in `buffer`. Each iteration emits the row(s) with the earliest `t`, then
 * refills only the cursors whose documents were consumed. On any failure all
 * open cursors are closed explicitly so they do not linger on the server.
 *
 * Fixes over the previous revision:
 *  - Content-Disposition now interpolates the generated `filename`
 *    (it previously emitted a literal, useless placeholder).
 *  - The catch block no longer calls res.status(500) after headers and CSV
 *    rows have already been written (which raised ERR_HTTP_HEADERS_SENT);
 *    it only does so when nothing has been sent yet.
 */
async function csvData(req, res) {
    const collections: string[] = req.swagger.params.collections.value.split(',').sort(),
          sources: string[] = req.swagger.params.sources.value.split(',').sort(),
          startdate = new Date(Number(req.swagger.params.startdate.value)),
          enddate = new Date(Number(req.swagger.params.enddate.value));

    const filename = `${moment.utc().format('YYYY-MM-DD_HH:mm')}.csv`;

    res.set({
        'Content-Type': 'text/csv',
        // use the timestamped filename computed above
        'Content-Disposition': `attachment; filename="${filename}"`
    });
    res.write('Date UTC,' + sources.join(',') + '\n');

    const colPromises = collections.map(col => getMongoStream(col, startdate, enddate));
    let cursorsMap: { [rec: string]: any; };

    try {
        // buffer holds one head document per collection; dateCSVBuffer holds
        // one CSV cell per source, pre-filled with empty strings
        let buffer = [], dateCSVBuffer: any[] = _.fill(Array(sources.length), '');
        // fetch first doc from all cursors
        const cursors = await Promise.all(colPromises);
        cursorsMap = _.zipObject<any>(collections, cursors);
        let docs = await Promise.all(cursors.map(fetchNextCursorData));
        // initial request made for all collections
        let requestedIdx = _.range(0, collections.length);

        while(true) {
            // place freshly fetched docs back into their buffer slots
            docs.forEach((doc, i) => {
                buffer[requestedIdx[i]] = doc;
            });
            // null indicates that cursor won't return more data =>
            // all cursors are exhausted
            if(buffer.every(d => d === null)) {
                break;
            }

            const [date, indices] = findEarliestDate(buffer);
            requestedIdx = indices;

            indices.forEach(idx => {
                // update csv buffer with this doc's per-source values
                const {data} = buffer[idx];
                Object.keys(data)
                    .forEach(ch => {
                        const sourceIndex = sources.indexOf(ch);
                        if(sourceIndex > -1) {
                            dateCSVBuffer[sourceIndex] = data[ch];
                        }
                    });
                // remove doc from buffer
                buffer[idx] = null;
            });
            // send csv string
            dateCSVBuffer.unshift(date);
            res.write(dateCSVBuffer.join(',') + '\n');
            // empty buffer
            dateCSVBuffer = dateCSVBuffer.map(() => '');

            // request new entry only from the cursors we just consumed
            const nextDocPromises = indices
                .map(idx => cursorsMap[collections[idx]])
                .map(fetchNextCursorData);

            docs = await Promise.all(nextDocPromises);
        }
        // end data stream
        res.end();
    } catch(err) {
        // make sure to close all cursors
        // will catch all nested promise errors
        closeAllCursors(_.values(cursorsMap));
        console.error(err);
        if (res.headersSent) {
            // headers (and usually some CSV rows) are already on the wire;
            // a 500 status can no longer be sent — just end the stream
            res.end();
        } else {
            res.status(500).json(err);
        }
    }
}

Mongodb connection created with following options:

{
    auto_reconnect: true,
    poolSize: 30,
    connectTimeoutMS: 90000
}

Could the problem be that I keep the cursor references in the map and thus they are not updated? And when I do a cursor.hasNext() cursor is already dead? I also tried checking whether cursor.isClosed() but it always returns false.

Mongodb driver is "mongodb": "2.2.15" and the queries are tested against a v3.0 database.

EDIT: I did a small count test to see how many docs have been processed at the time when the program crashes. The 3 cursors (test case requested only data from 3 collections) have the following counts and ids:

3097531 '59427962835'
31190333 '53750510295'
32007475 '101213786015'

and the last document cursor with id '59427962835' processed was number 4101. So not even close to finishing


Solution

  • Turns out that passing the `timeout` option in the find() query options doesn't prevent the server-side cursor timeout. I had to set the noCursorTimeout cursor flag instead, like so:

    const cursor = connection.collection(collection)
        .find(query || {}, {fields, sort, limit, skip, hint})
        .addCursorFlag('noCursorTimeout', !timeout);