Search code examples
python-2.7twistedautobahnwamp-protocol

`callInThread` does not appear to trigger WAMP Subscribe until an autoPing is sent


When I trigger some code using reactor.callInThread, it seems like the reactor does not fire until some other scheduled event occurs (an auto-ping in this case).

Depending on when the auto-ping lines up with my subscribe request I can see response times between 0 and 5 seconds. Manually editing autobahn/twisted/wamp.py to change transport_factory.setProtocolOptions( ..., autoPingInterval=10., ...) to something sub-second provides a new upper bound.

I ran tcpdump and the below code (simultaneously in the same terminal - indented tcpdump output for clarity) and got output like the following. Notice that tcpdump indicates that all the messages are sent ~simultaneously, and that neither the server nor client delays responses (it is just the first client send that is quite far from the initial trigger / log message):

2017-10-30T16:26:44-0700 Auto ping/pong: sending ping auto-ping/pong
2017-10-30T16:26:44-0700 Expecting ping in 5.0 seconds for auto-ping/pong
2017-10-30T16:26:44-0700 Auto ping/pong: received pending pong for auto-ping/pong
2017-10-30T16:26:44-0700 WebSocketProtocol.onPong(payload=<4 bytes>)
    16:26:44.000880 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2772:2811, ack 8850, win 4096, options [nop,nop,TS val 826034469 ecr 12606734], length 39
    16:26:44.004235 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8850:8885, ack 2811, win 285, options [nop,nop,TS val 12613555 ecr 826034469], length 35
    16:26:44.004282 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8885, win 4094, options [nop,nop,TS val 826034472 ecr 12613555], length 0
2017-10-30T16:26:44-0700 WAMP SEND: message=Subscribe(XXX)
    <-------------------->
      Five Seconds Pass
    <-------------------->
