I've got a problem with RabbitMQ in Node.js . I'm trying to implement a Pub/Sub connector in which each user has its own queue to poll to get the messages. When I publish a message via Postma and the user consumes it, I have no problem (I get the message correctly), while if the user tries to consume the message on the queue (but no messages are present), it gets stuck until a new publish is made, but I have no way to get the message. What I want to do is to abort the consume and re-try later. Could you help me with that?
get_queue works fine and gets the user's personal queue.
Route
app.post('/consume', (req, res) => {
project.get_queue(req, res)
.then(result1 => {return project.consume(req, res, result1.message);})
.then(result2 => {res.status(result2.status).json({ message: result2.message });})
.catch(err => res.status(err.status >= 100 && err.status < 600 ? err.status : 500).send({message: err.message}))
});
Controller
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
ch.assertQueue(queue, {durable: true},function(err, _ok){
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
console.log("#Msg: ", _ok.messageCount)
if(_ok.messageCount === 0) {
reject({ status: 400, message: 'No consuming' });
}
else {
var json = [];
var topic = ch.consume(queue, function(msg, err) {
work(msg, function(ok) {
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
resolve({ status: 200, message: json});
}
else {
ch.reject(msg, true);
resolve({ status: 200, message: json});
}
} catch (e) {
reject({ status: 404, message: "niente da consumare"});
closeOnErr(e);
}});
}, {noAck: false});
})
})
};
This is what I did so far.
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
iterate(ch, queue, json);
})
})
};
function iterate(ch, queue, json) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
reject({
status: 404,
message: "niente da consumare"
});
closeOnErr(e);
}
});
})}
Using ch.get with an iterator :
exports.consume = function(req, res, user) {
return new Promise((resolve, reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
return iterate(ch, queue, json, resolve, reject);
});
});
};
function iterate(ch, queue, json, resolve, reject) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json, resolve, reject);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
closeOnErr(e);
return reject({
status: 404,
message: "niente da consumare"
});
}
});
});
}
It will run the function iterate()
, if msg
is false
, it will resolve, else it will handle the message, if result from work
is not ok, it will reject and stop iterating, if result is ok, it will iterate again;