Tags: node.js, async-await, stream, for-await

Node.js await .pipe()


I'm trying to read a large number of files and combine their contents into a single file. Below is the implementation I currently have. I want to rewrite it using .pipe(), but my attempts end up overwriting the output file on every iteration. I'd appreciate some help.

    // Assumes: const fs = require('fs'); const { ReadStream, WriteStream } = fs;
    async joinSlices(path) {
        const wrs = new WriteStream('./bigJSON.json');
        const files = fs.readdirSync(path);
        for (const file of files) {
            console.log(file + ' start');
            // Wait until the current file has been fully copied before moving on
            await new Promise((res, rej) => {
                const rs = new ReadStream(path + file, 'utf-8');
                rs.on('data', (chunk) => { wrs.write(chunk); });
                rs.on('end', () => { console.log(file + ' end'); });
                rs.on('close', () => { res(); });
            });
        }
        wrs.destroy();
        console.log('write stream end');
    }

The loop does run when both streams are declared inside its body and the promise resolves once writing finishes, but then the output file gets overwritten on every iteration.
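
For reference: the truncation happens because each new WriteStream opens the file with the default 'w' flag. A .pipe() version presumably needs a single WriteStream created outside the loop, with each source piped in using { end: false } so the destination is not closed after the first file. A rough sketch (a reconstruction based on the code above, not the original attempt):

    // Sketch only: assumes the same fs-based ReadStream/WriteStream setup as above
    async joinSlices(path) {
        const wrs = new WriteStream('./bigJSON.json'); // created once, so the file is truncated only once
        for (const file of fs.readdirSync(path)) {
            await new Promise((res) => {
                const rs = new ReadStream(path + file, 'utf-8');
                // end: false keeps wrs open after this source ends
                rs.pipe(wrs, { end: false });
                rs.on('close', res);
            });
        }
        wrs.end(); // end() rather than destroy(), so buffered data is flushed
    }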


Solution

  • The Node.js-specific stream API is a bit of a pain to work with. Luckily, Node.js streams also implement the standardized iteration protocols (which browsers implement too), so you can consume them with for await ... of as follows.

    import fs from 'node:fs'
    
    const inDir = 'in/'
    const outFile = fs.createWriteStream('./out.json')
    
    // Wrap the concatenated file contents in a JSON array: [file1,file2,...]
    outFile.write('[')
    let first = true
    for (const fileName of fs.readdirSync(inDir)) {
      // Separate consecutive files with a comma
      if (first) {
        first = false
      } else {
        outFile.write(',')
      }
    
      // A readable stream is an async iterable, so we can copy it chunk by chunk
      const file = fs.createReadStream(inDir + fileName, { encoding: 'utf-8' })
      for await (const str of file) {
        outFile.write(str)
      }
    }
    outFile.write(']')
    
    outFile.close()
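
    One caveat with this loop: outFile.write() returns false once the stream's internal buffer is full. For large inputs you may want to pause until the buffer drains. A sketch using once from node:events (everything else as above):

    // Add at the top of the module:
    import { once } from 'node:events'
    
    // Then replace the inner copy loop with a backpressure-aware version:
    for await (const str of file) {
      if (!outFile.write(str)) {
        await once(outFile, 'drain') // resume once the buffer has flushed
      }
    }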
    

    Or if you prefer a more functional style (with pipeline):

    import fs from 'node:fs'
    import { pipeline } from 'node:stream/promises'
    
    const inDir = 'in/'
    const outFile = fs.createWriteStream('./out.json')
    
    // Build a single async iterable that yields '[', then the file contents
    // joined by ',', then ']'
    const iter =
      concatIterable(
        '[',
        joinIterable(
          flatMapIterable(fs.readdirSync(inDir), fileName =>
            fs.createReadStream(inDir + fileName, { encoding: 'utf-8' })
          ),
          ','
        ),
        ']',
      )
    
    // pipeline() handles backpressure and closes outFile when the iterable ends
    pipeline(iter, outFile).catch(console.error)
    
    // Until https://github.com/tc39/proposal-async-iterator-helpers/ has landed,
    // we define some helpers for iterables ourselves, just like the ones arrays already have.
    
    // Yields everything from each of the given iterables, in order
    async function * concatIterable (...iters) {
      for (const iter of iters) {
        yield * iter
      }
    }
    
    // Maps each value to an iterable and yields its items (like Array#flatMap)
    async function * flatMapIterable (iter, callback) {
      let i = 0
      for await (const val of iter) {
        yield * callback(val, i++)
      }
    }
    
    // Yields the values of iter with sep between consecutive values (like Array#join)
    async function * joinIterable (iter, sep) {
      let first = true
      for await (const value of iter) {
        if (first) {
          first = false
        } else {
          yield sep
        }
    
        yield value
      }
    }
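
    These helpers accept any sync or async iterable, so they are easy to sanity-check with plain in-memory strings (hypothetical demo values, not part of the pipeline above):

    // Feed the helpers literal strings instead of file streams:
    const demo = concatIterable(
      '[',
      joinIterable(['{"a":1}', '{"b":2}'], ','),
      ']',
    )
    
    let out = ''
    for await (const chunk of demo) out += chunk
    console.log(out) // [{"a":1},{"b":2}]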