Tags: node.js, mongodb, stream, pipeline, stream-json

How to stream JSON data from one Node.js server to another and process it at the receiver at runtime?


What I'm basically trying to achieve is to get all items of a MongoDB collection on one Node.js server, stream these items (in JSON format) over REST to another Node.js server, and pipe the incoming readstream through stream-json so the parsed objects can afterwards be persisted in another MongoDB.

(I need to use streams because my items can be deeply nested objects that would consume a lot of memory if loaded all at once. Additionally, I'm unable to access the first MongoDB from the second server directly due to strict network segmentation.)

The code I have so far already works for smaller amounts of data, but one collection holds about 1.2 GB, and for that one the processing on the receiving side consistently fails.

Here's the code of the sending server:

import { Response } from 'express';
import { MyModel } from '../models/my-model';
import { logger } from '../services/logger';

export const streamData = async (res: Response) => {
  try {
    res.type('json');
    const amountOfItems = await MyModel.count();
    if (amountOfItems > 0) {
      const cursor = MyModel.find().cursor();
      let first = true;
      cursor.on('error', (err) => {
        logger.error(err);
      });
      cursor.on('data', (doc) => {
        if (first) {
          // open json array
          res.write('[');
          first = false;
        } else {
          // add the delimiter before every object that isn't the first
          res.write(',');
        }
        // add json object
        res.write(JSON.stringify(doc));
      });
      cursor.on('end', () => {
        // close json array
        res.write(']');
        res.end();
        logger.info('REST-API-Call to fetchAllItems: Streamed all items to the receiver.');
      });
    } else {
      res.write('[]');
      res.end();
      logger.info('REST-API-Call to fetchAllItems: Streamed an empty response to the receiver.');
    }
  } catch (err) {
    logger.error(err);
    return [];
  }
};

And that's the receiving side:

import { MyModel } from '../models/my-model';
import axios from 'axios';
import { logger } from '../services/logger';
import StreamArray from 'stream-json';
import { streamArray } from 'stream-json/streamers/StreamArray';
import { pipeline } from 'stream';

const persistItems = async (items: Item[], ip: string) => {
  try {
    await MyModel.bulkWrite(items.map(item => {
      return {
        updateOne: {
          filter: { 'itemId': item.itemId },
          update: item,
          upsert: true,
        },
      };
    }));
    logger.info(`${ip}: Successfully upserted items to mongoDB`);
  } catch (err) {
    logger.error(`${ip}: Upserting items to mongoDB failed due to the following error: ${err}`);
  }
};

const getAndPersistDataStream = async (ip: string) => {
  try {
    const axiosStream = await axios(`http://${ip}:${process.env.PORT}/api/items`, { responseType: 'stream' });
    const jsonStream = StreamArray.parser({ jsonStreaming : true });
    let items : Item[] = [];

    const stream = pipeline(axiosStream.data, jsonStream, streamArray(),
      (error) => {
        if ( error ){
          logger.error(`Error: ${error}`);
        } else {
          logger.info('Pipeline successful');
        }
      },
    );

    stream.on('data', (i: any) => {
      items.push(<Item> i.value);
      // wait until the array contains 500 objects, then bulkWrite them to the database
      if (items.length === 500) {
        persistItems(items, ip);
        items = [];
      }
    });
    stream.on('end', () => {
      // bulkwrite the last items to the mongodb
      persistItems(items, ip);
    });
    stream.on('error', (err: any) => {
      logger.error(err);
    });
    await new Promise(fulfill => stream.on('finish', fulfill));
  } catch (err) {
    if (err) {
      logger.error(err);
    }
  }
};

As I said, the problem only occurs with the bigger collection holding about 1.2 GB of data, and it seems to appear a few seconds after the sending server closes the stream. This is the error message I get on the receiving server:

ERROR: Premature close
    err: {
      "type": "NodeError",
      "message": "Premature close",
      "stack":
          Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
              at IncomingMessage.onclose (internal/streams/end-of-stream.js:75:15)
              at IncomingMessage.emit (events.js:314:20)
              at Socket.socketCloseListener (_http_client.js:384:11)
              at Socket.emit (events.js:326:22)
              at TCP.<anonymous> (net.js:676:12)
      "code": "ERR_STREAM_PREMATURE_CLOSE"
    }

Can I somehow prevent the read stream from closing too early?

The only workaround I can imagine right now is to save the stream to a local file first, then create a new readstream from that file, process/persist the data, and delete the file afterwards, although I would prefer not to do that. I'm also not quite sure whether that will work out, or whether the premature-close issue will remain when trying to save a large dataset to a file.
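For reference, a rough sketch of that workaround; the temp file path is a placeholder, and processFile stands in for the stream-json/persistItems pipeline shown above:

import axios from 'axios';
import { createReadStream, createWriteStream, promises as fsp } from 'fs';
import { pipeline } from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

const getViaTempFile = async (
  ip: string,
  processFile: (stream: NodeJS.ReadableStream) => Promise<void>,
) => {
  const tmpPath = '/tmp/items.json'; // placeholder location
  const response = await axios(`http://${ip}:${process.env.PORT}/api/items`, {
    responseType: 'stream',
  });
  // 1. write the raw response body to disk first
  await pipelineAsync(response.data, createWriteStream(tmpPath));
  // 2. process/persist from a fresh readstream over the file
  await processFile(createReadStream(tmpPath));
  // 3. delete the file afterwards
  await fsp.unlink(tmpPath);
};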

Edit: Well, as I guessed, this approach results in the same error.

Is there a better approach I'm not aware of?

Thanks in advance!


Solution

I found a solution using a combination of websockets (with the stream API) and websocket-express to trigger the streaming over websockets via routes.

    Backend

    app.ts

    import router from './router/router';
    import WebSocketExpress from 'websocket-express';
    
    const app = new WebSocketExpress();
    const port = `${process.env.APPLICATION_PORT}`;
    
    app.use(router);
    
    app.listen(port, () => {
      console.log(`App listening on port ${port}!`);
    });
    
    

    router.ts

    import { Router } from 'websocket-express';
    import streamData from './streamData';
    
    const router = new Router();
    
    router.ws('/my/api/path', streamData);
    
    export default router;
    
    

    streamData.ts (refactored slightly compared to the version above)

    import { MyModel } from '../models/my-model';
    import { createWebSocketStream } from 'ws';
    
    // `res` here is the websocket-express response object;
    // res.accept() completes the upgrade and resolves to the WebSocket.
    export const streamData = async (res: Response) => {
      const ws = await res.accept();
      try {
        const duplex = createWebSocketStream(ws, { encoding: 'utf8' });
        duplex.write('[');
        let prevDoc: any = null;
        // ignore _id since it's going to be upserted into another database
        const cursor = MyModel.find({}, { _id: 0 } ).cursor();
        cursor.on('data', (doc) => {
          if (prevDoc) {
            duplex.write(`${JSON.stringify(prevDoc)},`);
          }
          prevDoc = doc;
        });
        cursor.on('end', () => {
          if (prevDoc) {
            duplex.write(`${JSON.stringify(prevDoc)}`);
          }
          duplex.end(']');
        });
        cursor.on('error', (err) => {
          ws.close();
        });
        duplex.on('error', (err) => {
          ws.close();
          cursor.close();
        });
      } catch (err) {
        ws.close();
      }
    };
    

    Client (or the receiving server)

    import { MyModel } from '../models/my-model';
    import StreamArray from 'stream-json';
    import { streamArray } from 'stream-json/streamers/StreamArray';
    import { pipeline } from 'stream';
    import WebSocket, { createWebSocketStream } from 'ws';
    
    export const getAndPersistDataStream = async (ip: string) => {
      try {
        const ws = new WebSocket(`ws://${ip}:${process.env.PORT}/my/api/path`);
        try {
          const duplex = createWebSocketStream(ws, { encoding: 'utf8' });
          const jsonStream = StreamArray.parser({ jsonStreaming: true });
          let items: Item[] = [];
          const stream = pipeline(duplex, jsonStream, streamArray(), error => {
            if (error) {
              ws.close();
            }
          });
          stream.on('data', (i: any) => {
            items.push(<Item>i.value);
            if (items.length === 500) {
              persistItems(items, ip);
              items = [];
            }
          });
          stream.on('end', () => {
            persistItems(items, ip);
            ws.close();
          });
          stream.on('error', (err: any) => {
            ws.close();
          });
          await new Promise(fulfill => stream.on('finish', fulfill));
        } catch (err) {
          ws.close();
        }
      } catch (err) {
        // logging removed, see note below
      }
    };

    (I removed a lot of error-logging, which is why the catch blocks are empty. persistItems is the same helper as in the question above.)
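    One possible refinement, not part of the original solution: since persistItems is fired without being awaited, bulk writes can pile up while the stream keeps emitting. A hedged sketch of pausing the parsed stream during each batch write (persist stands in for the persistItems helper from the question):

    import StreamArray from 'stream-json';
    import { streamArray } from 'stream-json/streamers/StreamArray';
    import { pipeline } from 'stream';
    import WebSocket, { createWebSocketStream } from 'ws';

    export const getAndPersistWithBackpressure = (
      url: string,
      persist: (items: any[]) => Promise<void>,
    ) => {
      const ws = new WebSocket(url);
      const duplex = createWebSocketStream(ws, { encoding: 'utf8' });
      const stream = pipeline(
        duplex,
        StreamArray.parser({ jsonStreaming: true }),
        streamArray(),
        (error) => { if (error) ws.close(); },
      );
      let items: any[] = [];
      stream.on('data', async (i: any) => {
        items.push(i.value);
        if (items.length === 500) {
          const batch = items;
          items = [];
          stream.pause();        // stop emitting while the batch is written
          await persist(batch);
          stream.resume();
        }
      });
      stream.on('end', () => {
        persist(items);          // flush the remainder
        ws.close();
      });
    };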