Search code examples
node.js node-oracledb

How to 'pipe' oracle-db data from 'on data' event


I've been using node-oracledb for a few months and I've managed to achieve what I have needed to so far.

I'm currently working on a search app that could potentially return about 2m rows of data from a single call. To ensure I don't get a disconnect from the browser and the server, I thought I would try queryStream so that there is a constant flow of data back to the client.

I implemented the queryStream example as-is, and this worked fine for a few hundred thousand rows. However, when the number of returned rows exceeds one million, Node runs out of memory. By logging and watching both client and server log events, I can see that the client is way behind the server in terms of rows sent and received. So, it looks like Node is falling over because it's buffering so much data.

It's worth noting that at this point, my selectstream implementation is within a req/res function called via Express.

To return the data, I do something like....

// Forward every row to the HTTP response as soon as the query stream emits it.
// NOTE: the return value of res.write() is ignored here, so nothing slows the
// stream down when the client reads more slowly than rows are produced.
stream.on('data', function (data) {

    // Running total of rows received from the stream so far.
    rowcount++;

    // Wrap the raw row and send its JSON form as one response chunk.
    let obj = new myObjectConstructor(data);
    res.write(JSON.stringify(obj.getJson()));

});

I've been reading about how streams and pipe can help with flow, so what I'd like to do is pipe the results from the query, both a) to help with flow and b) to be able to pass the results through other functions before sending them back to the client.

E.g.

/**
 * Sketch of the desired handler: run the query as a stream and pass its
 * output through two intermediate stages before it reaches the response.
 *
 * @param {object} req - Incoming request (not used here).
 * @param {object} res - Response object; the final destination of the chain.
 */
function getData(req, res){

    const rows = myQueryStream(connection, query);

    // Each pipe() call returns its destination, which feeds the next stage.
    const firstStage = rows.pipe(toSomeOtherFunction);
    const secondStage = firstStage.pipe(yetAnotherFunction);

    secondStage.pipe(res);

}

I've spent a few hours trying to find a solution or example that allows me to pipe results, but I'm stuck and need some help.

Apologies if I'm missing something obvious, but I'm still getting to grips with Node and especially streams.

Thanks in advance.


