ORIGINAL POST: I'm writing a service in C programming using libevent and zmq. Msg is pushed from python code to C service using PUSH-PULL pattern.
fd received from zmq socket:
void *receiver = zmq_socket (base.zmq_ctx, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
int fd=0;
size_t fd_len = sizeof(fd);
zmq_getsockopt (receiver, ZMQ_FD, &fd, &fd_len);
Using Libevent, event registered with fd for persistent read
struct event *read_data_on_zmq =event_new(base.evbase, fd, EV_READ | EV_PERSIST , read_data_on_zmq_cb,receiver);
event_add(read_data_on_zmq,NULL);
event_base_dispatch(base.evbase);
On the callback method I'm doing a non-blocking receive
void read_data_on_zmq_cb(evutil_socket_t fd, short what, void *arg)
{
char *msg = calloc(1024,sizeof(char));
int size = zmq_recv (receiver, msg, 255, ZMQ_DONTWAIT);
if (size != -1)
{
puts ("is size is not -1");
printf("msg = %s\n",msg);
}
}
In the python code I'm continuously sending message to the socket.
import zmq
import time
c=zmq.Context()
s=c.socket(zmq.PUSH)
s.bind('tcp://127.0.0.1:5557')
while(True):
s.send("abc")
time.sleep(2)
The problem is I'm only able to receive the message once, after that the event callback never gets hit. If I do zmq_connect inside the read_data_on_zmq_cb after zmq_recv, then it works fine, but I guess that is redundant and not the correct way to do it. What is the problem here?
EDIT1: In addition to checking ZMQ_EVENTS after doing zmq_recv(), you need to fetch all the message because zmq is EDGE triggered. A great explanation about EDGE trigered notification is here http://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/ so ultimately my event callback would look like
void read_data_on_zmq_cb(evutil_socket_t fd, short what, void *arg)
{
unsigned int zmq_events;
size_t zmq_events_size = sizeof(zmq_events);
char *msg=NULL;
zmq_getsockopt (receiver, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
while(zmq_events & ZMQ_POLLIN)
{
msg = calloc(1024,sizeof(char));
int size = zmq_recv (receiver, msg, 255, ZMQ_DONTWAIT);
if (size != -1) {
#ifdef DEBUG
printf("msg = %s\n",msg);
#endif
//return msg;
}
zmq_getsockopt (receiver, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
}
}
PLEASE read my EDIT1 for complete answer with code. The problem was: I didn't recheck ZMQ_EVENTS after doing zmq_recv(), as the state of the socket changes at that time.
So calling
zmq_getsockopt (receiver, ZMQ_EVENTS, &fd, &fd_size);
after zmq_recv() solved my problem.