Search code examples
zeromq

ZeroMQ High Water Mark Doesn't Work


when I read the "Durable Subscribers and High-Water Marks" in zmq guide, it said "The HWM causes ØMQ to drop messages it can't put onto the queue", but no messages lost when I ran the example. Hit ctrl+c to terminate the durasub.py and then continue it.

the example from the zmq in python.Other languages are the same.

durasub.py

import zmq                                              
import time                                             

context = zmq.Context()                                 


subscriber = context.socket(zmq.SUB)                    
subscriber.setsockopt(zmq.IDENTITY, "Hello")            
subscriber.setsockopt(zmq.SUBSCRIBE, "")                
subscriber.connect("tcp://localhost:5565")              


sync = context.socket(zmq.PUSH)                         
sync.connect("tcp://localhost:5564")                    
sync.send("")                                           


while True:                                             
    data = subscriber.recv()                            
    print data                                          
    if data == "END":                                   
        break                                           

durapub.py

import zmq                                        
import time                                       

context = zmq.Context()                           


sync = context.socket(zmq.PULL)                   
sync.bind("tcp://*:5564")                         

publisher = context.socket(zmq.PUB)               
publisher.bind("tcp://*:5565")                    

publisher.setsockopt(zmq.HWM, 2)                  

sync_request = sync.recv()                        

for n in xrange(10):                              
    msg = "Update %d" % n                         
    publisher.send(msg)                           
    time.sleep(1)                                 

publisher.send("END")                             

Solution

  • The suggestion above is valid, but doesn't properly address the problem in this particular code.

    The real problem here is that in durapub.py you call publisher.setsockopt(zmq.HWM, 2) AFTER calling publisher.bind. You should call setsockopt BEFORE bind or connect.

    Please refer to 0MQ API documentation for setsockopt:

    Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE and ZMQ_LINGER, only take effect for subsequent socket bind/connects.