I have done some research into the patterns supported by ZeroMQ. I want to describe a problem with the PUB/SUB pattern, although I probably ran into the same problem with PUSH/PULL in a recent project. I use the Node.js zeromq implementation.
I prepared two examples (server.js and client.js). The first message from server.js is lost every time I restart the server (a message is sent every second): client.js never receives it. This is probably caused by too short a delay before sending starts. When I start sending after some time (e.g. 1 second), everything works fine. I think zmq needs some time to set up the connection between publisher and subscriber.
I would like to know when the producer (server) is ready to send messages to subscribed clients. How can I get this information?
I don't understand why client.js, which is already connected and subscribed, doesn't get the message just because the server is not yet ready to serve subscriptions after a restart.
Maybe it works like this by design.
server.js:
var zmq = require('zmq');
console.log('server zmq: ' + zmq.version);

var publisher = zmq.socket('pub');
publisher.bindSync("tcp://*:5555");

var i = 0;
var msg = "get_status OK ";

// Publish one message, then schedule the next one a second later.
function sendMsg() {
    console.log(msg + i);
    publisher.send(msg + i);
    i++;
    setTimeout(sendMsg, 1000);
}

// The first send happens immediately after bind; this is the message that gets lost.
sendMsg();

process.on('SIGINT', function() {
    publisher.close();
    process.exit();
});
client.js:
var zmq = require('zmq');
console.log('client zmq: ' + zmq.version);

var subscriber = zmq.socket('sub');
subscriber.subscribe("get_status");

subscriber.on('message', function(data) {
    console.log(data.toString());
});

subscriber.connect("tcp://127.0.0.1:5555");

process.on('SIGINT', function() {
    subscriber.close();
    process.exit();
});
The node zmq library's repository lists the supported monitoring events. Subscribing to these lets you monitor your connection; in this case you want the accept event. Don't forget that you also have to call the monitor() function on the socket to activate monitoring.
You should end up with something like:
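var publisher = zmq.socket('pub');

// 'accept' is emitted when the bound socket accepts an incoming connection.
publisher.on('accept', function(fd, ep) {
    sendMsg();
});

// Check for monitor events every 100 ms; 0 means handle all available events.
publisher.monitor(100, 0);
publisher.bindSync("tcp://*:5555");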
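
One caveat with the snippet above: accept fires for every subscriber that connects, and sendMsg() reschedules itself, so calling it directly from the handler would start an extra timer loop for each new client. Below is a minimal sketch of a guard, assuming you want a single send loop. startOnFirstAccept is a hypothetical helper name, not part of zmq, and a plain EventEmitter stands in for the monitored socket so the example runs without zmq installed.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: run `start` once, on the first 'accept' event.
// `socket` is anything with .on(event, handler); in practice, the monitored pub socket.
function startOnFirstAccept(socket, start) {
    var started = false;
    socket.on('accept', function() {
        if (started) return; // later subscribers must not start a second loop
        started = true;
        start();
    });
}

// Stand-in for the pub socket (assumption, for illustration only).
var fakeSocket = new EventEmitter();
var starts = 0;
startOnFirstAccept(fakeSocket, function() { starts++; });

fakeSocket.emit('accept'); // first subscriber connects
fakeSocket.emit('accept'); // second subscriber connects
console.log(starts); // 1
```

With the real socket you would call startOnFirstAccept(publisher, sendMsg) before publisher.monitor(100, 0) and publisher.bindSync(...), in place of the direct publisher.on('accept', ...) handler.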