Search code examples
amazon-s3aws-lambdastreamamazon-dynamodbfast-csv

stream s3 to dynamodb with fast-csv : not all data inserted


When a csv file is uploaded on my s3 bucket, my lambda will be triggered to insert my data into DynamoDB. I need a stream because the file is too large to be downloaded as full object.

const batchWrite = async (clientDynamoDB, itemsToProcess) => {
    const ri = {};
    ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
    const params = { RequestItems: ri };
    await clientDynamoDB.batchWriteItem(params).promise();
};

function runStreamPromiseAsync(stream, clientDynamoDB) {
    return new Promise((resolve, reject) => {
        const sizeChunk = 25;
        let itemsToProcess = [];

        stream
            .pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
            .on("data", (row) => {
                stream.pause();
                itemsToProcess.push(row);

                if (itemsToProcess.length === sizeChunk) {
                    batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
                        stream.resume();
                    });
                    itemsToProcess = [];
                }
            })
            .on("error", (err) => {
                console.log(err);
                reject("Error");
            })
            .on("end", () => {
                stream.pause();
                console.log("end");
                batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
                    resolve("OK");
                });
            });
    });
}

module.exports.main = async (event, context, callback) => {

    context.callbackWaitsForEmptyEventLoop = false;

    const AWS = require('aws-sdk');
    const s3 = new AWS.S3();

    const object = event.Records[0].s3;
    const bucket = object.bucket.name;
    const file = object.object.key;

    const agent = new https.Agent({
        keepAlive: true
    });
    const client = new AWS.DynamoDB({
        httpOptions: {
            agent
        }
    });

    try {
        //get Stream csv data
        const stream = s3
            .getObject({
                Bucket: bucket,
                Key: file
            })
            .createReadStream()
            .on('error', (e) => {
                console.log(e);
            });

        await runStreamPromiseAsync(stream, client);
    } catch (e) {
        console.log(e);
    }
};

When my file is 1000 lines everything is inserted but when I have 5000 lines, my function insert only around 3000 lines and this number is random... Sometimes more sometimes less..

So I'd like to understand what am I missing here ?

I also read this article but to be honest even if you pause the second stream, the first one is still running.. So if someone have any ideas on how to do this, it would be greatly appreciated !

Thanks


Solution

  • I found out why It was not fully processed, it's because the callback of batchWriteItem can return unprocess Items. So I change the function batchWrite and also the runPromiseStreamAsync a little bit because i might not have all the items processed from itemsToProcess.

    Anyway here is the full code :

    const batchWrite = (client, itemsToProcess) => {
        const ri = {};
        ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
        const items = { RequestItems: ri };
        const processItemsCallback = function(err, data) {
            return new Promise((resolve, reject) => {
                if(!data || data.length === 0){
                    return resolve();
                }
                if(err){
                    return reject(err);
                }
                let params = {};
                params.RequestItems = data.UnprocessedItems;
                return client.batchWriteItem(params, processItemsCallback);
            });
        };
        return client.batchWriteItem(items, processItemsCallback );
    };
    
    function runStreamPromiseAsync(stream, clientDynamoDB) {
        return new Promise((resolve, reject) => {
            const sizeChunk = 25;
            let itemsToProcess = [];
            let arrayPromise = [];
    
            stream
                .pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
                .on("error", (err) => {
                    console.log(err);
                    reject("Error");
                })
                .on('data', data => {
                    itemsToProcess.push(data);
                    if(itemsToProcess.length === sizeChunk){
                        arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
                        itemsToProcess = [];
                    }
                })
                .on('end', () => {
                    if(itemsToProcess.length !== 0){
                        arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
                    }
                    resolve(Promise.all(arrayPromise).catch(e => {
                        reject(e)
                    }));
                });
        });
    }