I have the following Node.js program that retrieves a large number of website URLs from an SQS queue and screenshots them with wkhtmltoimage, 15 at a time:
var concurrency = 15;

// Break the queue size down into batches of `concurrency` size
var getTotalNumberMessages = sqs.getQueueAttributesAsync({
    QueueUrl: queueUrl,
    AttributeNames: ['All']
}).then(function (data) {
    var total = Array(Math.floor(parseInt(data.Attributes.ApproximateNumberOfMessages) / concurrency));
    Promise.each(total, function (value, index, length) {
        // ReceiveMessage returns at most 10 messages per call, so each
        // batch needs several receive calls
        var toDo = Array(Math.floor(concurrency / 10) + 1);
        var messages = [];
        var gmPromises = [];
        Promise.each(toDo, function (value, index, length) {
            gmPromises.push(
                sqs.receiveMessageAsync({
                    QueueUrl: queueUrl,
                    WaitTimeSeconds: 20,
                    VisibilityTimeout: 120,
                    MaxNumberOfMessages: (concurrency < 10 ? concurrency : 10)
                }).then(function (data) {
                    if (data.Messages.length == 0) {
                        done = true;
                    } else {
                        messages = messages.concat(data.Messages);
                    }
                })
            );
        }).then(function () {
            Promise.all(gmPromises).then(function () {
                var promises = [];
                Promise.map(messages, function (message) {
                    var body = JSON.parse(message.Body);
                    var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';
                    var tmpFilename = '/tmp/' + md5(s3key) + '.png';
                    var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
                    console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
                    process.stdout.on('data', function (data) {
                        console.log(data.toString());
                    });
                    process.stderr.on('data', function (err) {
                        console.log(err.toString());
                    });
                    return new Promise(function (resolve, reject) {
                        process.on('exit', function (code) {
                            if (code == 0) {
                                fs.readFile(tmpFilename, {}, function (err, data) {
                                    if (err) {
                                        throw err;
                                    } else {
                                        var fileData = Buffer.from(data, 'binary');
                                        var s3 = new AWS.S3();
                                        s3.putObject({
                                            Bucket: 'mybucket',
                                            Key: s3key,
                                            Body: fileData,
                                            ACL: 'public-read'
                                        }, function (err, resp) {
                                            var deleteMessagePromise = sqs.deleteMessage({
                                                QueueUrl: queueUrl,
                                                ReceiptHandle: message.ReceiptHandle
                                            }).promise();
                                            deleteMessagePromise.catch(function (err) {
                                                console.log('SQS deleteMessage failed: ', err, err.stack);
                                            });
                                            promises.push(deleteMessagePromise);
                                            console.log(arguments);
                                            console.log('Successfully uploaded package.');
                                            resolve();
                                        });
                                    }
                                });
                            }
                        });
                    });
                }, {
                    concurrency: 15
                });
                return Promise.all(promises);
            });
        });
    });
});
However, I am finding there are way more than 15 wkhtmltoimage processes running in parallel. It seems the retrieval of batches of 15 messages from SQS is parallelized, even though I am using Bluebird's Promise.each?
In my code above, because all the relevant calls are asynchronous, every item in my queue was being processed in parallel, which is not the intention of the code. The outer Promise.each never waited between iterations, because Bluebird's Promise.each only serializes iterations when the iterator function returns a promise, and mine returned nothing. What I wanted instead was to limit concurrency, so that wkhtmltoimage only uses a bounded amount of memory.
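A minimal sketch of the difference (doAsyncWork here is a stand-in for any promise-returning function, not part of my actual code):

var Promise = require('bluebird');

// Promise.each only waits between iterations when the iterator
// *returns* a promise. If the promise is created but not returned,
// as in my code above, every iteration kicks off immediately.
Promise.each([1, 2, 3], function (n) {
    doAsyncWork(n);        // not returned: all three start at once
});

Promise.each([1, 2, 3], function (n) {
    return doAsyncWork(n); // returned: items are processed one at a time
});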
What I needed was to receive 15 items from my queue and then stop receiving items until some of my wkhtmltoimage processes have finished. Then I can receive more items until I have 15 running processes again.
After quite a bit of research I found that this is exactly what Node.js streams do in paused mode. I also found a Node.js package that wraps an SQS queue in a readable stream: sqs-readable-stream.
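The generic shape of that pattern looks roughly like this (someReadableStream and processAsync are placeholders for illustration, not real APIs):

var inFlight = 0;
someReadableStream.on('data', function (item) {
    inFlight++;
    // Stop emitting 'data' events once enough work is running
    if (inFlight >= concurrency) {
        someReadableStream.pause();
    }
    processAsync(item, function () {
        // A slot freed up: start emitting 'data' events again
        inFlight--;
        someReadableStream.resume();
    });
});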
Here is my final code:
var sqsStream = new SQSReadableStream({
    sqsClient: sqs,
    queueUrl: queueUrl
});

// Messages currently in flight; collect.length doubles as the counter
// that decides when to pause and resume the stream
var collect = [];
sqsStream.on('data', function (message) {
    collect.push(message);
    // Once `concurrency` screenshots are running, stop receiving messages
    if (collect.length >= concurrency) {
        sqsStream.pause();
    }
    var body = JSON.parse(message.Body);
    var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';
    var tmpFilename = '/tmp/' + md5(s3key) + '.png';
    var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
    console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
    process.stdout.on('data', function (data) {
        //console.log(data.toString());
    });
    process.stderr.on('data', function (err) {
        //console.log(err.toString());
    });
    process.on('exit', function (code) {
        if (code == 0) {
            fs.readFile(tmpFilename, {}, function (err, data) {
                if (err) {
                    throw err;
                } else {
                    var fileData = Buffer.from(data, 'binary');
                    var s3 = new AWS.S3();
                    s3.putObject({
                        Bucket: 'somebucket',
                        Key: s3key,
                        Body: fileData
                    }, function (err, resp) {
                        // sqs-readable-stream attaches a deleteMessage() helper to each message
                        message.deleteMessage();
                        console.log(arguments);
                        console.log('Successfully uploaded package.');
                        // A slot has freed up: shrink the counter and resume the stream
                        collect.pop();
                        sqsStream.resume();
                    });
                }
            });
        } else {
            // Free the slot on failure too, so the stream doesn't stay paused forever
            collect.pop();
            sqsStream.resume();
        }
    });
});