PyZMQ req socket - hang on context.term()


I'm struggling to properly shut down a simple pyzmq-based client when the server is not available. Below are two snippets.

First the server. This is more or less the pyzmq example. No special code here:

import zmq
import json

port = 5555

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))

while True:
    message = socket.recv_json()
    print(json.dumps(message))
    socket.send_json({'response': 'Hello'})

Next the client.

import zmq

ip = 'localhost'
port = 5555
addr = "tcp://{0}:{1}".format(ip, port)
message = {'value': 10}

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)

if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
    socket.send_json(message, flags=zmq.NOBLOCK)        
    if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
        response = socket.recv_json()
        print(response)

socket.disconnect(addr)
socket.close(linger=0)
context.term()

Here I've tried to enhance the default client with the ability to time out if the server is not available. The code above uses the poll method, although I've also tried setting a receive timeout on the socket.

If the server is running, the client sends and receives a response and exits cleanly.

If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call. My understanding, from searching, is that this will hang if there are sockets that have not been closed, which doesn't seem to be the case here.

Any help is much appreciated.


Solution

  • On "ability to time out if the server is not available"

    A timeout is possible, yet it will not allow the hard-wired REQ/REP two-step dance to survive, let alone continue in a proper manner, if one side times out of an otherwise mandatory step in a distributed Finite State Automaton scheme ( a dFSA cannot take one-sided shortcuts, both sides have to step through it together ).
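
The lock-step nature of the REQ side can be illustrated with a toy model. This is only a sketch in plain Python, not pyzmq code: the class ToyReqSocket and its attribute awaiting_reply are hypothetical names invented for the illustration. A real REQ socket enforces the same alternation inside libzmq and reports a violation as an EFSM error.

```python
class ToyReqSocket:
    """Toy model of the REQ side's strict send -> recv alternation (not pyzmq)."""

    def __init__(self):
        # False: a request may be sent; True: a reply must be received first.
        self.awaiting_reply = False

    def send(self, message):
        if self.awaiting_reply:
            raise RuntimeError("cannot send: previous request still awaits its reply")
        self.awaiting_reply = True
        return message

    def recv(self, reply):
        if not self.awaiting_reply:
            raise RuntimeError("cannot recv: no request is outstanding")
        self.awaiting_reply = False
        return reply


sock = ToyReqSocket()
sock.send({"value": 10})

# The client "times out" here and skips recv(); the automaton has not moved on.
try:
    sock.send({"value": 11})
    second_send = "accepted"
except RuntimeError as exc:
    second_send = "rejected: {0}".format(exc)

print(second_send)
```

After a one-sided timeout the toy socket is still waiting for its reply, which is why a timed-out REQ socket is usually closed and replaced rather than reused.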


    Hypothesis:

    If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call.

    Validation:

    Let's review the code step by step:

    def  Test( SetImmediate = False ):
         ##################################################################################
         import zmq, json, time;                                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
         ##################################################################################
         ip      = 'localhost';                                                       print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
         port    =  5555;                                                             print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
         addr    = "tcp://{0}:{1}".format( ip, port );                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
         message = { 'value': 10 };                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
         ##################################################################################
         context = zmq.Context();                                                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
         pass;              aReqSock = context.socket( zmq.REQ );                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ),                        "|", zmq.strerror( zmq.zmq_errno() ) )
         ##################################################################################################################################################################################################################################
         pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
         pass;              aReqSock.setsockopt(       zmq.LINGER,    0 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER    ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
         pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
         ##################################################################################################################################################################################################################################
         pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
         if SetImmediate:
                            aReqSock.setsockopt(       zmq.IMMEDIATE, 1 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incomplete connections
         pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
         ##################################################################################################################################################################################################################################
         pass;              aReqSock.connect( addr );                                 print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ),                      "|", zmq.strerror( zmq.zmq_errno() ) )
         ##################################################################################
         pass;        rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ),   "|", zmq.strerror( zmq.zmq_errno() ) )
         if      0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
             pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... ==  " ), rc )
             pass;          aReqSock.send_json( message,   flags = zmq.NOBLOCK )      # .send()-s dispatches message the REP-side may .recv() at some later time
             pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ),            "|", zmq.strerror( zmq.zmq_errno() ) )
             pass;    rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ),    "|", zmq.strerror( zmq.zmq_errno() ) )
             if  0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
                 pass;                                                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
                 response = aReqSock.recv_json()                                      # .recv() BLOCKS until ... if ever ...
                 print( response );                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
         pass;                                                                        print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
         ##################################################################################
         rc = aReqSock.disconnect( addr );                                            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
         rc = aReqSock.close(      linger = 0 );                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
         rc = context.term();                                                         print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
         ##################################################################################
    

    This produces output along these lines:

    >>> Test( SetImmediate = False )
    ____947107.0356056700_ACK: import-s: DONE... VER:  4.2.5
    ____947107.0356727780_ACK: ip SET...
    ____947107.0356969039_ACK: port SET...
    ____947107.0357236000_ACK: addr SET...
    ____947107.0357460320_ACK: message SET...
    ____947107.0358552620_ACK: Context INSTANTIATED... | Success
    ____947107.0362445670_ACK: Socket INSTANTIATED... | Success
    ____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
    ____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
    ____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
    ____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
    ____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
    ____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
    ____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
    ____947107.0367464600_ACK: rc was NON-ZERO... ==   2
    ____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
    ____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
    ____947108.0382736750_ACK: if-ed code-block COMPLETED
    ____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
    ____947108.0384234400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
    ____947108.0386644470_ACK: Context.term() RETURNED CODE ~  None | Success
    

    and

    >>> Test( SetImmediate = True )
    ____947119.1267617550_ACK: import-s: DONE... VER:  4.2.5
    ____947119.1268189061_ACK: ip SET...
    ____947119.1268382660_ACK: port SET...
    ____947119.1268587380_ACK: addr SET...
    ____947119.1268772170_ACK: message SET...
    ____947119.1269678050_ACK: Context INSTANTIATED... | Success
    ____947119.1271884360_ACK: Socket INSTANTIATED... | Success
    ____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
    ____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
    ____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
    ____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
    ____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
    ____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
    ____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
    ____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
    ____947120.1287937190_ACK: if-ed code-block COMPLETED
    ____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
    ____947120.1289412400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
    ____947120.1291404651_ACK: Context.term() RETURNED CODE ~  None | Success
    

    This shows the hypothesis not to be correct for the code under test: context.term() returns in both runs. The problem lies not with context.term(), but with the way a .connect( aTransportClass_Target ) against a target that is not present is handled internally.
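
The sequence exercised by Test( SetImmediate = True ) can be condensed into a small helper. This is a sketch, not a definitive implementation: the function name request_with_timeout and the parameter timeout_ms are hypothetical, and it assumes pyzmq is installed. As in Test(), zmq.LINGER is set to 0 before .connect(), and zmq.IMMEDIATE is set to 1.

```python
import zmq


def request_with_timeout(addr, message, timeout_ms=1000):
    """Send one JSON request; return the reply, or None if no peer answered in time."""
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)      # do not keep unsent messages alive on close
    sock.setsockopt(zmq.IMMEDIATE, 1)   # queue messages only to completed connections
    sock.connect(addr)
    reply = None
    try:
        # With IMMEDIATE set, POLLOUT is not reported while no peer is connected.
        if sock.poll(timeout=timeout_ms, flags=zmq.POLLOUT) != 0:
            sock.send_json(message, flags=zmq.NOBLOCK)
            if sock.poll(timeout=timeout_ms, flags=zmq.POLLIN) != 0:
                reply = sock.recv_json()
    finally:
        # The socket is discarded after every call, so a timed-out REQ is never reused.
        sock.close(linger=0)
        context.term()
    return reply
```

Calling request_with_timeout( "tcp://localhost:5555", { 'value': 10 } ) with no server running is expected to return None after roughly one second, matching the second log above.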

    To my surprise, in the version under test ( v4.2.5 ) the .poll( zmq.POLLOUT ) returned 2 right after the .connect(), without a single explicit .send() having been made. Note that this value is the returned event mask ( zmq.POLLOUT == 2 ), i.e. the socket reports itself as writable; it is not a count of items waiting in a TxQueue.

    With the default zmq.IMMEDIATE == 0, a socket accepts messages for connections that have not been completed yet, which matches the first run. With zmq.IMMEDIATE == 1, the same .poll() timed out after ~ 1 s and the .send() was skipped, as the second run shows.

    Whatever the rationale for reporting a not-yet-connected socket as ready in the .POLLOUT-direction, I hope to have sufficiently shown that the problem has nothing to do with the .LINGER == 0 / .term()-ination of the Context()-instance.
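
One way to read the value 2 logged for the POLLOUT poll: pyzmq documents Socket.poll() as returning an event mask ( 0 on timeout ), and zmq.POLLOUT has the value 2, so the log line says "writable" rather than "two messages queued". The sketch below decodes such a return value. The helper name describe_poll_result is hypothetical, and the two constants are hard-coded copies of libzmq's ZMQ_POLLIN / ZMQ_POLLOUT values so that the snippet runs without pyzmq.

```python
# Hard-coded copies of libzmq's ZMQ_POLLIN / ZMQ_POLLOUT flag values.
POLLIN = 1
POLLOUT = 2


def describe_poll_result(rc):
    """Return the names of the event flags set in a poll() return value."""
    if rc == 0:
        return ["timeout"]
    names = []
    if rc & POLLIN:
        names.append("POLLIN")
    if rc & POLLOUT:
        names.append("POLLOUT")
    return names


print(describe_poll_result(2))   # value logged in the first run
print(describe_poll_result(0))   # a poll that timed out
```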

    Q.E.D.