Search code examples
node.js bluebird

wkhtmltoimage with SQS, Bluebird promise, 15 at a time


I have the following nodejs program to retrieve a large number of website urls from an SQS queue and screenshot them with wkhtmltoimage, 15 at a time:

    // Question snippet: reads the approximate queue length, splits it into
    // batches of `concurrency`, and for each batch receives messages from SQS,
    // renders each URL with wkhtmltoimage and uploads the resulting PNG to S3.
    // NOTE(review): the snippet is truncated -- the outer `.then(` callback opened
    // below is never closed -- and `sqs`, `queueUrl`, `spawn`, `md5`, `fs`, `AWS`,
    // `s3key` and `body` are not defined anywhere in the visible code.
    var concurrency=15;
    //break down queue size into batches of concurrency size

    var getTotalNumberMessages = sqs.getQueueAttributesAsync({
        QueueUrl: queueUrl,
        AttributeNames: ['All']
    }).then(function(data) {

        // Only the length of this array matters: one (empty) slot per batch.
        // NOTE(review): parseInt is called without a radix, and Math.floor drops
        // any remainder, so a final partial batch is not scheduled.
        var total = Array(Math.floor(parseInt(data.Attributes.ApproximateNumberOfMessages)/ concurrency));

        // NOTE(review): this callback returns nothing (the inner Promise.each chain
        // below is not returned), so the outer Promise.each has no promise to wait
        // on before moving to the next batch.
        Promise.each(total, function (value, index, length) {

            // Number of receiveMessage calls per batch: floor(15 / 10) + 1 = 2,
            // each asking for up to 10 messages, i.e. up to 20 messages per batch.
            var toDo = Array(Math.floor(concurrency / 10) + 1);
            // NOTE(review): `messages` has no `var` here, so unless it is declared
            // outside the visible code it is an implicit global shared by all batches.
            messages = [];
            var gmPromises = [];
            // NOTE(review): this callback only pushes the receive promise and does
            // not return it, so the receive calls are started without being awaited
            // one by one; they are only awaited together via Promise.all below.
            Promise.each(toDo, function (value, index, length) {
                gmPromises.push(
                    sqs.receiveMessageAsync({
                        QueueUrl: queueUrl,
                        WaitTimeSeconds: 20,
                        VisibilityTimeout: 120, 
                        MaxNumberOfMessages: (concurrency < 10 ? concurrency : 10)
                    }).then(function (data) {
                        // NOTE(review): presumably `data.Messages` can be missing when
                        // nothing was received -- confirm against the SDK response shape.
                        // `done` is assigned here but not declared or read in the visible code.
                        if (data.Messages.length == 0) {
                            done = true;
                        } else {
                            messages = messages.concat(data.Messages);
                        }
                    })
                );
            }).then(function() {
                // NOTE(review): this Promise.all chain is not returned either.
                Promise.all(gmPromises).then(function () {

                    // Collects the SQS deleteMessage promises created after each upload.
                    var promises = [];

                    // Processes the received messages with at most 15 mapper promises
                    // pending at once. NOTE(review): the promise returned by
                    // Promise.map is discarded.
                    Promise.map(messages, function (message) {

                        var tmpFilename = '/tmp/' + md5(s3key) + '.png';

                        // NOTE(review): this local `process` shadows Node's global `process`.
                        var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);

                        console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
                        process.stdout.on('data', function (data) {
                            console.log(data.toString());
                        });

                        process.stderr.on('data', function (err) {
                            console.log(err.toString());
                        });

                        // Resolves once the screenshot has been uploaded to S3.
                        // NOTE(review): `reject` is never called, and on a non-zero exit
                        // code nothing resolves the promise, so it stays pending.
                        return new Promise(function (resolve, reject) {
                            process.on('exit', function (code) {
                                if (code == 0) {
                                    fs.readFile(tmpFilename, {}, function (err, data) {
                                            if (err) {
                                                // NOTE(review): thrown inside an async callback, so it
                                                // does not reject the surrounding promise.
                                                throw err;
                                            } else {
                                                var fileData = Buffer.from(data, 'binary');

                                                var s3 = new AWS.S3();
                                                s3.putObject({
                                                    Bucket: 'mybucket',
                                                    Key: s3key,
                                                    Body: fileData,
                                                    ACL: 'public-read'
                                                }, function (err, resp) {
                                                    // NOTE(review): `err` from putObject is not checked;
                                                    // the message is deleted regardless of the upload result.
                                                    var deleteMessagePromise = sqs.deleteMessage({
                                                        QueueUrl: queueUrl,
                                                        ReceiptHandle: message.ReceiptHandle
                                                    }).promise();
                                                    deleteMessagePromise.catch(function (err) {
                                                        console.log('SQS deleteMessage failed: ', err, err.stack);
                                                    });
                                                    promises.push(deleteMessagePromise);

                                                    console.log(arguments);
                                                    console.log('Successfully uploaded package.');
                                                    resolve();
                                                });
                                            }
                                        }
                                    );
                                }
                            });
                        });
                    }, {
                        concurrency: 15
                    });


                    // NOTE(review): `promises` is only filled later, inside the putObject
                    // callbacks, so at this point the array is still empty.
                    return Promise.all(promises);
                });
            });
        });

However, I am finding that there are way more than 15 wkhtmltoimage processes running in parallel. It seems the retrieval of batches of 15 messages from SQS is parallelized, even though I am using Bluebird's Promise.each?


Solution

  • In my code above, what I was doing is

    • find out how many items are in my queue
    • divide by the number of concurrently processed items -> number of groups
    • for each group
      • process them 15 at a time
      • loop to the next group

    Because every call involved is asynchronous and nothing waited for one group to finish before starting the next, all items in my queue ended up being processed in parallel, which is not what the code was meant to do. What I actually wanted was to limit concurrency, so that the wkhtmltoimage processes never use more than a bounded amount of memory.

    What I needed was to receive 15 items from my queue and then stop receiving items until one of my wkhtmltoimage processes has finished. Then I can receive more items until I have 15 running processes again.

    After quite a bit of research I found this is what node.js streams do when in paused mode. I also found a node.js package that wraps SQS queues into a stream: sqs-readable-stream

    Here is my final code:

        // Streams messages from SQS and screenshots each URL with wkhtmltoimage,
        // keeping at most `concurrency` messages in flight. The stream is paused
        // when every slot is taken and resumed whenever a slot is released.
        var sqsStream = new SQSReadableStream({
            sqsClient: sqs,
            queueUrl: queueUrl
        });

        // One S3 client serves every upload; no need to build one per message.
        var s3 = new AWS.S3();

        // Messages that currently occupy a processing slot.
        var collect = [];

        sqsStream.on('data', function (message) {

            collect.push(message);

            // Stop pulling new messages once every slot is taken.
            if (collect.length >= concurrency) {
                sqsStream.pause();
            }

            // Frees this message's slot exactly once, on success or on failure, and
            // lets the stream deliver the next message. Without the failure paths a
            // failed screenshot would hold its slot forever and the stream would
            // eventually stay paused for good.
            var released = false;
            function release() {
                if (released) {
                    return;
                }
                released = true;
                var slot = collect.indexOf(message);
                if (slot !== -1) {
                    collect.splice(slot, 1);
                }
                sqsStream.resume();
            }

            var body;
            try {
                body = JSON.parse(message.Body);
            } catch (parseErr) {
                body = null;
            }
            if (!body || typeof body.url !== 'string') {
                // The message is not deleted here, so it stays on the queue.
                console.log('Skipping message without a usable url: ', message.Body);
                release();
                return;
            }

            var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';

            var tmpFilename = '/tmp/' + md5(s3key) + '.png';

            // Named `child` so it does not shadow Node's global `process`.
            var child = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);

            console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);

            // The output is not needed, but the pipes must keep flowing so the
            // child is never blocked writing to a full pipe.
            child.stdout.resume();
            child.stderr.resume();

            // Emitted when the process could not be spawned; 'exit' may or may not
            // follow, which is why release() guards against running twice.
            child.on('error', function (err) {
                console.log('wkhtmltoimage could not be run for ' + body.url + ': ', err);
                release();
            });

            child.on('exit', function (code) {
                if (code !== 0) {
                    console.log('wkhtmltoimage exited with code ' + code + ' for ' + body.url);
                    release();
                    return;
                }

                fs.readFile(tmpFilename, function (readErr, data) {
                    if (readErr) {
                        console.log('Could not read ' + tmpFilename + ': ', readErr);
                        release();
                        return;
                    }

                    // The image is in memory now, so the temp file can go.
                    fs.unlink(tmpFilename, function (unlinkErr) {
                        if (unlinkErr) {
                            console.log('Could not remove ' + tmpFilename + ': ', unlinkErr);
                        }
                    });

                    // readFile without an encoding already yields a Buffer, so it
                    // is passed as the body directly.
                    s3.putObject({
                        Bucket: 'somebucket',
                        Key: s3key,
                        Body: data
                    }, function (uploadErr, resp) {
                        if (uploadErr) {
                            // Keep the message on the queue when the upload failed.
                            console.log('S3 upload failed for ' + s3key + ': ', uploadErr);
                            release();
                            return;
                        }

                        message.deleteMessage();
                        console.log('Successfully uploaded package.');
                        release();
                    });
                });
            });

        });