node.js, rxjs, message-queue, producer-consumer

Trigger the execution of a function if any condition is met


I'm writing an HTTP API with expressjs in Node.js and here is what I'm trying to achieve:

  • I have a task that I would like to run periodically, approximately every minute. It is implemented as an async function named task.
  • In response to a call to my API, I would like that task to run immediately as well.
  • Two executions of the task function must not be concurrent. Each execution should run to completion before another execution is started.

The code looks like this:

// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

// call task regularly
setIntervalAsync(async () => {
  await task("ticker");
}, 5000) // normally 1min

// call task immediately
app.get("/task", async (req, res) => {
  await task("trigger");
  res.send("ok");
});

I've put a full working sample project at https://github.com/piec/question.js

If I were using Go I would do it like this, and it would be easy, but I don't know how to do it in Node.js.


Ideas I have considered or tried:

  • I could apparently put task in a critical section using a mutex from the async-mutex library, but I'm not too fond of adding mutexes to JS code.
  • Many people seem to use message queue libraries with worker processes (bee-queue, bullmq, ...), but this usually adds a dependency on an external service like Redis. Also, if I understand correctly, the code would be a bit more complex because I would need a main entrypoint and a separate entrypoint for the worker processes, and I couldn't share objects with the workers as easily as in a "normal" single-process setup.
  • I have tried an RxJS Subject to build a producer-consumer channel, but I was not able to limit the execution of task to one at a time (task is async).

Thank you!


Solution

  • You can make your own serialized asynchronous queue and run the tasks through that.

    This queue uses a flag to keep track of whether it is already in the middle of running an asynchronous operation. If so, it just adds the task to the queue and runs it once the operations ahead of it are done. If not, it runs the task right away. Adding a task to the queue returns a promise, so the caller can know when the task has finished and get its result or error.

    If the tasks are asynchronous, they are required to return a promise that is linked to the asynchronous activity. You can mix in non-asynchronous tasks too and they will also be serialized.

    class SerializedAsyncQueue {
        constructor() {
            this.tasks = [];
            this.inProcess = false;
        }
        // adds a function (promise-returning or synchronous) and its args to the queue
        // returns a promise that settles with the function's result once it has run
        add(fn, ...args) {
            let d = new Deferred();
            this.tasks.push({ fn, args, deferred: d });
            this.check();
            return d.promise;
        }
        check() {
            if (!this.inProcess && this.tasks.length) {
                // run next task
                this.inProcess = true;
                const nextTask = this.tasks.shift();
                // the executor calls fn right away; a synchronous throw becomes a rejection
                new Promise(resolve => resolve(nextTask.fn(...nextTask.args))).then(val => {
                    this.inProcess = false;
                    nextTask.deferred.resolve(val);
                    this.check();
                }).catch(err => {
                    console.log(err);
                    this.inProcess = false;
                    nextTask.deferred.reject(err);
                    this.check();
                });
            }
        }
    }
    
    const Deferred = function() {
        if (!(this instanceof Deferred)) {
            return new Deferred();
        }
        const p = this.promise = new Promise((resolve, reject) => {
            this.resolve = resolve;
            this.reject = reject;
        });
        this.then = p.then.bind(p);
        this.catch = p.catch.bind(p);
        if (p.finally) {
            this.finally = p.finally.bind(p);
        }
    }
    
    
    let queue = new SerializedAsyncQueue();
    
    // utility function
    const sleep = function(t) {
        return new Promise(resolve => {
            setTimeout(resolve, t);
        });
    }
    
    // only a single execution of this function is allowed at a time
    // so it is run only via the queue that makes sure it is serialized
    async function task(reason: string) {
        async function runIt() {
            console.log("do thing because %s...", reason);
            await sleep(1000);
            console.log("done");
        }
        return queue.add(runIt);
    }
    
    // call task regularly
    setIntervalAsync(async () => {
        await task("ticker");
    }, 5000) // normally 1min
    
    // call task immediately
    app.get("/task", async (req, res) => {
        await task("trigger");
        res.send("ok");
    });
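
For comparison, the same serialization can be sketched more compactly by chaining each call onto the promise of the previous one, rather than keeping an explicit task array and flag. This is an alternative sketch under stated assumptions, not the queue class above: serialize, maxRunning, and log are hypothetical names introduced only for this demo, and the 20 ms sleep stands in for real work. It fires three overlapping calls, one of which fails, and records how many executions were ever in progress at once.

```javascript
// Hypothetical helper: wraps a function so that its calls run one at a time,
// in the order they were made.
function serialize(fn) {
    // `tail` always fulfills, so one failed call cannot block later calls
    let tail = Promise.resolve();
    return function (...args) {
        // start this call only after the previous call has settled
        const result = tail.then(() => fn(...args));
        // ignore the outcome on the chain; the caller still sees it via `result`
        tail = result.then(() => {}, () => {});
        return result;
    };
}

const sleep = (t) => new Promise(resolve => setTimeout(resolve, t));

let running = 0;     // executions currently in progress
let maxRunning = 0;  // highest concurrency observed
const log = [];      // order in which executions started

const task = serialize(async (reason) => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    log.push(reason);
    await sleep(20); // stand-in for real work
    running--;
    if (reason === "fail") throw new Error("boom");
    return reason;
});

// three overlapping calls, as the ticker and the HTTP handler might produce
const demo = Promise.allSettled([task("ticker"), task("fail"), task("trigger")]);

demo.then(results => {
    console.log(results.map(r => r.status)); // [ 'fulfilled', 'rejected', 'fulfilled' ]
    console.log(maxRunning, log);            // 1 [ 'ticker', 'fail', 'trigger' ]
});
```

Wrapping the real function once, for example const task = serialize(realTask) where realTask is a placeholder for your own function, should leave the setIntervalAsync callback and the Express handler unchanged, since both simply await task(...).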