c++ c sockets timer zeromq

Using timer with zmq


I am working on a project where I have to use zmq_poll. But I did not completely understand what it does.

So I also tried to implement it:

zmq_pollitem_t timer_open(void){

  zmq_pollitem_t items[1];


    if( items[0].socket  == nullptr ){
         printf("error socket %s: %s\n", zmq_strerror(zmq_errno()));
         return;        
    }
else{
    items[0].socket = gsock; 
} 


items[0].fd = -1;   
items[0].events = ZMQ_POLLIN;  


 // get a timer
items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
    if( items[0].fd  == -1 )
    {
    printf("timerfd_create() failed: errno=%d\n", errno);
            items[0].socket  = nullptr;

            return;
    }

int rc = zmq_poll(items,1,-1);

if(rc == -1){
    printf("error poll %s: %s\n", zmq_strerror(zmq_errno()));
    return;
} 
else
     return items[0];
}

I am very new to this topic. I have to modify an old existing project and replace its functions with their zmq equivalents. On other websites I saw examples that used two items and called zmq_poll in an endless loop. I have read the documentation but still do not properly understand how this works. These are the other two functions I have implemented. I do not know if this is the correct way to implement them:

   void timer_set(zmq_pollitem_t items[] , long msec, ipc_timer_mode_t mode ) {


    struct itimerspec t;

    ...

    timerfd_settime( items[0].fd , 0, &t, NULL );

}


void timer_close(zmq_pollitem_t items[]){

if( items[0].fd != -1 )
       close(items[0].fd);

items[0].socket = nullptr; 

}

I am not sure if I need the zmq_poll function because I am using a timer.

EDIT:

void some_function_timer_example() {
   // We want to wait on two timers
   zmq_pollitem_t items[2] ;

   // Setup first timer
   ipc_timer_open_(&items[0]);
   ipc_timer_set_(&items[0], 1000, IPC_TIMER_ONE_SHOT);
   // Setup second timer
   ipc_timer_open_(&items[1]);
   ipc_timer_set_(&items[1], 1000, IPC_TIMER_ONE_SHOT);

   // Now wait for the timers in a loop
   while (1) {
        //ipc_timer_set_(&items[0], 1000, IPC_TIMER_REPEAT);
        //ipc_timer_set_(&items[1], 5000, IPC_TIMER_REPEAT);

      int rc = zmq_poll (items, 2, -1);
      assert (rc >= 0); /* Returned events will be stored in items[].revents */

        if (items [0].revents & ZMQ_POLLIN) {
            //  Process task
            std::cout << "revents: 1" << std::endl;
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  Process weather update

            std::cout << "revents: 2" << std::endl;

        }
   }
}

Now it still prints very fast and does not wait. It only waits at the beginning. When the timer_set call is inside the loop it waits properly, but only if the waiting time is the same for both timers, like ipc_timer_set_(&items[1], 1000,...) and ipc_timer_set_(&items[0], 1000,...).

So how do I have to change this? Or is this the correct behavior?


Solution

  • zmq_poll works like select(), but it can do more: in a single call you can wait on regular file descriptors as well as ØMQ sockets.

    In your case you can use the timer fd as you have tried to do, but you need to make a few small changes.

    First you have to consider how you will invoke these timers. I think the use case is that you want to create multiple timers and wait for them. This would typically be the function in your current code that runs a loop for the timers (using select() or whatever else it does). It would be something like this:

    void some_function() {
       // We want to wait on two timers
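       zmq_pollitem_t items[2];
    
       // Setup first timer (repeats every second)
       ipc_timer_open(&items[0]);
       ipc_timer_set(&items[0], 1000, IPC_TIMER_REPEAT);
       // Setup second timer (fires once after five seconds)
       ipc_timer_open(&items[1]);
       ipc_timer_set(&items[1], 5000, IPC_TIMER_ONE_SHOT);
    
       // Now wait for the timers in a loop
       while (1) {
          int rc = zmq_poll (items, 2, -1);
          assert (rc >= 0); /* Returned events will be stored in items[].revents */
          // A timer fd stays readable until it is read, so read each expired
          // timer; otherwise zmq_poll returns immediately on every iteration
          for (int i = 0; i < 2; i++) {
             uint64_t expirations;
             if (items[i].revents & ZMQ_POLLIN)
                read (items[i].fd, &expirations, sizeof expirations);
          }
       }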
    }
    

    Now, you need to fix the ipc_timer_open. It will be very simple - just create the timer fd.

    // Takes a pointer to pre-allocated zmq_pollitem_t and returns 0 for success, -1 for error
    int ipc_timer_open(zmq_pollitem_t *items){
        items[0].socket = NULL; 
        items[0].events = ZMQ_POLLIN;  
        // get a timer
        items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
        if( items[0].fd  == -1 )
        {
            printf("timerfd_create() failed: errno=%d\n", errno);
            return -1; // error
        }
        return 0;
    }
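
The answer does not show ipc_timer_set, and the question elides its body. As a rough sketch only, arming a timer fd could look like the following. The names timer_mode and timer_arm are hypothetical stand-ins for the project's ipc_timer_mode_t and ipc_timer_set, and the function takes a plain fd (with zmq_poll you would pass items[0].fd):

```cpp
#include <sys/timerfd.h>
#include <unistd.h>
#include <cassert>

// Hypothetical stand-in for the question's ipc_timer_mode_t.
enum timer_mode { TIMER_ONE_SHOT, TIMER_REPEAT };

// Arm a timer fd so that it first expires `msec` milliseconds from now.
// Returns 0 on success and -1 on error. This is an assumed design, not
// the original project's implementation.
int timer_arm(int fd, long msec, timer_mode mode) {
    assert(fd >= 0);                 // the fd must come from timerfd_create()
    if (msec <= 0) return -1;        // an all-zero it_value would disarm the timer

    struct itimerspec t{};           // zero-initialised: no interval by default
    t.it_value.tv_sec  = msec / 1000;
    t.it_value.tv_nsec = (msec % 1000) * 1000000L;
    if (mode == TIMER_REPEAT)
        t.it_interval = t.it_value;  // reload with the same period after each expiry

    // flags == 0 means the time is relative to now
    return timerfd_settime(fd, 0, &t, nullptr);
}
```

A one-shot timer leaves it_interval at zero, so it fires once and then disarms. A repeating timer reloads itself from it_interval after each expiration.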
    

    Edit (added here as a reply to a comment, since it is long). From the documentation: "If both socket and fd are set in a single zmq_pollitem_t, the ØMQ socket referenced by socket shall take precedence and the value of fd shall be ignored."

    So if you are passing the fd, you have to set socket to NULL. I am not even clear where gsock is coming from. Is this in the documentation? I couldn't find it.

    And when will it break out of the while(1) loop?

    This is application logic, and you have to code it according to what you require. zmq_poll just keeps returning every time one of the timers fires. In this example, zmq_poll returns every second because the first timer (which repeats) keeps triggering. At 5 seconds it also returns because of the second timer (which is a one-shot). It's up to you to decide when to exit the loop. Do you want it to run forever? Do you need to check a different condition to exit the loop? Do you want to do this, say, 100 times and then return? You can code whatever logic you want on top of this code.
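
As one possible illustration of an exit condition, the sketch below stops after a fixed number of wakeups. To stay runnable without libzmq it uses POSIX poll() in place of zmq_poll; the loop has the same shape with zmq_pollitem_t and ZMQ_POLLIN instead of pollfd and POLLIN. The names run_timers and make_repeating_timer are made up for this example.

```cpp
#include <poll.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>

// Helper for this sketch only: a timer fd that fires every `msec` milliseconds.
static int make_repeating_timer(long msec) {
    int fd = timerfd_create(CLOCK_MONOTONIC, 0);
    assert(fd >= 0);
    struct itimerspec t{};
    t.it_value.tv_sec  = msec / 1000;
    t.it_value.tv_nsec = (msec % 1000) * 1000000L;
    t.it_interval = t.it_value;
    int rc = timerfd_settime(fd, 0, &t, nullptr);
    assert(rc == 0);
    (void)rc;
    return fd;
}

// Wait on two repeating timers and return after `max_wakeups` returns of poll().
// fired[i] counts the wakeups in which timer i was reported readable.
void run_timers(long msec0, long msec1, int max_wakeups, int fired[2]) {
    struct pollfd items[2] = {
        { make_repeating_timer(msec0), POLLIN, 0 },
        { make_repeating_timer(msec1), POLLIN, 0 },
    };
    fired[0] = fired[1] = 0;

    // The exit condition lives here; replace it with whatever the application needs.
    for (int wakeups = 0; wakeups < max_wakeups; ++wakeups) {
        int rc = poll(items, 2, -1);          // zmq_poll(items, 2, -1) in the zmq version
        assert(rc >= 0);
        (void)rc;
        for (int i = 0; i < 2; ++i) {
            if (items[i].revents & POLLIN) {  // ZMQ_POLLIN in the zmq version
                uint64_t expirations = 0;
                // Reading clears the readable state until the next expiration.
                ssize_t n = read(items[i].fd, &expirations, sizeof expirations);
                assert(n == static_cast<ssize_t>(sizeof expirations));
                (void)n;
                ++fired[i];
            }
        }
    }
    close(items[0].fd);
    close(items[1].fd);
}
```

Calling run_timers(1000, 5000, 100, fired) would return after 100 wakeups, most of them caused by the one-second timer.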

    And what kind of events are returned?

    ZMQ_POLLIN, since an expired timer fd becomes readable like any other file descriptor.
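
One detail that probably explains the fast printing in the question's edit: a timer fd stays readable until its expiration counter is read, so a loop that never reads it sees ZMQ_POLLIN on every zmq_poll call. Below is a minimal sketch of a helper that clears it. The name timer_drain is hypothetical and not part of zmq.

```cpp
#include <sys/timerfd.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>

// Read the 8-byte expiration counter of a timer fd. The read resets the
// counter and makes the fd non-readable until the timer expires again.
// Returns the number of expirations since the last read, or 0 if the read
// failed (for example EAGAIN on a non-blocking fd that has not expired).
uint64_t timer_drain(int fd) {
    assert(fd >= 0);
    uint64_t expirations = 0;
    ssize_t n = read(fd, &expirations, sizeof expirations);
    return n == static_cast<ssize_t>(sizeof expirations) ? expirations : 0;
}
```

In the poll loop you would call timer_drain(items[i].fd) whenever items[i].revents & ZMQ_POLLIN is set. A return value above 1 means the timer expired more than once since the last read. On a blocking fd (the default), the call waits for the next expiration if none is pending, so call it only after the poll has reported the fd as readable.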