Solution

  • There's a bit of an impedance mismatch here. The queryStream API emits rows of JavaScript objects, but what you want to stream to the client is a JSON array. You basically have to add an open bracket to the beginning, a comma between consecutive rows (a trailing comma after the last row would make the JSON invalid), and a close bracket to the end.

    I'll show you how to do this in a controller that uses the driver directly as you have done, instead of using separate database modules as I advocate in this series.

    const oracledb = require('oracledb');
    
    /**
     * Express handler that streams every row of the `employees` query to the
     * client as one JSON array, without buffering the whole result in memory.
     *
     * Rows are separated by commas (none after the last row, so the body is
     * valid JSON), and the query stream is paused whenever the response's
     * internal buffer is full.
     *
     * @param {object} req - Express request (not used).
     * @param {object} res - Express/HTTP response the JSON array is written to.
     * @param {Function} next - Express callback; invoked with any error raised
     *   while connecting, querying or streaming.
     * @returns {Promise<void>} Resolves once the stream handlers are attached
     *   (or once a setup failure has been reported and cleaned up).
     */
    async function get(req, res, next) {
      let conn;
      let isConnClosed = false;

      // Releases the connection at most once. Both the 'error' and the 'close'
      // handlers call this, so the guard avoids closing it a second time.
      const closeConn = async () => {
        if (!conn || isConnClosed) {
          return;
        }

        isConnClosed = true;

        try {
          await conn.close();
        } catch (err) {
          console.log(err);
        }
      };

      try {
        conn = await oracledb.getConnection();

        const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});

        res.writeHead(200, {'Content-Type': 'application/json'});

        res.write('[');

        let isFirstRow = true;

        stream.on('data', (row) => {
          // The comma goes *before* every row except the first, so the array
          // never ends in a trailing comma.
          const separator = isFirstRow ? '' : ',';
          isFirstRow = false;

          // write() returning false means the response buffer is full: stop
          // reading rows until it has drained, otherwise they pile up in memory.
          if (!res.write(separator + JSON.stringify(row))) {
            stream.pause();
            res.once('drain', () => stream.resume());
          }
        });

        stream.on('end', () => {
          res.end(']');
        });

        stream.on('close', closeConn);

        stream.on('error', (err) => {
          next(err);
          closeConn();
        });

        // If the client goes away before the array is finished, no 'drain' will
        // ever arrive; destroy the query stream so it does not stay paused and
        // the connection can be released.
        res.on('close', () => {
          if (!res.writableEnded) {
            stream.destroy();
          }
        });
      } catch (err) {
        next(err);

        // A failure after getConnection() must not leak the connection.
        await closeConn();
      }
    }
    
    module.exports.get = get;
    

    Once you get the concepts, you can simplify things a bit with a reusable Transform class which allows you to use pipe in the controller logic:

    const oracledb = require('oracledb');
    const { Transform } = require('stream');
    
    /**
     * Transform stream that turns a sequence of row values (object mode) into
     * the text of a single JSON array: '[' first, the rows separated by commas,
     * and ']' once the input has ended.
     *
     * Any JSON-serialisable value is accepted as a row, including falsy ones
     * such as 0, '' or false.
     */
    class ToJSONArray extends Transform {
      constructor() {
        super({objectMode: true});

        // True until the first row has been emitted; decides whether a comma
        // separator is needed in front of a row.
        this._isFirstRow = true;

        this.push('[');
      }

      /**
       * Serialises one row and emits it, preceded by a comma unless it is the
       * first row.
       *
       * @param {*} row - Value to serialise.
       * @param {string} encoding - Unused (object mode).
       * @param {Function} callback - Receives the serialised chunk, or the
       *   error thrown by JSON.stringify.
       */
      _transform (row, encoding, callback) {
        let json;

        try {
          json = JSON.stringify(row);
        } catch (err) {
          // e.g. circular structures: report through the stream, do not throw.
          callback(err);
          return;
        }

        // JSON.stringify yields undefined for values such as functions; inside
        // an array those are represented as null.
        if (json === undefined) {
          json = 'null';
        }

        const separator = this._isFirstRow ? '' : ',';
        this._isFirstRow = false;

        callback(null, separator + json);
      }

      /**
       * Closes the array once all rows have been processed.
       *
       * @param {Function} done - Signals that flushing is complete.
       */
      _flush (done) {
        this.push(']');

        done();
      }
    }
    
    /**
     * Express handler that pipes every row of the `employees` query through a
     * ToJSONArray transform into the response, so the client receives a single
     * JSON array while pipe() takes care of flow control.
     *
     * @param {object} req - Express request (not used).
     * @param {object} res - Express/HTTP response; destination of the pipe chain.
     * @param {Function} next - Express callback; invoked with any error raised
     *   while connecting, querying, serialising or streaming.
     * @returns {Promise<void>} Resolves once the pipe chain is set up (or once
     *   a setup failure has been reported and cleaned up).
     */
    async function get(req, res, next) {
      let conn;
      let isConnClosed = false;

      // Releases the connection at most once. Both the 'error' and the 'close'
      // handlers call this, so the guard avoids closing it a second time.
      const closeConn = async () => {
        if (!conn || isConnClosed) {
          return;
        }

        isConnClosed = true;

        try {
          await conn.close();
        } catch (err) {
          console.log(err);
        }
      };

      try {
        const toJSONArray = new ToJSONArray();
        conn = await oracledb.getConnection();

        const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});

        res.writeHead(200, {'Content-Type': 'application/json'});

        stream.pipe(toJSONArray).pipe(res);

        stream.on('close', closeConn);

        stream.on('error', (err) => {
          next(err);
          closeConn();
        });

        // pipe() does not forward errors; without this listener a failure in
        // the transform would be an unhandled 'error' event.
        toJSONArray.on('error', (err) => {
          next(err);
          stream.destroy();
        });

        // pipe() also does not tear down the source when the destination goes
        // away; destroy the query stream on an early client disconnect so the
        // connection can be released.
        res.on('close', () => {
          if (!res.writableEnded) {
            stream.destroy();
          }
        });
      } catch (err) {
        next(err);

        // A failure after getConnection() must not leak the connection.
        await closeConn();
      }
    }
    
    module.exports.get = get;