I've got a REQ -> ROUTER -> [DEALER, DEALER, ... DEALER] setup going, where REQ is a client, ROUTER is a queue, and the DEALER sockets are workers that process data and send it back to the ROUTER, which sends it back to REQ. It works fine when there are enough DEALERs to handle the work. But if I slow down the DEALERs, the ROUTER never tells me that it's getting more work than it can handle.
The docs say:
ROUTER sockets do have a somewhat brutal way of dealing with messages they can't send anywhere: they drop them silently. It's an attitude that makes sense in working code, but it makes debugging hard. The "send identity as first frame" approach is tricky enough that we often get this wrong when we're learning, and the ROUTER's stony silence when we mess up isn't very constructive.
Since ØMQ v3.2 there's a socket option you can set to catch this error: ZMQ_ROUTER_MANDATORY. Set that on the ROUTER socket and then when you provide an unroutable identity on a send call, the socket will signal an EHOSTUNREACH error.
I'm honestly not sure if that's the same problem that I'm seeing. Stony silence sure matches what I'm seeing.
Here's the code for the setup:
var argsToString, buildSocket, client, q;

buildSocket = function(desc, socketType, port) {
    var socket;
    log("creating socket: " + (argsToString(Array.apply(null, arguments))));
    socket = zmq.socket(socketType);
    socket.identity = "" + desc + "-" + socketType + "-" + process.pid + "-" + port;
    return socket;
};

argsToString = function(a) {
    return a.join(', ');
};

client = buildSocket("client", 'req', clientPort);
q = buildSocket("q", "router", qPort);
q.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1);
q.on('error', function() {
    return log('router error ' + argsToString(Array.apply(null, arguments)));
});
I can post more code if needed. The issue is that when the REQ socket sends 10 messages in a second but the DEALERs take 2 seconds to do their work, the ROUTER just ignores incoming messages, regardless of ZMQ_ROUTER_MANDATORY. I've sent thousands of messages and never seen an error (via .on('error')) from any of the sockets.
There's talk of ZMQ_HWM out there, but the node driver doesn't seem to support it for DEALERs or ROUTERs.
How can I manage a ROUTER that runs out of places to send messages to?
First of all, if you're implementing a particular pattern (I know from your previous question that you're implementing Paranoid Pirate), it's always helpful to say so, since it provides context for your code.
What you're talking about is specifically not addressed in Paranoid Pirate. You can see this by skipping down the guide to the Titanic pattern. When you're dealing with sporadic connectivity issues or, in your case, sporadic availability because your workers are still busy as new messages arrive, you have to maintain the state of your workers in your queue so that you know what to do with each message: either send it to an available worker, or store it somewhere so that when a worker becomes available, you can pull it out and send it.
If you do this as strictly as possible, you're subverting the "queue" nature of ZMQ, but you avoid the uncertainty inherent in the HWM, which will drop messages rather than crash your system.
You could maintain a buffer: keep adding messages to the queue until you recognize that you're, say, 40% into the HWM (which is dependent on the size of your messages). That gives you some headroom before you have to start saving the messages, but in the end the process is the same.
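To make the "40% into the HWM" idea concrete, here is a minimal sketch of a counter that tells you when to stop handing messages to the socket and start caching them yourself. The names createHwmGate, sent, answered and shouldCache are made up for illustration, and the HWM of 1000 is an assumed figure, not something read from the socket.

```javascript
// Hypothetical gate: tracks how many messages are outstanding and compares
// that against a fraction of an assumed high-water mark.
function createHwmGate(hwm, fraction) {
    var threshold = Math.round(hwm * fraction);
    var inFlight = 0;   // messages handed to the socket but not yet answered

    return {
        // call after each send to a worker
        sent: function () { inFlight++; },
        // call when a worker's reply comes back
        answered: function () { if (inFlight > 0) inFlight--; },
        // true once the outstanding count reaches the threshold
        shouldCache: function () { return inFlight >= threshold; }
    };
}

var gate = createHwmGate(1000, 0.4);   // assumed HWM of 1000 messages
for (var i = 0; i < 399; i++) gate.sent();
var belowThreshold = gate.shouldCache();
gate.sent();
var atThreshold = gate.shouldCache();
console.log(belowThreshold, atThreshold);
```

The fraction is arbitrary; the point is only that the application, not the socket, decides when to start storing messages.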
This is an area where ZMQ offloads responsibility to the application designer, because there's no single "right" way to do things for all scenarios.
EDIT in response to comment:
Here's the basic gist of how I would handle this in node.js:
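var worker_count = 0;
var job_count = 0;
// ...
q.on('message', function() {
    // assuming each frame arrives as a separate argument, the payload is
    // the last one; the earlier frames carry the routing envelope
    var msg = arguments[arguments.length - 1];
    // ...
    // a worker announcing itself adds capacity; any other message is
    // treated as a finished job coming back from a worker
    if (msg.toString() === 'ready') worker_count++;
    else job_count--;
    // ...
    // this could use some TLC, but here's the basic gist of the logic...
    if (job_count >= worker_count) {
        // every worker is busy; we'll assume the message was cached when
        // it was received from the req socket, if so nothing else to do here
    }
    else {
        // figure out if there is a cached message ready to go, if so, then...
        q.send(job);
        job_count++;
    }
});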
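A slightly fuller sketch of the same bookkeeping, kept separate from the socket so it can be run on its own. It tracks idle workers and cached jobs as two lists instead of two counters. The names createDispatcher, workerReady, submit and backlog are hypothetical, and the send callback stands in for whatever call you make on the ROUTER (with the worker identity as the first frame).

```javascript
// Hypothetical helper: pairs cached jobs with idle workers.
// `send(workerId, job)` is supplied by the caller.
function createDispatcher(send) {
    var idleWorkers = [];   // identities of workers that are free
    var pendingJobs = [];   // jobs cached until a worker frees up

    function drain() {
        // hand out cached jobs until one of the two lists runs dry
        while (idleWorkers.length > 0 && pendingJobs.length > 0) {
            send(idleWorkers.shift(), pendingJobs.shift());
        }
    }

    return {
        // call when a worker says 'ready' or returns a result
        workerReady: function (workerId) {
            idleWorkers.push(workerId);
            drain();
        },
        // call when a request arrives from the client side
        submit: function (job) {
            pendingJobs.push(job);
            drain();
        },
        // number of cached jobs: the sign that work is outpacing the workers
        backlog: function () {
            return pendingJobs.length;
        }
    };
}

// Usage with a stand-in for the socket send call.
var sent = [];
var dispatcher = createDispatcher(function (workerId, job) {
    sent.push(workerId + ':' + job);
});
dispatcher.submit('job-1');     // no workers yet, so it is cached
dispatcher.submit('job-2');
dispatcher.workerReady('w1');   // job-1 goes to w1, job-2 stays cached
console.log(sent, dispatcher.backlog());
```

Watching backlog() grow is the "more work than it can handle" signal the ROUTER itself never gives you; what to do at that point (reject, persist, slow the client) is up to your application.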