I'm trying to implement a lazy subscriber on ZeroMQ, starting from the wuclient/wuserver example. The client is much slower than the server, so it should receive only the most recent message the server has sent.
So far, the only way I've found to do that is to connect and disconnect the client around each receive, but each connection carries an unwanted cost of around 3 ms:
server.cxx
#include <zmq.hpp>
#include <cstdio>
#include <iostream>
#include <unistd.h>

int main () {
    // Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    int counter = 0;
    while (1) {
        counter++;
        // Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20, "%d", counter);
        publisher.send(message);
        std::cout << counter << std::endl;
        usleep(100000); // 100 ms between messages
    }
    return 0;
}
client.cxx
#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    while (1) {
        zmq::message_t update;
        int counter;
        subscriber.connect("tcp://localhost:5556"); // This call takes a few milliseconds
        subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
        subscriber.recv(&update);
        subscriber.disconnect("tcp://localhost:5556");
        // The server null-terminates the payload, so it can be read as a C string
        std::istringstream iss(static_cast<char*>(update.data()));
        iss >> counter;
        std::cout << counter << std::endl;
        usleep(1000000); // 1 s between reads
    }
    return 0;
}
Server output: 1 2 3 4 5 6 7 8 9 ...
Client output: 4 14 24 ...
I've tried using the high-water mark to do this without connecting and disconnecting, but it doesn't work. Even with the following code, messages only start to be dropped once the buffer holds at least hundreds of messages:
int high_water_mark = 1;
socket.setsockopt(ZMQ_RCVHWM, &high_water_mark, sizeof(high_water_mark) );
socket.setsockopt(ZMQ_SNDHWM, &high_water_mark, sizeof(high_water_mark) );
There is also a closely related post on zeromq-dev, but the solution it proposes (using another thread to select the last message) is not acceptable: I can't transfer tons of messages over the network that will never be used.
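The solution is to use ZMQ_CONFLATE, which makes the socket keep only the most recent message in its queue. Set it before calling connect, as below. Note that it only works with non-multipart messages: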
client.cxx
#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    int conflate = 1;
    subscriber.setsockopt(ZMQ_CONFLATE, &conflate, sizeof(conflate) );
    subscriber.connect("tcp://localhost:5556");
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    while (1) {
        zmq::message_t update;
        int counter;
        subscriber.recv(&update);
        std::istringstream iss(static_cast<char*>(update.data()));
        iss >> counter;
        std::cout << counter << std::endl;
        usleep(1000000);
    }
    return 0;
}
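To make the behaviour easier to picture, here is a minimal sketch of what "keep only the last message" means, written with the standard library only. It is a conceptual model, not how ZeroMQ implements ZMQ_CONFLATE internally, and the names `LatestOnlySlot` and `simulate_slow_reader` are made up for this illustration.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical single-slot mailbox: a conceptual model of a conflating
// queue, not ZeroMQ's actual implementation.
class LatestOnlySlot {
public:
    // Store a message, overwriting any message that has not been read yet.
    void push(std::string message) {
        slot_ = std::move(message);
        full_ = true;
    }

    // Move the stored message into `out` and empty the slot.
    // Returns false if nothing has arrived since the last pop.
    bool pop(std::string& out) {
        if (!full_) return false;
        out = std::move(slot_);
        full_ = false;
        return true;
    }

private:
    std::string slot_;
    bool full_ = false;
};

// Simulates a fast publisher followed by a single slow read and
// returns the message the reader ends up seeing.
std::string simulate_slow_reader(int published) {
    LatestOnlySlot slot;
    for (int counter = 1; counter <= published; ++counter) {
        slot.push(std::to_string(counter));  // each push replaces the previous one
    }
    std::string seen;
    bool got = slot.pop(seen);
    assert(got);  // holds whenever published >= 1
    (void)got;    // avoid an unused-variable warning when NDEBUG is defined
    return seen;
}
```

Calling `simulate_slow_reader(10)` returns "10": the nine earlier messages were overwritten before the reader got to them, which is the same pattern as a client that wakes up once per second and only sees the newest counter value.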