
Properly batch nested promises in Node


I'm running a knex seed in Node and need to batch an additional query to my database due to restrictions on my server. I'm starting to get the hang of promises and async/await, but I'm having trouble getting it to work at several levels deep (what's throwing me off in particular at this point is that it seems to interfere with the batching in a way that I can't quite make sense of). My seed file looks like this:

exports.seed = async function(knex) {
  const fs = require('fs');
  const _ = require('lodash');

  function get_event_id(location) {
    return knex('events')
      .where({location: location})
      .first()
      .then(result => { return result['id']; })
      .finally(() => { knex.destroy() })
  }

  function createImage(row, event_id) {
    return {
      name: row[4],
      event_id: event_id
    }
  };

  async function run_query(line) {
      let row = line.split(',');
      let event_id = await get_event_id(row[0]);
      return createImage(row, event_id);
  };

  async function run_batch(batch) {
      return Promise.all(batch.map(run_query));
  }

  const file = fs.readFileSync('./data.csv');
  const lines = file.toString().replace(/[\r]/g, '').split('\n').slice(1,60); // skip csv header, then run first 59 lines

  const batches = _.chunk(lines, 30); // set batch size

  let images = await Promise.all(batches.map(run_batch));

  console.log(_.flatten(images).length);

};

My database can handle 30 queries at a time. Everything resolves properly if I run a single batch using .slice(1,30) on the line where lines is defined. But running with 60 as above gives me ER_TOO_MANY_USER_CONNECTIONS: User already has more than 'max_user_connections' active connections.

The script completes if I change the content of run_batch to return batch.map(run_query), and it returns the correct number of entries (so it seems to be batching properly). But then the Promises are still pending. What am I missing, and is there a more elegant way to do this?


Solution

  • In this line:

    let images = await Promise.all(batches.map(run_batch));
    

    You are trying to run ALL the batches in parallel, which defeats your chunking entirely.

    You could use a regular for loop with await instead of the .map() so you run a batch, wait for it to finish, then run the next batch.

    let allResults = [];
    for (let batch of batches) {
         let images = await run_batch(batch);
         allResults.push(...images);
    }
    console.log(allResults);
    

    FYI, you might benefit from any number of functions people have written for processing a large array with no more than N requests in flight at the same time. These do not require you to manually break the data into batches. Instead, they monitor how many requests are in flight, start up your desired number of requests, and as each one finishes they start another, collecting the results for you.

    runN(fn, limit, cnt, options): Loop through an API on multiple requests

    pMap(array, fn, limit): Make several requests to an api that can only handle 20 at a time

    rateLimitMap(array, requestsPerSec, maxInFlight, fn): Proper async method for max requests per second

    mapConcurrent(array, maxConcurrent, fn): Promise.all() consumes all my ram

    There are also features to do this built into the Bluebird promise library and the Async-promises library.
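    To illustrate the pattern those utilities implement, here's a minimal sketch of a concurrency-limited mapper (a hypothetical mapConcurrent, not the exact code behind the links above). It keeps at most maxConcurrent calls to fn running at once and starts a new one each time a slot frees up, so there's no need to pre-chunk the data:

    // Hypothetical sketch: map over an array with at most
    // `maxConcurrent` calls to `fn` in flight at any one time.
    function mapConcurrent(array, maxConcurrent, fn) {
      return new Promise((resolve, reject) => {
        const results = new Array(array.length);
        let index = 0;    // next item to start
        let inFlight = 0; // how many calls are currently running
        let done = 0;     // how many calls have finished

        if (array.length === 0) return resolve(results);

        function runNext() {
          // start items until we hit the limit or run out of input
          while (inFlight < maxConcurrent && index < array.length) {
            const i = index++;
            inFlight++;
            Promise.resolve(fn(array[i], i)).then(result => {
              results[i] = result;
              inFlight--;
              done++;
              if (done === array.length) {
                resolve(results); // every item finished
              } else {
                runNext(); // a slot freed up; start another item
              }
            }, reject);
          }
        }
        runNext();
      });
    }

    In the seed above you'd call it as mapConcurrent(lines, 30, run_query) and skip the _.chunk step entirely. Bluebird's built-in equivalent is Promise.map(array, fn, {concurrency: 30}).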