Search code examples
javascriptnode.jspromisebluebird

Limit concurrency of pending promises


I'm looking for a promise function wrapper that can limit / throttle when a given promise is running so that only a set number of that promise is running at a given time.

In the case below delayPromise should never run concurrently, they should all run one at a time in a first-come-first-serve order.

import Promise from 'bluebird'

function _delayPromise (seconds, str) {
  console.log(str)
  return Promise.delay(seconds)
}

let delayPromise = limitConcurrency(_delayPromise, 1)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(100, "a:b")
  await delayPromise(100, "a:c")
}

async function b() {
  await delayPromise(100, "b:a")
  await delayPromise(100, "b:b")
  await delayPromise(100, "b:c")
}

a().then(() => console.log('done'))

b().then(() => console.log('done'))

Any ideas on how to get a queue like this set up?

I have a "debounce" function from the wonderful Benjamin Gruenbaum. I need to modify this to throttle a promise based on it's own execution and not the delay.

export function promiseDebounce (fn, delay, count) {
  let working = 0
  let queue = []
  function work () {
    if ((queue.length === 0) || (working === count)) return
    working++
    Promise.delay(delay).tap(function () { working-- }).then(work)
    var next = queue.shift()
    next[2](fn.apply(next[0], next[1]))
  }
  return function debounced () {
    var args = arguments
    return new Promise(function (resolve) {
      queue.push([this, args, resolve])
      if (working < count) work()
    }.bind(this))
  }
}

Solution

  • I don't think there are any libraries to do this, but it's actually quite simple to implement yourself:

    function sequential(fn) { // limitConcurrency(fn, 1)
        let q = Promise.resolve();
        return function(x) {
            const p = q.then(() => fn(x));
            q = p.reflect();
            return p;
        };
    }
    

    For multiple concurrent requests it gets a little trickier, but can be done as well.

    function limitConcurrency(fn, n) {
        if (n == 1) return sequential(fn); // optimisation
        let q = Promise.resolve();
        const active = new Set();
        const fst = t => t[0];
        const snd = t => t[1];
        return function(x) {
            function put() {
                const p = fn(x);
                const a = p.reflect().then(() => {
                    active.delete(a);
                });
                active.add(a);
                return [Promise.race(active), p];
            }
            if (active.size < n) {
                const r = put()
                q = fst(t);
                return snd(t);
            } else {
                const r = q.then(put);
                q = r.then(fst);
                return r.then(snd)
            }
        };
    }
    

    Btw, you might want to have a look at the actors model and CSP. They can simplify dealing with such things, there are a few JS libraries for them out there as well.

    Example

    import Promise from 'bluebird'
    
    function sequential(fn) {
      var q = Promise.resolve();
      return (...args) => {
        const p = q.then(() => fn(...args))
        q = p.reflect()
        return p
      }
    }
    
    async function _delayPromise (seconds, str) {
      console.log(`${str} started`)
      await Promise.delay(seconds)
      console.log(`${str} ended`)
      return str
    }
    
    let delayPromise = sequential(_delayPromise)
    
    async function a() {
      await delayPromise(100, "a:a")
      await delayPromise(200, "a:b")
      await delayPromise(300, "a:c")
    }
    
    async function b() {
      await delayPromise(400, "b:a")
      await delayPromise(500, "b:b")
      await delayPromise(600, "b:c")
    }
    
    a().then(() => console.log('done'))
    b().then(() => console.log('done'))
    
    // --> with sequential()
    
    // $ babel-node test/t.js
    // a:a started
    // a:a ended
    // b:a started
    // b:a ended
    // a:b started
    // a:b ended
    // b:b started
    // b:b ended
    // a:c started
    // a:c ended
    // b:c started
    // done
    // b:c ended
    // done
    
    // --> without calling sequential()
    
    // $ babel-node test/t.js
    // a:a started
    // b:a started
    // a:a ended
    // a:b started
    // a:b ended
    // a:c started
    // b:a ended
    // b:b started
    // a:c ended
    // done
    // b:b ended
    // b:c started
    // b:c ended
    // done