c++, zeromq, ros

Does a SUB subscriber in ZeroMQ have any "callback"-mechanism as in ROS?


I am new to ZeroMQ.

I use ROS frequently, so the subscriber in ZeroMQ confuses me. In ROS, a subscriber usually has a callback function that is called automatically whenever data is available on the corresponding topic.

Please see the code snippet below, borrowed from the ROS wiki:

void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
  ROS_INFO("I heard: [%s]", msg->data.c_str());
}
//define subscriber and callback function associated with it
ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);

However, in ZeroMQ, it seems that the subscriber sits in a loop to receive data, as shown below:

for (int update_nbr = 0; update_nbr < 100; update_nbr++)
{
    zmq::message_t update;
    int zipcode, temperature, relhumidity;
    // receive the data
    subscriber.recv(&update);
    // do something with data
    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> zipcode >> temperature >> relhumidity;
}

The code above is borrowed from the ZeroMQ wiki.

Does ZeroMQ have a callback mechanism similar to the ROS subscriber?


Solution

  • No, there is no callback system in ZMQ. You have to call the recv() function to receive a message.

    You can build one on top of recv(): it blocks until a message arrives (or a receive timeout expires) and returns a status, so you can test the result in an if condition inside a while loop.

    I often use a pattern like this one, with a timeout:

    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);
    
    zmq_socket.connect("tcp://127.0.0.1:58951");
    
    std::string TOPIC = "";
    
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // empty prefix: subscribe to all messages
    zmq_socket.setsockopt(ZMQ_RCVTIMEO, 1000); // recv() gives up after 1000 ms, so the loop can re-check `run`
    while(run) { // `run` is a flag cleared elsewhere to stop the loop
        zmq::message_t msg;
        bool rc = zmq_socket.recv(&msg);  // Blocks until a message arrives or the timeout expires (returns false on timeout)
        if(rc) { 
            // You received a message : your data is in msg
            // Do stuff here : call your function with the parameters received from zmq
        }
    }
    // Clean up your socket and context here
    int linger = 0; // assumed value: 0 drops any pending messages as soon as the socket closes
    zmq_socket.setsockopt(ZMQ_LINGER, linger);
    zmq_socket.close();
    zmq_context.close();