Tags: mysql, node.js, kubernetes, knex.js

Knex.js stream large data


I have a MySQL table with millions of rows. For each row I have to apply custom logic and write the modified data to another table.

Using knex.js, I run the query and read the data with the stream() function.

Once I get the stream object, I attach my logic to the data event. Everything works correctly, but at a certain point the stream stops without raising any error.

I tried pausing the stream before each update on the new table and resuming it after the update completes, but the problem persists. If I put a limit on the query, for example 1000 results, the system works fine.

Sample code:

// knex is assumed to be initialized elsewhere, e.g.:
// const knex = require('knex')({ client: 'mysql', connection: { /* ... */ } })

const readableStream = knex.select('*')
  .from('big_table')
  .stream();

readableStream.on('data', async (data) => {
    readableStream.pause() // pause the stream while this row is processed
    const toUpdate = applyLogic(data) // sync func
    const whereCond = getWhereCondition(data) // sync func
    try {
       await knex('to_update').where(whereCond).update(toUpdate)
       console.log('UPDATED')
    } catch (e) {
       console.log('ERROR', e)
    }
    readableStream.resume() // resume once, whether the update succeeded or failed

}).on('end', () => { // 'end' fires when a readable stream has been fully consumed
   console.log('FINISH')
}).on('error', (err) => {
   console.log('ERROR', err)
})
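As an aside, the manual pause()/resume() bookkeeping can be avoided with Node's async iteration over readable streams: a for await loop only pulls the next row after the loop body finishes, so backpressure is handled automatically. This is a sketch under that assumption, reusing the applyLogic and getWhereCondition helpers from above:

async function processAll() {
  const stream = knex.select('*').from('big_table').stream()
  // for await pulls one row at a time; the next row is not read
  // until the awaited update below has settled
  for await (const row of stream) {
    const toUpdate = applyLogic(row)
    const whereCond = getWhereCondition(row)
    try {
      await knex('to_update').where(whereCond).update(toUpdate)
      console.log('UPDATED')
    } catch (e) {
      console.log('ERROR', e)
    }
  }
  console.log('FINISH')
}

processAll().catch((err) => console.log('ERROR', err))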

Thanks!


Solution

  • I solved it.

    The problem is not due to knex.js or the streams but to my development environment. I use k3d to simulate the production environment on GCP, so to test my script locally I port-forwarded the MySQL service (the command is sketched below).

    It is not clear to me why the system crashes, but by packaging my script in a container that runs inside the cluster and connects to the MySQL service directly, the algorithm works as I expect (a Job sketch follows at the end of this answer).

    Thanks
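For context, the local setup that exhibited the problem looked roughly like this; the service name and namespace are assumptions, not taken from the original post:

# Hypothetical service name -- adjust to your cluster.
# Tunnels the in-cluster MySQL service to localhost:3306.
kubectl port-forward svc/mysql 3306:3306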
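And the working setup, sketched as a Kubernetes Job that runs the script inside the cluster; the image name, env variable, and service host are hypothetical:

apiVersion: batch/v1
kind: Job
metadata:
  name: migrate-big-table            # hypothetical name
spec:
  template:
    spec:
      containers:
        - name: migrate
          image: my-registry/migrate-script:latest   # hypothetical image
          env:
            - name: DB_HOST
              value: mysql           # resolves to the in-cluster MySQL service
      restartPolicy: Never
  backoffLimit: 0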