Search code examples
python autobahn python-asyncio

How can I implement an interactive websocket client with autobahn asyncio?


I'm trying to implement a websocket/wamp client using autobahn|python and asyncio, and while it's somewhat working, there are parts that have eluded me.

What I'm really trying to do is implement WAMP in qt5/QML, but this seemed like an easier path for the moment.

This simplified client, mostly copied from online examples, does work. It reads the time service when onJoin fires.

What I'd like to do is trigger this read from an external source.

The convoluted approach I've taken is to run the asyncio event loop in a thread, and then to send a command over a socket to trigger the read. So far I have been unable to figure out where to put the routine/coroutine so that it can be reached from the reader routine.

I suspect there's a simpler way to go about this but I haven't found it yet. Suggestions are welcome.

#!/usr/bin/python3
try:
    import asyncio
except ImportError:
    ## Trollius >= 0.3 was renamed
    import trollius as asyncio

from autobahn.asyncio import wamp, websocket
import threading
import time
from socket import socketpair

# Connected socket pair used to signal the event loop from outside: the main
# thread writes to wsock, and the loop watches rsock via loop.add_reader().
rsock, wsock = socketpair()

def reader():
    """Receive up to 100 bytes from ``rsock`` and print them decoded as text.

    Registered with the event loop as the read callback for ``rsock``.
    """
    payload = rsock.recv(100)
    text = payload.decode()
    print("Received:", text)

class MyFrontendComponent(wamp.ApplicationSession):
    """WAMP application session that queries the time service once on join."""

    def onConnect(self):
        """Join the ``realm1`` realm."""
        self.join(u"realm1")

    @asyncio.coroutine
    def onJoin(self, details):
        """Call ``com.timeservice.now`` and print either the result or the error."""
        print('joined')
        # Call the remote procedure; a failure is reported rather than raised.
        try:
            now = yield from self.call(u'com.timeservice.now')
        except Exception as err:
            print("Error: {}".format(err))
            return
        print("Current time from time service: {}".format(now))

    def onLeave(self, details):
        """Disconnect once the session has been left."""
        self.disconnect()

    def onDisconnect(self):
        """Stop the current thread's event loop."""
        loop = asyncio.get_event_loop()
        loop.stop()



def start_aloop():
    """Create an event loop for the calling thread and run the WAMP client on it.

    Connects to the router at 127.0.0.1:8080 using the module-level
    ``session_factory``, registers ``reader`` as the read callback for
    ``rsock``, and then runs the loop until it is stopped.

    The loop is closed on every exit path, including a failed connection
    attempt, so its resources are not leaked.
    """
    loop = asyncio.new_event_loop()
    # Make the new loop the current one for this thread, so that
    # asyncio.get_event_loop() calls made on this thread resolve to it.
    asyncio.set_event_loop(loop)
    try:
        transport_factory = websocket.WampWebSocketClientFactory(
            session_factory,
            debug=False,
            debug_wamp=False)
        coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
        loop.add_reader(rsock, reader)
        loop.run_until_complete(coro)
        loop.run_forever()
    finally:
        loop.close()

if __name__ == '__main__':
    # start_aloop() reads `session_factory` as a module global, so it has to
    # exist before the worker thread is launched.
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = MyFrontendComponent

    # Run the asyncio event loop on a separate thread.
    print('starting thread')
    loop_thread = threading.Thread(target=start_aloop)
    loop_thread.start()

    # Wait a few seconds before signalling the loop thread.
    pause_seconds = 5
    time.sleep(pause_seconds)
    print("IN MAIN")

    # Emulate an outside call: bytes written to wsock arrive on rsock.
    wsock.send(b'a byte string')

Solution

  • You can listen on a socket asynchronously inside the event loop, using loop.sock_accept. You can just call a coroutine to set up the socket inside of onConnect or onJoin:

    try:
        import asyncio
    except ImportError:
        ## Trollius >= 0.3 was renamed
        import trollius as asyncio
    
    from autobahn.asyncio import wamp, websocket
    import socket
    
    class MyFrontendComponent(wamp.ApplicationSession):
        """WAMP session that calls the time service on join and again each
        time a client connects to a local TCP socket (localhost:8889)."""

        def onConnect(self):
            """Join the ``realm1`` realm."""
            self.join(u"realm1")

        @asyncio.coroutine
        def setup_socket(self):
            """Accept connections on localhost:8889 and call the time service for each.

            Loops forever; every accepted connection triggers one call to
            ``call_timeservice`` and is then closed. The listening socket is
            closed when the coroutine exits (error or cancellation).
            """
            # Create a non-blocking socket, as loop.sock_accept() expects.
            self.sock = socket.socket()
            try:
                self.sock.setblocking(False)
                self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.sock.bind(('localhost', 8889))
                self.sock.listen(5)
                loop = asyncio.get_event_loop()
                # Wait for connections to come in. When one arrives,
                # call the time service and disconnect immediately.
                while True:
                    conn, _ = yield from loop.sock_accept(self.sock)
                    try:
                        yield from self.call_timeservice()
                    finally:
                        # Close the client connection even if the call is
                        # interrupted (e.g. by cancellation).
                        conn.close()
            finally:
                self.sock.close()

        @asyncio.coroutine
        def onJoin(self, details):
            """Start the socket server in the background, then call the time service once."""
            print('joined')
            # Set up our socket server as a background task.
            # asyncio.async() was renamed to asyncio.ensure_future(); `async`
            # is a reserved keyword from Python 3.7, so the old spelling no
            # longer parses. Keep a reference so the task is not
            # garbage-collected while it is still running.
            self._socket_task = asyncio.ensure_future(self.setup_socket())

            ## call a remote procedure
            ##
            yield from self.call_timeservice()

        @asyncio.coroutine
        def call_timeservice(self):
            """Call ``com.timeservice.now`` and print either the result or the error."""
            try:
                now = yield from self.call(u'com.timeservice.now')
            except Exception as e:
                print("Error: {}".format(e))
            else:
                print("Current time from time service: {}".format(now))

        ... # The rest is the same