Search code examples
node.jspromiseasync-awaitintercom

Async-Await & Bottleneck Rate Limiting using Promise.all


I'm using an API which has a rate limit of 500 requests / min. Therefore I decided to use bottleneck. But I need to execute array of async functions which generates a Promise to make that API call. I'm not sure I'm on the right way. Because API responses me with "Exceeded rate limit of 83 in 10_seconds" where I just only send 70 requests in 10 seconds.

Here is how I call the main function:

const result = await Helper.updateUsers(request.query.where);
..
..

Here is the helper.js

const Boom = require("boom");
const mongoose = require("mongoose");
const Bottleneck = require("bottleneck");

const Intercom = require("intercom-client");

const config = require("../../config/config");

const client = new Intercom.Client({
  token: config.intercom.access_token
});

const User = mongoose.model("User");
const Shop = mongoose.model("Shop");

// create a rate limiter that allows up to 70 API calls per 10 seconds,
// with max concurrency of 70
const limiter = new Bottleneck({
  maxConcurrent: 70,
  minTime: 10000
});

// Helpers

// This function prepares a valid Intercom User Object.
// user -> User Object
// returns <Promise>
const prepareAndUpdateUser = async user => {
  try {
    let userData = {
      email: user.email,
      user_id: user._id,
      companies: []
    };
    Shop.find({ _id: { $in: user.account.shops } })
      .exec((err, shops) => {
        if (err) console.log("INTERCOM UPDATE USER", err);
        shops.forEach(shop => {
          let shopData = {
            company_id: shop._id,
            name: shop.name[shop.defaultLanguage.code]
          };
          userData.companies.push(shopData);
        });
        // Update Intercom Promise
        return client.users.create(userData);
      });
  } catch (e) {
    return Boom.boomify(err);
  }
};

module.exports.updateUsers = async query => {
  try {
    const users = await User.find(query)
      .populate("account")
      .limit(700);
    if (users && users.length > 0) {
      limiter.schedule(() => {
        const allTasks = users.map(
          async user => await prepareAndUpdateUser(user)
        );
        return Promise.all(allTasks);
      });
      return users.length;
    } else {
      return 0;
    }
  } catch (err) {
    return Boom.boomify(err);
  }
};

Am I using Bottleneck & Async-Await correct?


Solution

  • The first thing to point out is your use of callbacks in an async method instead of awaiting a promise. You should use the promise returning version of Shops.find() and await the results.

    async function prepareAndUpdateUser(user) {
        try {
            const shops = await Shop.find({ _id: { $in: user.account.shops } }).exec();
            return client.users.create({
                email: user.email,
                user_id: user._id,
                companies: shops.map(shop => {
                    return {
                        company_id: shop._id,
                        name: shop.name[shop.defaultLanguage.code]
                    };
                })
            });
        } catch (e) {
            return Boom.boomify(err);
        }
    }
    

    In your updateUsers method you're using the rate limiter backwards. You want to map the users into the rate limiter so that it can control when prepareAndUpdateUser is called, currently you'll be requesting everything in parallel. You also want to wait for promise returned by the rate limiter to resolve. Essentially you'll want to move limiter.scehdule(...) into user.map(...).

    async function updateUsers(query) {
        try {
            const users = await User.find(query)
                .populate("account")
                .limit(700);
            if (users && users.length > 0) {
                // Schedule an update for each user
                const allTasks = users.map(user => {
                    // Schedule returns a promise that resolves when the operation is complete
                    return limiter.schedule(() => {
                        // This method is called when the scheduler is ready for it
                        return prepareAndUpdateUser(user)
                    })
                });
                // Wait for all the scheduled tasks to complete
                await Promise.all(allTasks);
                return users.length;
            } else {
                return 0;
            }
        } catch (err) {
            return Boom.boomify(err);
        }
    }