Tags: python, synchronization, deadlock, zeromq, low-latency

Deadlock when synchronizing two simple python3 scripts using 0mq (ZeroMQ)


I get this strange deadlock when I try to synchronize two python3 scripts using 0mq (ZeroMQ). The scripts run fine for several thousand iterations, but sooner or later they both stop and wait for each other. I am running both scripts in separate CMD windows on Windows 7.

I cannot figure out why such a deadlock is even possible. What can go wrong here?

Script A:

import zmq

while (1):
   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind('tcp://127.0.0.1:10001')
   msg = socket.recv()                        # Waiting for script B to send done
   # ............................................................................
   # ... do something useful (takes only a few millisecs)
   # ............................................................................     
   context = zmq.Context()
   socket = context.socket(zmq.REQ)
   socket.connect('tcp://127.0.0.1:10002')
   socket.send_string("done")                 # Tell script B we are done

Script B:

import zmq

while (1):
   # ............................................................................
   # ... do something useful (takes only a few millisecs)
   # ............................................................................
   context = zmq.Context()
   socket = context.socket(zmq.REQ)
   socket.connect('tcp://127.0.0.1:10001')
   socket.send_string("done")               # Tell script A we are done

   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind('tcp://127.0.0.1:10002')
   msg = socket.recv()                      # Waiting for script A to send done

Solution

  • This is not a DeadLock case

    The code, sure, still needs some care.

    Disambiguation: your scenario does not hit a mutual resource-locking state, aka a DeadLock. Yes, your code crashes, but most probably not due to a REQ/REP DeadLock ( which might and does appear on a lossy-network tcp: transport-class ). The posted code is crashing due to unmanaged resource handling, not due to reaching a mutually blocking state of a DeadLock / LiveLock.


    How to fix it?

    First, let's assume your ultra-low-latency-motivated system does not allow repetitive instantiation of anything. There are exceptions to this, but let's be professional.

    1. move your .Context() resource setup ( or its inheritance from an outer call ) out of the loop ( a minimal sketch follows this list )

    2. review whether you need, and whether your latency constraints allow, a setup / tear-down of a .socket() resource twice in each loop-run

    3. decide whether you can live with a real REQ/REP deadlock once a first message gets lost on the transport path

    4. enforce graceful resource-use termination ( .socket()-s, O/S port#s, .Context()-s ). Do not leave them hanging unterminated forever while creating an infinite amount of new ones instead; that devastates any "fault-resilient" system. Resources are never infinite.

    5. design both signalling and transmission behaviours in a non-blocking manner. This allows you to detect and handle remote-process timeouts and introduces a chance for local remedy / responsive actions.

    6. redesign the code to the level of robustness you need ( the example below has been working for a few years in a soft-realtime controlled endless loop 24/7/365 in a distributed processing framework, with a remote keyboard and some other local and remote diagnostic tools ).

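    A minimal sketch of Script A, restructured along points 1, 2 and 4, might look like this ( the "ack" payload is my own assumption: once the sockets live outside the loop, the strict REQ/REP alternation has to be honoured, so each .recv() gets answered and each .send() gets its reply read; Script B needs the mirrored changes ):

    import zmq

    aCtx = zmq.Context()                        # 1: one Context() instance, set up once
    aREP = aCtx.socket( zmq.REP )               # 2: sockets set up once, outside the loop
    aREQ = aCtx.socket( zmq.REQ )
    aREP.bind(    'tcp://127.0.0.1:10001' )
    aREQ.connect( 'tcp://127.0.0.1:10002' )

    try:
        while True:
            msg = aREP.recv()                   # wait for Script B to report "done"
            aREP.send_string( "ack" )           # a REP must reply before its next .recv()
            # ... do something useful ( takes only a few millisecs )
            aREQ.send_string( "done" )          # tell Script B we are done
            aREQ.recv()                         # a REQ must read the reply before its next .send()
    finally:                                    # 4: graceful resource-use termination
        for aSocket in ( aREP, aREQ ):
            aSocket.setsockopt( zmq.LINGER, 0 )
            aSocket.close()
        aCtx.term()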

    What is missing for production-grade code?

    Your code has to "envisage" what might have gone wrong in any part of your distributed system. Yes, it is hard, but necessary. Your remote node -- a communicating counterparty -- may have stopped responding, lost a message, been rebooted, stalled due to an O/S crash, whatever is imaginable ( plus a few rather nasty surprises you will find only on-the-fly ... ). This is another Pandora's Box, too large to cover in this small post, which does not mean it is not necessary. It is your life-saving vest.

    Design in a non-blocking manner wherever you can; this way you remain in control of events ...
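
    As a sketch of that principle, the blocking .recv() can be guarded by a Poller with a timeout ( the 1000 ms value and the on-timeout branch are placeholders, assumed here, for your own remedy / responsive actions ):

    import zmq

    aCtx = zmq.Context()
    aREP = aCtx.socket( zmq.REP )
    aREP.bind( 'tcp://127.0.0.1:10001' )

    aPoller = zmq.Poller()
    aPoller.register( aREP, zmq.POLLIN )

    aTimeout_ms = 1000                          # an assumed value, tune it to your latency envelope
    try:
        while True:
            if dict( aPoller.poll( aTimeout_ms ) ).get( aREP ) == zmq.POLLIN:
                msg = aREP.recv( zmq.NOBLOCK )  # a message is ready, this will not block
                aREP.send_string( "ack" )       # keep the REP-side send/recv alternation
            else:
                pass                            # the peer stayed silent: local remedy goes here
    finally:
        aREP.setsockopt( zmq.LINGER, 0 )
        aREP.close()
        aCtx.term()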

    In any case, always release system resources and .term() all ZeroMQ .Context() instances in a graceful manner -- "tidy up" is a fair practice, both in real life and all the more so in code.

    # /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
    # NONSTOP RESPONDER RAW EXAMPLE:
    import zmq

    def aMiniRESPONDER( aTarget2Bind2_URL             = "tcp://A.B.C.D:8889",
                        anExternalPREDICTOR           = None,
                        anExternallyManagedZmqCONTEXT = None,
                        aSpreadMinSafetyMUL           = 3.0,
                        aSilentMODE                   = True
                        ):
       try: # RESOURCES LAYER
            # ... SETUP
            # ------------------------------------------------- .Context()
            # can setup a locally-managed context or re-use
            # anExternallyManagedZmqCONTEXT obtained upon a func Call
            aZmqCONTEXT   = anExternallyManagedZmqCONTEXT or zmq.Context( 1 )   
    
            # localhost:8887 [REP] ... remote [REQ] peer  .connect() + .send()
            aCtrlPORT_URL = "tcp://*:8887"                                      
    
            # localhost:8890 [PUB] ... remote [SUB] peers .connect() +
            # .subscribe + .recv( zmq.NOBLOCK ) ( MQL4 cannot .poll() so far ...)
            aSIGsPORT_URL = "tcp://*:8890"                                      
            aXmitPORT_URL = aTarget2Bind2_URL
    
            aListOfSOCKETs = []
    
            pass # -------------------------------------------------------------# ZMQ
            try: # -------------------------------------------------------------#
                # try: XmitPORT
                aXmitSOCKET = aZmqCONTEXT.socket( zmq.PAIR )
    
                # XmitPORT
                aXmitSOCKET.bind(      aXmitPORT_URL )                          
                aListOfSOCKETs.append( aXmitSOCKET )
            except:                                                             
                #    EXC: XmitPORT on Failure: GRACEFUL CLEARING XmitPORT
    
                msg =  "\nEXC. ZmqError({0:s}) on aXmitSOCKET setup / .bind( {1:s} )"
                print( msg.format( repr( zmq.ZMQError() ), aTarget2Bind2_URL ) )
                raise ValueError( "ZMQ_EXC_EXIT @ XmitPORT SETUP" )
            pass # -------------------------------------------------------------# ZMQ
            try: # -------------------------------------------------------------#
                # try: CtrlPORT    
                # CtrlSOCKET [REP] .recv()s<--[REQ] + .send()s--> [REQ]
                aCtrlSOCKET = aZmqCONTEXT.socket( zmq.REP )                     
    
                # CtrlPORT <-REQ/REP means a remote peer [REQ] has to
                # .send()+.recv() before sending another CtrlCMD
                aCtrlSOCKET.bind(      aCtrlPORT_URL )                          
                aListOfSOCKETs.append( aCtrlSOCKET )
            except:                                                             
                # EXC: CtrlPORT on Failure: GRACEFUL CLEARING both CtrlPORT
                # and XmitPORT
                msg =  "\nEXC. ZmqError({0:s}) on aCtrlSOCKET setup / .bind( {1:s} )"
                print( msg.format( repr( zmq.ZMQError() ), aCtrlPORT_URL ) )
                raise ValueError( "ZMQ_EXC_EXIT @ CtrlPORT SETUP" )
            pass # -------------------------------------------------------------# ZMQ
            try: # -------------------------------------------------------------#
                # try: SIGsPORT
    
                # SIGsPORT [PUB] .send()s--> [SUB]s
                aSIGsSOCKET= aZmqCONTEXT.socket( zmq.PUB  )                     
    
                # SIGsPORT -->  PUB/SUB means a remote peer(s) [SUB] .subscribe() + .recv()
                aSIGsSOCKET.bind(      aSIGsPORT_URL )                          
                aListOfSOCKETs.append( aSIGsSOCKET )
            except:                                                             
                # EXC: SIGsPORT on Failure: GRACEFUL CLEARING both CtrlPORT
                # and XmitPORT and SIGsPORT
                msg =  "\nEXC. ZmqError({0:s}) on aSIGsSOCKET setup / .bind( {1:s} )"
                print( msg.format( repr( zmq.ZMQError() ), aSIGsPORT_URL ) )
                raise ValueError( "ZMQ_EXC_EXIT @ SIGsPORT SETUP" )
            pass # -------------------------------------------------------------# ZMQ
    
            # vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
            # ... SETUP YOUR APPLICATION CODE
    
            try:     # APP LAYER ___________________________________________
               pass  #           what you want to do
                     #           here you go ...

            except:  # APP LAYER ___________________________________________
               pass  #           handle EXCs

            finally: # APP LAYER ___________________________________________
               pass  #           your own application post-mortem / pre-exit code
    
            # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    
       except:  # RESOURCES LAYER .............................................
            pass # ... code shall handle its own exceptions + externally caused events
    
       finally: # RESOURCES LAYER .............................................
            # ... always, ALWAYS gracefully exit ( avoid leakages and dirty things )
    
            [ allSOCKETs.setsockopt( zmq.LINGER, 0 ) for allSOCKETs in aListOfSOCKETs ]
            [ allSOCKETs.close( )                    for allSOCKETs in aListOfSOCKETs ]
    
            # --------------------------------------------------------------#
            # RESOURCES dismantled, may .term()
    
            # .TERM(), NOP otherwise
            if not ( aZmqCONTEXT is anExternallyManagedZmqCONTEXT ):        #
                     aZmqCONTEXT.term()                                     #
            return