Search code examples
pythonzeromqdistributed-computingpyzmq

Can't send a message on a socket bound with socket.bind()


I'm trying to make a program that receives a packet, and sends it to another port using a socket.send()-method. When I try to send the message (with a packet copy) it just won't do anything. The packet gets received fine though.

import time
import zmq


context = zmq.Context()

# Socket facing out
push = context.socket(zmq.REQ)
# push.hwm = 1
push.bind("tcp://*:5556")

# Socket facing in
sub = context.socket(zmq.SUB)
sub.bind("tcp://*:5555")

while True:

    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    msg = sub.recv()
    print("test")
    push.send_string("test")
    print("test2")

When I uncomment push.hwm = 1 it doesn't help.


Solution

  • Let's demystify the code first :

    import time
    import zmq
    
    context = zmq.Context()
    
    push = context.socket( zmq.REQ ) # Socket facing out - THE MOST COMPLICATED ARCHETYPE
    # push.hwm = 1                   #        a .hwm() ? - A STRANGE PROMISE IN v17/18
    push.bind( "tcp://*:5556" )      # push acquires all ports 5556 to be REQ-served
    
    sub = context.socket( zmq.SUB )  # Socket facing in  + SUB TOPIC LIST MANAGEMENT COSTS
    sub.bind(  "tcp://*:5555" )      # sub  acquires all ports 5555 to be SUB-served
    
    while True:
    
        sub.setsockopt_string( zmq.SUBSCRIBE, "" ) # /NEVER/.SET INFINITELY MANY TIMES?
        msg = sub.recv()                           # /NEVER/ USE A BLOCKING .recv()
        pass;                      print( "test" ) #         CLI - GUI STUB print()
        push.send_string( "test" )                 #                        .send()
        pass;                      print( "test2" )#         CLI - GUI STUB print()
    

    besides a rather strange promise from using a push.hwm, that is not working in Python prior to v3.+ and sort of "skews" / "extends" the native ZeroMQ API :

    >> print pull.set_hwm.__doc__
    Set the High Water Mark.
            
            On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
    
    
            .. warning::
    
                New values only take effect for subsequent socket
                bind/connects.
    

    This code ought be refactored first. The REQ/REP is not useful at all here, having no sufficiently robust coordinated dFSA-counterpart ( ever sending any REP back to an already sent REQ, after removing a conceptual REQ-self-suicide by never ever attempting to .recv() any such REP-answer, for a case it were occasionally indeed dispatched from an unknown, if ever present, remotely interacting dFSA-node ).

    Perhaps into something like this :

    import time
    import zmq
    
    ms_TO_WAIT_IN_POLL = 100
    
    def cliShow( aText = "no parameter was fed in" ):
        print( "{0:}:: {1:}".format( time.ctime(), aText ) )
    
    context = zmq.Context()
    
    push = context.socket( zmq.PUSH )# Socket facing out - THE zmq.PUSH ARCHETYPE is a right-enough one
    push.bind( "tcp://*:5556" )      # push acquires all ports 5556 to be PUSH-served
    push.setsockopt( zmq.LINGER, 0 ) # .SET always explicitly, even if "defaults" promise
    
    pull = context.socket( zmq.PULL )# Socket facing in  - THE zmq.PULL ARCHETYPE is a right-enough one
    pull.bind( "tcp://*:5555" )      # pull acquires all ports 5555 to be PULL-served
    pull.setsockopt( zmq.LINGER, 0 ) # .SET always explicitly, even if "defaults" promise
    
    try:
        while True:
    
            if ( 0 == pull.poll( ms_TO_WAIT_IN_POLL, zmq.POLLIN ) ):
               cliShow( "No message arrived yet" )     #         CLI - GUI STUB print()
            else:
               msg  = pull.recv( zmq.NOBLOCK )         # /NEVER/ USE A BLOCKING .recv()
               cliShow( "test" )                       #         CLI - GUI STUB print()
        
               push.send_string( "test" )              #                        .send()
               cliShow( "test2" )                      #         CLI - GUI STUB print()
    
    except KeyboardInterrupt:
        pass
    
    except:
        pass
    
    finally:
    
        pull.close()
        push.close()
    
        context.term()