Search code examples
pythonpython-3.xpython-asyncioautobahn

Callbacks from autobahn WebSocketClientProtocol to another object


Firstly, there is an IO class, which on __init__ is passed the asyncio loop object (io = IO(loop)), created earlier in the main class. IO class then at some point initializes the Socket class by doing self.socket = Socket(self), so that the socket object has a backwards access. Later, the Socket class initializes Websocket class which is a subclass of Transport

class Websocket(Transport):

    name = 'websocket'

    def __init__(self, socket):
        self.socket = socket

    def open(self):
        url = self.prepareUrl()

        factory = WebSocketClientFactory(url, debug = False)
        factory.protocol = Protocol

        websocket = self.socket.loop.create_connection(factory, host=self.socket.io.options.host, port=self.socket.options.port)

        self.socket.io.loop.run_until_complete(websocket)

    def onOpen(self):
        print('print me please!')

So, socket object calls self.transport.open() (where self.transport = Websocket(self)), which creates autobahn factory, creates asyncio connection by doing self.socket.loop.create_connection() and then adds the coro future to the loop by executing run_until_complete().

Now, this is where the problem starts: autobahn factory requires a class, which must inherit from autobahn.asyncio.websocket.WebSocketClientProtocol

My class Protocol(WebSocketClientProtocol) has the usual:

class Protocol(WebSocketClientProtocol):

    @asyncio.coroutine
    def onOpen(self):
        print('socket opened!')

This works perfectly fine, the print('socket opened!') does print the string and my server also says the connection is open.

The question: from the Protocol() class, when the onOpen() callback is called by autobahn, how can I make this method call the transport.onOpen() method and do print('print me please!')?


Solution

  • OK, so I fixed it! Easily done with PyDispatch module.

    Here is my solution:

    import asyncio
    from pydispatch import dispatcher
    from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory
    
    from ..transport import Transport
    
    class Websocket(Transport):
    
        name = 'websocket'
    
        def __init__(self, socket):
            self.socket = socket
    
        def open(self):
            url = self.prepareUrl()
    
            factory = WebSocketClientFactory(url, debug = False)
            factory.protocol = Protocol
    
            websocket = self.socket.loop.create_connection(factory, host=self.socket.io.options.host, port=self.socket.options.port)
    
            dispatcher.connect(self.onOpen, signal='open', sender=dispatcher.Anonymous)
    
            self.socket.io.loop.run_until_complete(websocket)
    
        def onOpen(self):
            print('print me please!')
    
    
    class Protocol(WebSocketClientProtocol):
    
        @asyncio.coroutine
        def onOpen(self):
            dispatcher.send(signal='open')
    

    UPDATE

    I've got another, IMO better solution to this. This one is not using PyDispatch. Since there is a callback when an asyncio task finishes, which returns the user-defined protocol object (which inherits from WebSocketClientProtocol), we can use that to link the two objects together:

    import asyncio
    from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory
    
    from ..transport import Transport
    
    class Protocol(WebSocketClientProtocol):
    
        def __init__(self):
            self.ws = None
            super().__init__()
    
        @asyncio.coroutine
        def onConnect(self, response):
            pass # connect handeled when SocketIO 'connect' packet is received
    
        @asyncio.coroutine
        def onOpen(self):
            self.ws.onOpen()
    
        @asyncio.coroutine
        def onMessage(self, payload, isBinary):
            self.ws.onMessage(payload=payload, isBinary=isBinary)
    
        @asyncio.coroutine
        def onClose(self, wasClean, code, reason):
            if not wasClean:
                self.ws.onError(code=code, reason=reason)
    
            self.ws.onClose()           
    
    class Websocket(Transport):
    
        name = 'websocket'
    
        def __init__(self, socket, **kwargs):
            super().__init__(socket)
    
            loop = kwargs.pop('loop', None)
            self.loop = loop or asyncio.get_event_loop()
    
            self.transport = None
            self.protocol = None
    
            self.ready = True
    
        def open(self):
            url = self.prepareUrl()
            if bool(self.socket.options.query):
                url = '{0}?{1}'.format(url, self.socket.options.query)
    
            factory = WebSocketClientFactory(url=url, headers=self.socket.options.headers)
            factory.protocol = Protocol
    
            coro = self.loop.create_connection(factory, host=self.socket.options.host, port=self.socket.options.port, ssl=self.socket.options.secure)
    
            task = self.loop.create_task(coro)
            task.add_done_callback(self.onWebSocketInit)
    
        def onWebSocketInit(self, future):
            try:
                self.transport, self.protocol = future.result()
                self.protocol.ws = self
            except Exception:
                self.onClose()
    
        def send(self, data):
            self.protocol.sendMessage(payload=data.encode('utf-8'), isBinary=False)
            return self
    
        def close(self):
            if self.isOpen:
                self.protocol.sendClose()
            return self
    
        def onOpen(self):
            super().onOpen()
            self.socket.setBuffer(False)
    
        def onMessage(self, payload, isBinary):
            if not isBinary:
                self.onData(payload.decode('utf-8'))
            else:
                self.onError('Message arrived in binary')
    
        def onClose(self):
            super().onClose()
            self.socket.setBuffer(True)
    
        def onError(self, code, reason):
            self.socket.onError(reason)