Tags: node.js, amazon-s3, aws-lambda, node-streams, csvtojson

Convert a CSV file to multiple small JSON files in Lambda through streams


I am trying to write a Lambda function that streams a huge CSV file from an S3 bucket and converts it into multiple small JSON files (say, one JSON file per 2,000 rows) written back to an S3 bucket. I have some constraints, though, such as running within a limited memory of 256 MB.

I am able to do this by downloading the file in full instead of streaming it, as shown below.

But due to the memory constraint, I need to handle this with streams. Is there a way to do the same using streams?

// transformationClass.js

const csv = require('csvtojson');

const extension = '.json';
class S3CsvToJson {
    static async perform(input, output, headers) {
        let jsonArray = null;
        const s3Object = await s3.getObject(); // getting the S3 object; `s3` is a pre-configured client/wrapper (require omitted here)
        const csvString = s3Object.Body.toString('utf8');
        await csv({
            noheader: false,
        })
            .fromString(csvString)
            .then((csvRow) => {
                jsonArray = csvRow;
            });
        const fileNames = await S3CsvToJson.writeToFile(jsonArray, output);
        return { files: fileNames };
    }

    static async writeToFile(jsonArray, output) {
        const minNumber = 0;
        const maxNumber = 2000; // rows per output file
        const fileNames = [];
        let outFile;
        if (jsonArray && Array.isArray(jsonArray)) {
            let fileIterator = 1;
            while (jsonArray.length) {
                outFile = `${output.key}-${fileIterator}${extension}`;
                await s3.putObject( // writing the chunk to S3
                    outFile,
                    output.bucketName,
                    JSON.stringify(jsonArray.splice(minNumber, maxNumber)),
                );
                console.log('rows left :', jsonArray.length);
                fileNames.push(outFile);
                fileIterator += 1;
            }
        }
        return fileNames;
    }
}

module.exports = S3CsvToJson;

Here is the handler function:

// handler.js
const s3CsvToJson = require('./transformationClass');

module.exports.perform = async (event, context, callback) => {

    context.callbackWaitsForEmptyEventLoop = false;
    await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
        .then((result) => callback(null, result));
    console.log('leaving - ', Date.now());
};

Thanks in advance!!


Solution

  • After going through a lot of reading and experimenting, I finally found a way to get this done.

    What I had to do was wrap the whole process in a promise and return it. I created a read stream from S3, piped it into the CSV parser, and then into a write stream that uploads the rows in chunks. I wanted to share it here so it may be useful for others, and I am open to better-optimized solutions.

    // transformationClass.js
    
    const csv = require('fast-csv');
    const { Transform, pipeline } = require('stream');
    // `s3` is a pre-configured S3 helper (getReadStream/putObject); a sketch of it is shown after this snippet.
    
    const extension = '.json';
    class S3CsvToJson {
      static async perform(input, output, headers) {
        console.log(input, output, headers);
        const threshold = 2000;
        try {
          const promiseTransformData = () => new Promise((resolve, reject) => {
            try {
              let jsonOutput = [];
              let fileCounter = 0;
              const fileNames = [];
              const writableStream = new Transform({
                objectMode: true,
                autoDestroy: true,
                async transform(data, _, next) {
                  // once the buffer reaches the threshold, upload it as one JSON file
                  if (jsonOutput.length === threshold) {
                    fileCounter += 1;
                    const fileUpload = new Promise((resolveWriter) => {
                      s3
                        .putObject(
                          `${output.key}-${fileCounter}${extension}`,
                          output.bucketName,
                          JSON.stringify(jsonOutput),
                        )
                        .then(() => resolveWriter());
                    });
                    await fileUpload;
                    fileNames.push(`${output.key}-${fileCounter}${extension}`);
                    jsonOutput = [];
                  }
                  jsonOutput.push(data);
                  next();
                },
              });
              // stream the CSV from S3, parse rows, and buffer/upload them in chunks
              const readFileStream = s3.getReadStream(input.key, input.bucketName);
              pipeline(
                readFileStream,
                csv.parse({ headers: true }),
                writableStream,
                (error) => {
                  if (error) {
                    console.error(`Error occurred in pipeline - ${error}`);
                    resolve({ errorMessage: error.message });
                  }
                },
              );
              // when the stream finishes, flush any remaining rows as the last (smaller) file
              writableStream.on('finish', async () => {
                if (jsonOutput.length) {
                  fileCounter += 1;
                  const fileUpload = new Promise((resolveWriter) => {
                    s3
                      .putObject(
                        `${output.key}-${fileCounter}${extension}`,
                        output.bucketName,
                        JSON.stringify(jsonOutput),
                      )
                      .then(() => resolveWriter());
                  });
                  await fileUpload;
                  fileNames.push(`${output.key}-${fileCounter}${extension}`);
                  jsonOutput = [];
                }
                console.log({ status: 'Success', files: fileNames });
                resolve({ status: 'Success', files: fileNames });
              });
            } catch (error) {
              console.error(`Error occurred while transformation - ${error}`);
              resolve({ errorMessage: error ? error.message : null });
            }
          });
          return await promiseTransformData();
        } catch (error) {
          return error.message || error;
        }
      }
    }
    
    module.exports = S3CsvToJson;
    

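    A note on the `s3` object used above: it is a small helper around the AWS SDK and is not part of this snippet. As a rough sketch only, assuming the AWS SDK v2 and the `getReadStream(key, bucket)` / `putObject(key, bucket, body)` signatures used in the class, such a helper could look like this:

    // s3Helper.js (hypothetical wrapper, matching the calls used above)
    const AWS = require('aws-sdk');

    const s3 = new AWS.S3();

    module.exports = {
      // Return a readable stream for the S3 object so the CSV is never
      // buffered in memory as a whole.
      getReadStream(key, bucketName) {
        return s3.getObject({ Bucket: bucketName, Key: key }).createReadStream();
      },

      // Upload a string body and resolve once the upload completes.
      putObject(key, bucketName, body) {
        return s3.putObject({ Bucket: bucketName, Key: key, Body: body }).promise();
      },
    };

    The part that matters for the 256 MB limit is `createReadStream()`, which lets the pipeline pull the CSV from S3 piece by piece instead of loading the whole object at once.
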
    And for the handler, I call S3CsvToJson like this:

    // handler.js
    const s3CsvToJson = require('./transformationClass');

    module.exports.perform = async (event, context, callback) => {
        context.callbackWaitsForEmptyEventLoop = false;
        await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
            .then((result) => callback(null, result))
            .catch((error) => callback(error));
    };
    
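    For reference, the handler expects the event to carry the source and target locations. A minimal local-invocation sketch (the bucket and key names are made up, and the event shape is inferred from the code above):

    // invoke.local.js (hypothetical local test)
    const { perform } = require('./handler');

    const event = {
      input: { bucketName: 'my-source-bucket', key: 'reports/huge-file.csv' },
      output: { bucketName: 'my-target-bucket', key: 'reports/huge-file' },
      headerMapping: {}, // passed through as `headers`; only logged in the snippet above
    };

    perform(event, { callbackWaitsForEmptyEventLoop: true }, (error, result) => {
      if (error) console.error(error);
      else console.log(result); // e.g. { status: 'Success', files: ['reports/huge-file-1.json', ...] }
    });
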

    Hope it was helpful. Thanks!