Search code examples
c++zeromqros

What is the proper way to shutdown a ZeroMQ Subscriber?


I am using the PUB/SUB model in ZeroMQ inside ROS.

The SUB-subscriber is allowed to stop just by pressing a Ctrl+C in the terminal.

However, every time, when I actually press the Ctrl+C, it stops by throwing the following error:

^Cterminate called after throwing an instance of 'zmq::error_t'
  what():  Interrupted system call
Aborted (core dumped)

Below is the code snippet:

#include <ros/ros.h>
#include <zmq.hpp>

int main(int argc, char **argv)
{
  ros::init(argc, argv, "node_name", ros::init_options::AnonymousName);
  ros::NodeHandle nh;
  ros::Publisher pub;

  //  Prepare context and publisher
  zmq::context_t zmq_context(1);
  zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);
  zmq_socket.connect("tcp://192.168.1.20:9001");

  std::string TOPIC = "";
  zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // allow all messages

  int timeout = 1000;  // Timeout to get out of the while loop since recv is blocking
  zmq_socket.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(timeout));

  while (ros::ok())
  {
      zmq::message_t msg;
      int rc = zmq_socket.recv(&msg);
      if (rc)
      {
          //receive data and prepare it for publishing
          pub.publish(data);
          ros::spinOnce();
      }
  }

  // Clean up the socket and context here
  zmq_socket.close();
  zmq_context.close();

  return 0;
}

How to avoid the error so as to shut down the subscriber properly?


Solution

  • I realized that the issue is caused by int rc = zmq_socket.recv(&msg) hence I did following improvements-

    1. I used try and catch block
    2. Set ZMQ_LINGER as suggested by @user3666197

    Below is the code snippet-

    int linger = 0; // Proper shutdown ZeroMQ
    zmq_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    
    std::string socket_address = "tcp://192.168.1.20:9001";
    zmq_socket.connect(socket_address.c_str());
    
    ros::Duration duration(0.1); // in seconds (100 ms)
    while (ros::ok())
    {
        zmq::message_t msg;
        int rc = 0;
        try
        {
            rc = zmq_socket.recv(&msg);
        }
        catch (zmq::error_t& e)
        {
            ROS_DEBUG_STREAM("ZMQ Error. " << e.what());
        }
        if (rc)
        {
            unsigned char* byte_ptr = static_cast<unsigned char*>(msg.data());
            const int msg_length = msg.size();
            // receive data and prepare it for publishing
            pub.publish(data);
            ros::spinOnce();
        }
        else
        {
            ROS_DEBUG_STREAM("Pointcloud recv() returned 0");
            duration.sleep();
        }
    }
    

    Thanks