2017-10-30T16:26:49-0700 WAMP RECV: message=Subscribed(XXX)
2017-10-30T16:26:49-0700 WAMP SEND: message=Call(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Result(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700 
    16:26:49.000617 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2811:2884, ack 8885, win 4096, options [nop,nop,TS val 826039448 ecr 12613555], length 73
    16:26:49.004748 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8885:8939, ack 2884, win 285, options [nop,nop,TS val 12618555 ecr 826039448], length 54
    16:26:49.004797 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8939, win 4094, options [nop,nop,TS val 826039452 ecr 12618555], length 0
    16:26:49.006799 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2884:3537, ack 8939, win 4096, options [nop,nop,TS val 826039454 ecr 12618555], length 653
    16:26:49.009960 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8939:9000, ack 3537, win 299, options [nop,nop,TS val 12618561 ecr 826039454], length 61
    16:26:49.010004 IP CLIENT.58323 > SERVER.443: Flags [.], ack 9000, win 4094, options [nop,nop,TS val 826039457 ecr 12618561], length 0
    16:26:49.171613 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 9000:10329, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1329
    16:26:49.171616 IP SERVER.443 > CLIENT.58323: Flags [.], seq 10329:11777, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1448
    16:26:49.171618 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 11777:11857, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 80
    16:26:49.171663 IP CLIENT.58323 > SERVER.443: Flags [.], ack 10329, win 4054, options [nop,nop,TS val 826039617 ecr 12618723], length 0
    16:26:49.171678 IP CLIENT.58323 > SERVER.443: Flags [.], ack 11857, win 4006, options [nop,nop,TS val 826039617 ecr 12618723], length 0
2017-10-30T16:26:50-0700 Result: XXX (4.99s)
2017-10-30T16:26:50-0700
2017-10-30T16:26:50-0700 Sleeping

The below example reproduces the issue against my local server - heavily cut down from the original code but still with a few notable quirks: * Using SSL (needed, happy to be informed of better technique) * In a thread (~needed, using in spawned processes in a library for interaction) * Uses config.extra for state sharing (happy to be informed of better technique)

Consider the values in caps to be redacted but irrelevant.

import autobahn                                                                                                                                    
assert autobahn.__version__ == '17.6.2'                                                                                                            

class WampSession(ApplicationSession):                                                                                                             

    @inlineCallbacks                                                                                                                               
    def do_work(self, args):                                                                                                                       
        self.config.extra['joined'].wait()                                                                                                         
        yield self.subscribe(self._get_onResult(), args['resultTopic'])                                                                            
        args['authentication'] = self.config.extra['authentication']                                                                               
        yield self.call(FUNC, **args)                                                                                                              

    def _get_onResult(self):                                                                                                                       
        def onResult(**kw):
            if kw['status'] == 'done':
                self.config.extra['result_queue'].put(kw['requestID'])                                                                             
        return onResult

    def onConnect(self):
        self.join(REALM, authmethods=[u'ticket'])                                                                                                  

    def onJoin(self, details):                                                                                                                     
        self.config.extra['joined'].set()                                                                                                                                                                                                  

    def onChallenge(self, challenge):                                                                                                              
        return self.config.extra['authentication']                                                                                                 

    def onDisconnect(self):                                                                                                                        
        log.debug('onDisconnect')
        reactor.stop()


def get_session_runner():                                                                                                                          
    extra = {
        'result_queue': Queue.Queue(),                                                                                                             
        'joined': threading.Event(),                                                                                                               
        'authentication': ACCESS_TOKEN,                                                                                                            
    }     
    session = WampSession(ComponentConfig(extra=extra))                                                                                            
    certificate = PrivateCertificate.loadPEM(CERT_DATA)                                                                                            
    ssl = certificate.options()                                                                                                                    
    runner = ApplicationRunner(url=URL, ssl=ssl)                                                                                                   
    return session, runner                                                                                                                         


def main():
    session, runner = get_session_runner()
    # Monkeypatch this to make it run in a thread, since ApplicationRunner                                                                         
    # calls 'run' without arguments, and we need to skip signal handlers.                                                                          
    def _run_threaded():
        return reactor._run_original(installSignalHandlers=False)                                                                                  

    reactor._run_original = reactor.run
    reactor.run = _run_threaded
    t = threading.Thread(target=runner.run, args=(session,))                                                                                       
    t.daemon = True                                                                                                                                
    t.start()

    orig = FUNC_ARGS['resultTopic']
    for i in xrange(100):                                                                                                                          
        FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)                                                                                              
        start = time.time()                                                                                                                        
        reactor.callInThread(session.do_work, FUNC_ARGS)                                                                                           
        result = session.config.extra['result_queue'].get()                                                                                        
        end = time.time()
        print
        time.sleep(1)  # minisleep for tcpdump logging delay                                                                                       
        print 'Result: %s (%.2fs)' % (result, end - start)                                                                                         
        print '\nSleeping\n'
        time.sleep(6)
        print '\nDone Sleeping\n'

This seems to work well after modifications based upon Jean-Paul's answer (moved calls into a thread instead of reactor, use callFromThread):

def main():
    session, runner = get_session_runner()
    t = threading.Thread(target=threadstuff, args=(session,))
    t.start()
    runner.run(session)


def threadstuff(session):
    orig = FUNC_ARGS['resultTopic']
    session.config.extra['joined'].wait()
    for i in xrange(100):
        FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
        start = time.time()
        reactor.callFromThread(session.do_work, FUNC_ARGS)
        result = session.config.extra['result_queue'].get()
        end = time.time()
        print
        time.sleep(1)  # minisleep for tcpdump logging delay
        print 'Result: %s (%.2fs)' % (result, end - start)
        print '\nSleeping\n'
        time.sleep(6)
        print '\nDone Sleeping\n'
    reactor.stop()

Solution

  • callInThread is the scheduling API that lets you call a given function in a thread from the reactor's threadpool. It must be called from the reactor thread.

    callFromThread is the scheduling API that lets you call a given function in the reactor thread. It is typically called from a non-reactor thread.

    You should be using callFromThread here since you're trying to schedule work from a non-reactor thread.