I have a small piece of software in which a separate thread waits for ZeroMQ messages. I am using ZeroMQ's `PUB`/`SUB` communication pattern.
Currently I abort that thread by setting a variable `cont_loop` to `False`.
But I discovered that when no messages arrive at the ZeroMQ subscriber, I cannot exit the thread (without taking down the whole program).
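```python
from threading import Thread

import zmq


class Subscriber(Thread):  # class name not shown in the question
    def __init__(self):
        Thread.__init__(self)
        self.cont_loop = True

    def abort(self):
        self.cont_loop = False  # was self.continue_loop, which the loop never reads

    def run(self):
        zmq_context = zmq.Context()
        zmq_socket = zmq_context.socket(zmq.SUB)
        zmq_socket.bind("tcp://*:%s" % 5556)
        zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: subscribe to everything
        while self.cont_loop:
            data = zmq_socket.recv()  # blocks until a message arrives
            print("Message: %r" % data)
        zmq_socket.close()
        zmq_context.term()
        print("exit")
```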
I tried moving `socket.close()` and `context.term()` into the `abort()` method, so that it would shut down the subscriber, but this killed the whole program.
What is the correct way to shut down the above program?
A: There are many ways to achieve this goal. Let me pick just one, as a mock-up example of how to handle distributed process-to-process messaging.
First, assume that a typical software design task has several priorities: some higher, some lower, and some so low that their execution can be deferred, leaving more scheduler time for the sub-tasks that cannot afford to wait.
With that said, let's look at your code. The `SUB`-side call to `.recv()`, as used here, does two things. The visible one: it performs a RECEIVE operation on a ZeroMQ socket with `SUB` behaviour. The less visible one: it stays hanging until it gets something "compatible" with the current state of the `SUB` behaviour, that is, a message whose leading bytes match one of the prefixes subscribed via `zmq.SUBSCRIBE`.
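To make "compatible" concrete: a `SUB` socket only delivers messages that start with one of its subscribed prefixes. The sketch below assumes pyzmq and a hypothetical helper `filtered_receive()`. It uses an `XPUB` socket on the sending side only so it can wait until the subscription has actually arrived before publishing; a plain `PUB` would silently drop anything sent too early.

```python
import zmq


def filtered_receive(topic, messages, endpoint="inproc://filter-demo"):
    """Publish `messages` and return the ones a SUB subscribed to `topic` gets.

    Hypothetical helper for illustration; `topic` and `messages` are bytes.
    """
    ctx = zmq.Context()
    pub = ctx.socket(zmq.XPUB)  # XPUB: a PUB that also reports subscriptions
    sub = ctx.socket(zmq.SUB)
    try:
        pub.bind(endpoint)
        sub.connect(endpoint)
        sub.setsockopt(zmq.SUBSCRIBE, topic)  # prefix filter
        pub.recv()  # blocks until b"\x01" + topic arrives, i.e. the filter is active
        for msg in messages:
            pub.send(msg)
        sub.setsockopt(zmq.RCVTIMEO, 200)  # give up after 200 ms of silence
        received = []
        while True:
            try:
                received.append(sub.recv())
            except zmq.Again:  # timeout expired: no more messages
                return received
    finally:
        sub.close(linger=0)
        pub.close(linger=0)
        ctx.term()


print(filtered_receive(b"weather", [b"weather sunny", b"sports 3-1", b"weatherman"]))
# prints [b'weather sunny', b'weatherman']
```

Matching is a plain byte-prefix test, so `b"weatherman"` passes the `b"weather"` filter too.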
This means the call BLOCKS from the moment `.recv()` is invoked UNTIL some unknown, locally uncontrollable coincidence of states/events makes it deliver a ZeroMQ message whose content is "compatible" with the locally pre-set state of this (still blocking) `SUB` instance.
That may take ages.
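If all you need is an upper bound on that wait, pyzmq also offers the `zmq.RCVTIMEO` socket option (in milliseconds): a blocking `.recv()` then raises `zmq.Again` once the timeout expires instead of hanging indefinitely. This is a different technique from the `NOBLOCK` route this answer takes; the helper name `recv_with_timeout()` and the 100 ms value are assumptions for illustration.

```python
import time

import zmq


def recv_with_timeout(sock, timeout_ms):
    """Return the next message, or None if nothing arrives within timeout_ms.

    Hypothetical helper: bounds the otherwise unbounded wait of a blocking recv().
    """
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
    try:
        return sock.recv()
    except zmq.Again:  # raised when the timeout expires with no message
        return None


ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b"")  # no publisher is connected, so nothing ever arrives
start = time.monotonic()
result = recv_with_timeout(sub, 100)
elapsed = time.monotonic() - start
sub.close(linger=0)
ctx.term()
print(result, elapsed)
```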
This is exactly why `.recv()` is better used inside a control loop, where the surrounding code gets both the chance and the responsibility to do what you want (including abort-related operations and a fair, graceful termination with proper release of resources).
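One common shape for such a control loop is sketched here with `zmq.Poller`, a swapped-in technique rather than the `NOBLOCK` call this answer goes on to use: wait at most a short timeout for a message, then re-check a stop flag on every pass. The function name `poll_loop`, the `threading.Event` stop flag and the 100 ms timeout are assumptions.

```python
import threading

import zmq


def poll_loop(endpoint, stop, on_message, timeout_ms=100):
    """Receive on a SUB socket until `stop` (a threading.Event) is set.

    Hypothetical helper; the socket is created, used and closed in one thread.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    try:
        sock.bind(endpoint)
        sock.setsockopt(zmq.SUBSCRIBE, b"")
        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        while not stop.is_set():
            # poll() returns after at most timeout_ms, so the flag is re-checked regularly
            events = dict(poller.poll(timeout_ms))
            if events.get(sock, 0) & zmq.POLLIN:
                on_message(sock.recv())
    finally:
        sock.close(linger=0)
        ctx.term()


stop = threading.Event()
worker = threading.Thread(target=poll_loop, args=("tcp://127.0.0.1:*", stop, print))
worker.start()
stop.set()  # request shutdown even though no message ever arrived
worker.join(timeout=2)
print("worker alive:", worker.is_alive())
```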
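The receive step then becomes `.recv(flags=zmq.NOBLOCK)`, wrapped in a `try: except:` block: when no message is waiting, pyzmq raises `zmq.Again` instead of blocking. This way your local process never loses control over the stream of events (including the NOP, "nothing has arrived yet", being one such event).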
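Applied to the thread from the question, that could look like the sketch below (Python 3 and pyzmq assumed). The class name `Subscriber`, the `endpoint` parameter, the `received` list and the 10 ms idle sleep are hypothetical choices. The socket and context are created, used and torn down only inside `run()`; `abort()` merely clears the flag, so shutdown no longer depends on a message ever arriving.

```python
import threading
import time

import zmq


class Subscriber(threading.Thread):
    """SUB-side worker that can be stopped even when no message ever arrives.

    Hypothetical names: Subscriber, endpoint, received.
    """

    def __init__(self, endpoint="tcp://*:5556"):
        threading.Thread.__init__(self)
        self.endpoint = endpoint
        self.cont_loop = True
        self.received = []

    def abort(self):
        # Only clear the flag; run() releases the socket in its own thread.
        self.cont_loop = False

    def run(self):
        zmq_context = zmq.Context()
        zmq_socket = zmq_context.socket(zmq.SUB)
        try:
            zmq_socket.bind(self.endpoint)
            zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")
            while self.cont_loop:
                try:
                    data = zmq_socket.recv(flags=zmq.NOBLOCK)
                except zmq.Again:
                    # NOP case: nothing waiting, so idle briefly and re-check the flag
                    time.sleep(0.01)
                    continue
                self.received.append(data)
                print("Message: %r" % data)
        finally:
            zmq_socket.close(linger=0)
            zmq_context.term()
            print("exit")


# Usage: start, let it idle with no publisher at all, then stop it cleanly.
sub = Subscriber("tcp://127.0.0.1:*")  # "*" port: let the OS pick a free one
sub.start()
time.sleep(0.1)
sub.abort()
sub.join(timeout=2)
```

Keeping `close()` and `term()` inside `run()` also avoids touching the socket from a second thread, which ZeroMQ does not support because its sockets are not thread-safe. The short sleep on `zmq.Again` keeps the idle loop from spinning a CPU core; that idle slot is also a natural place for deferred low-priority work.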
Take your time and work through a great book of gems, "Code Connected, Volume 1", published (also as a PDF) by Pieter Hintjens, co-father of ZeroMQ.
Many of the insights he shared, and the errors he warns us to avoid, are well worth your time.
Enjoy the power of ZeroMQ. It is very powerful and worth mastering top-down.