Search code examples
performanceagenda

MongoDB queue processing is delayed at times when using Agenda


We are using mongodb-queue to do some processing, and we use the Agenda scheduler to run a job every 3 minutes that gets a message from the queue and processes it. The issue we are observing is that it does not work consistently: at times a message remains in the queue for some time (not even acknowledged, i.e. not yet picked up) before it gets processed. Once processing starts, the subsequent messages in the queue are processed quickly again, until the delay happens again. enter image description here

If you look at the deleted timestamps, the last three transactions (at the top) ran much later than the one before them, whereas they were supposed to be processed 3 to 4 minutes after the fourth record.

Below is the code we use to fetch and process messages from the queue:

module.exports = function (agenda_processing) {

    var isStatic = false;
    agenda_processing.disableRemoteMethodByName('updateAttributes', isStatic);
    // agenda_processing = Object.assign(agenda_processing, httpManager);
    isStatic = true;
    agenda_processing.disableRemoteMethodByName('updateAll', isStatic);
    agenda_processing.disableRemoteMethodByName('deleteById', isStatic);
    agenda_processing.disableRemoteMethodByName('create', isStatic);
    agenda_processing.disableRemoteMethodByName('upsert', isStatic);
    agenda_processing.disableRemoteMethodByName('count', isStatic);
    agenda_processing.disableRemoteMethodByName('findOne', isStatic);
    agenda_processing.disableRemoteMethodByName('exists', isStatic);
    agenda_processing.disableRemoteMethodByName('find', isStatic);
    agenda_processing.disableRemoteMethodByName('findById', isStatic);
    
    var jobsManager = ''
    
    async function graceful() {
        if (jobsManager) {
          await  jobsManager.stop();
        }
        setTimeout(() => {
            process.exit(0);
        }, 1000)
    }
        process.on('uncaughtException', graceful);
        process.on("SIGTERM", graceful);
        process.on("SIGINT", graceful);
    
    //To deploy to dev commented out
    setTimeout(() => {

        setUpJobForProcessingQueue()
    }, 3000)

    function setUpJobForProcessingQueue () {
        const dbUrl = config.spkmsdb.url
        jobsManager = awbjobs.init(dbUrl, async () => {
            await defineAndStartJobs()
        })
        //console.log(jobsManager)
    };

    async function defineAndStartJobs () {
        let connector = agenda_processing.app.models.sites.getDataSource().connector.db
        queue = processingqueue.initQueue(connector)
        var jobNm = "processing-job"
        jobsManager.define(jobNm, async function (job,done) {   
            try {
                winstonLogger.info('Agenda: Entering the define callback')
                await getDataFromQueueAndProcess(queue)
                done()
                winstonLogger.info('Agenda: Called done')
            } catch(err) {
                done(err)
            }
        }.bind(jobNm))
        await jobsManager.every("180 seconds", jobNm) //3 minutes
        await jobsManager.start()
        winstonLogger.info('awb jobs have been set up')
    }
}

Agenda is initialised as below:

// Create the Agenda instance from the connection string and run onReady
// once the instance emits 'ready'.
const agenda = new Agenda({
    db: {
        address: dbConStr,
        options: { useUnifiedTopology: true, useNewUrlParser: true },
    },
});
agenda.on('ready', onReady);

The queue is initialized as:

// Declared with const so the name is not leaked as an implicit global
// (an undeclared assignment throws in strict mode / ES modules).
const queuename = 'processing-queue';
// NOTE(review): mongodb-queue appears to take `visibility` in seconds, so 3600
// would keep a fetched-but-unacknowledged message hidden for an hour - confirm
// this is intended.
const queue = mongoDbQueue(db, queuename, { maxRetries: 1, visibility: 3600 });

Any help resolving this inconsistency would be much appreciated. Thanks in advance.


Solution

  • The problem was with the Agenda initializer: the job never got created in MongoDB because the collection name for the jobs was not passed when initializing Agenda. This caused the scheduler to behave erratically, picking up messages from the queue unevenly, especially when we had multiple instances of the application, each using its own scheduler jobs (the collection name will be dynamic, based on the instance).

    // Agenda initializer with an explicit jobs collection name under `db`.
    const agenda = new Agenda({
        db: {
            address: dbConStr,
            options: { useUnifiedTopology: true, useNewUrlParser: true },
            collection: 'jobscollection',
        },
    });