I am fairly new to Bluebird promises. I was trying to use them to wrap an event emitter, but I am stuck on how to handle errors.
I have a `stream` object, which is the emitter. The code is as follows:
// Wraps the stream's 'data' / 'end' / 'error' events in a single promise that
// resolves with {count} once the stream has ended, or rejects on a stream error.
return new Promise((resolve, reject) => {
// 'data' listener: queues one upsert per document on `bulk` and flushes the
// bulk operation every `bulkSize` documents.
var onDocFunc = doc => {
//JSON.parse('*');
// some logic goes in here to construct setStmt
bulk.find(query).upsert().update({$set: setStmt});
count++;
if (count % bulkSize == 0) {
// Hold the stream while the bulk operation runs; it is resumed afterwards.
stream.pause();
// NOTE(review): `bulk.execute` is passed detached from `bulk`; if it relies
// on `this`, it needs to be bound - confirm against the bulk API.
var execute = Promise.promisify(bulk.execute);
// Errors from execute() are swallowed here, so the stream resumes either way.
execute().catch(() => {}).then(() => {
stream.resume();
});
}
};
stream.on('data', onDocFunc);
stream.on('end', () => {
// This throw happens inside the event listener, after the promise executor has
// already returned, so it is not turned into a rejection of the promise.
JSON.parse('*'); // how to catch errors that happen here??
// Pre-binds the result so that boundResolve always resolves with {count},
// regardless of the argument it is later called with.
var boundResolve = resolve.bind(this, {count: count});
if (count % bulkSize != 0) {
// Flush the leftover operations; both success and failure resolve with {count}.
Promise.promisify(bulk.execute)().then(boundResolve).catch(boundResolve);
}
else {
boundResolve();
}
});
// Stream errors reject the promise.
stream.on('error', err => {
reject(err);
});
})
What is the recommended way to catch an error that occurs inside the callback of the `end` event handler? Right now, if any error occurs there, the Node.js application crashes with `uncaughtException: Unexpected token *`.
Don't mix application logic into the promisification of the event emitter. Such code (anything that can throw) should always go in `then` callbacks. In your case:
var execute = Promise.promisify(bulk.execute);
return new Promise((resolve, reject) => {
stream.on('data', onDocFunc); // not sure what this does
stream.on('end', resolve);
stream.on('error', reject);
}).then(() => {
JSON.parse('*'); // exceptions that happen here are caught implicitly!
var result = {count: count};
if (count % bulkSize != 0) {
return execute().catch(()=>{}).return(result);
} else {
return result;
}
});
Regarding your real code, I'd probably try to factor out the batching into a helper function:
/**
 * Consumes a stream in batches of `size` items and hands each batch to
 * `callback`, pausing the stream while a batch is being processed.
 *
 * @param {object} stream - Emitter with `on`, `emit`, `pause` and `resume`
 *   that fires 'data', 'end' and 'error' events.
 * @param {number} size - Number of items per batch.
 * @param {function(Array): *} callback - Receives each batch; may return a
 *   promise, in which case the stream is resumed once it fulfils.
 * @returns {Promise<number>} Fulfils with the total number of items seen once
 *   the stream has ended and every batch (including a final partial one) has
 *   been processed; rejects on a stream error or a failed batch.
 */
function asyncBatch(stream, size, callback) {
  let batch = [];
  let count = 0;
  // Tail of the chain of batches handed to `callback`; batches run one at a time.
  let inFlight = Promise.resolve();
  let failed = false;

  // Report the first batch failure through the stream's 'error' event.
  const fail = (e) => {
    if (failed) {
      return;
    }
    failed = true;
    stream.emit('error', e);
  };

  stream.on('data', (data) => {
    batch.push(data);
    count++;
    if (batch.length !== size) {
      return;
    }
    // Swap the full batch out *before* dispatching it, so that items arriving
    // while it is processed start a fresh batch instead of being appended to
    // (and later discarded with) the array the callback is working on.
    const full = batch;
    batch = [];
    stream.pause();
    inFlight = inFlight.then(() => callback(full)).then(() => {
      stream.resume();
    });
    inFlight.catch(fail);
  });

  return new Promise((resolve, reject) => {
    stream.on('end', resolve);
    stream.on('error', reject);
  })
    // Wait for batches still being processed (and surface their failure)
    // before flushing the final partial batch.
    .then(() => inFlight)
    .then(() => (batch.length ? callback(batch) : null))
    .then(() => count);
}
Promise.promisifyAll(Bulk);
return asyncBatch(stream, bulkSize, docs => {
const bulk = new Bulk()
for (const doc of docs) {
// JSON.parse('*');
// some logic goes in here to construct setStmt
bulk.find(query).upsert().update({$set: setStmt});
}
return bulk.executeAsync().catch(err => {/* ignore */});
})