I have a Node.js application that uses Kue jobs to iterate over large collections using Mongoose cursors.
kue job configuration:
/**
 * Queue a 'child' kue job for a dataset and return a promise that settles
 * when the job finishes.
 *
 * The number of documents in `dataset` whose `processed` field is not 1 is
 * counted first and stored on the job as `total`.
 *
 * @param {*} cid - Stored on the job as `cid`.
 * @param {string} name - Stored on the job as `title`.
 * @param {string} dataset - Mongoose model name to count and process.
 * @param {*} type - Stored on the job as `kind`.
 * @param {*} link - Stored on the job as `link`.
 * @param {*} line - Stored on the job as `line`.
 * @param {*} market - Stored on the job as `market`.
 * @param {Function} [done] - Unused; kept so existing call sites still match.
 * @returns {Promise<{done: boolean, job: Object, success: boolean}>} Resolves
 *   with `success: true` on the job's 'complete' event and `success: false`
 *   on its 'failed' event; rejects if the initial count query fails.
 */
function childJob(cid, name, dataset, type, link, line, market, done) {
  var deferred = Q.defer();

  mongoose.model(dataset).count({processed: {$ne: 1}}, function(err, total) {
    if (err) {
      // Without a total the job could never report progress or completion.
      deferred.reject(err);
      return;
    }

    console.log('total', total);

    var job = jobs.create('child', {
      type: 'CHILD',
      cid: cid,
      title: name,
      dataset: dataset,
      kind: type,
      link: link,
      line: line,
      market: market,
      current: 0,
      total: total
    });

    job
      .on('complete', function() {
        deferred.resolve({
          done: true,
          job: job.data,
          success: true
        });
      })
      .on('failed', function() {
        deferred.resolve({
          done: true,
          job: job.data,
          success: false
        });
      });

    job.save();
  });

  // Must be returned from the outer function: a `return` inside the count
  // callback is discarded and the caller would receive undefined.
  return deferred.promise;
}
Whenever a child job is started:
// Worker for 'child' jobs, registered with a concurrency of 10. Streams every
// document of the job's dataset whose `processed` field is not 1 through
// functions.workRecord, updating the job's progress after each record.
jobs.process('child', 10, function(job, done) {
  var statsKey = job.data.line + job.data.market;
  var count = 0;
  var lastStats;

  mongoose.model(job.data.dataset)
    .find({processed: {$ne: 1}})
    .lean()
    .batchSize(1000)
    .cursor()
    .eachAsync(function(record) {
      // Returning a promise makes eachAsync wait for this record to finish
      // before fetching the next one. Without it the cursor is drained as
      // fast as possible and unfinished workRecord calls pile up.
      return new Promise(function(resolve) {
        functions.workRecord(record, job.data.link, job.data.line, job.data.market, myStats[statsKey], function(stats) {
          count++;
          lastStats = stats;
          job.data.current = count;
          job.update();
          job.progress(count, job.data.total, record);
          resolve();
        });
      });
    })
    .then(function() {
      // Finish when the cursor is exhausted rather than when count reaches
      // the precomputed total: the two can differ if the collection is empty
      // or changes while the job runs, which would leave the job hanging.
      if (count > 0) {
        myStats[statsKey] = lastStats;
      }
      done();
    }, function(err) {
      // Surface cursor errors (e.g. "Cursor not found") as a failed job
      // instead of an unhandled promise rejection.
      done(err);
    });
});
Over time, the number of records that cursor().eachAsync()
processes decreases dramatically. It goes from processing 100 records a second to about 1 or 2 records across several seconds, and then stops completely.
Is this a configuration issue? How can I set the query to have a consistent flow of records in the cursor over time?
EDIT1: eventually I get the following error:
(node:61193) UnhandledPromiseRejectionWarning: MongoError: Cursor not found, cursor id: 90600322391 at Function.MongoError.create (/home/sigma/SigmaCWCDataAnalysis/node_modules/mongoose/node_modules/mongodb-core/lib/error.js:31:11)
It looks like the cursor was timing out.
The solution was to pass {timeout: false} to the find() query, like the following:
mongoose.model(job.data.dataset).find({},{timeout: false}).lean().cursor()