Search code examples
pythonmultithreadingwebsockettwistedautobahn

How to stop a websocket client without stopping reactor


I have an app similar to a chat-room writing in python that intends to do the following things:

  1. A prompt for user to input websocket server address.
  2. Then create a websocket client that connects to server and send/receive messages. Disable the ability to create a websocket client.
  3. After receiving "close" from server (NOT a close frame), client should drop connecting and re-enable the app to create a client. Go back to 1.
  4. If user exits the app, it exit the websocket client if there is one running.

My approach for this is using a main thread to deal with user input. When user hits enter, a thread is created for WebSocketClient using AutoBahn's twisted module and pass a Queue to it. Check if the reactor is running or not and start it if it's not. Overwrite on message method to put a closing flag into the Queue when getting "close". The main thread will be busy checking the Queue until receiving the flag and go back to start. The code looks like following.

Main thread.

def main_thread():
    while True:
        text = raw_input("Input server url or exit")
        if text == "exit":
            if myreactor:
                myreactor.stop()
            break
        msgq = Queue.Queue()
        threading.Thread(target=wsthread, args=(text, msgq)).start()

        is_close = False
        while True:
            if msgq.empty() is False:
                msg = msgq.get()
                if msg == "close":
                    is_close = True
                else:
                    print msg
                if is_close:
                    break
        print 'Websocket client closed!'

Factory and Protocol.

class MyProtocol(WebSocketClientProtocol):
    def onMessage(self, payload, isBinary):
        msg = payload.decode('utf-8')
        self.Factory.q.put(msg)
        if msg == 'close':
            self.dropConnection(abort=True)

class WebSocketClientFactoryWithQ(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        self.queue = kwargs.pop('queue', None)
        WebSocketClientFactory.__init__(self, *args, **kwargs)

Client thread.

def wsthread(url, q):
    factory = WebSocketClientFactoryWithQ(url=url, queue=q)
    factory.protocol = MyProtocol
    connectWS(Factory)
    if myreactor is None:
        myreactor = reactor
        reactor.run()
    print 'Done'

Now I got a problem. It seems that my client thread never stops. Even if I receive "close", it seems still running and every time I try to recreate a new client, it creates a new thread. I understand the first thread won't stop since reactor.run() will run forever, but from the 2nd thread and on, it should be non-blocking since I'm not starting it anymore. How can I change that?

EDIT:

I end up solving it with

  1. Adding stopFactory() after disconnect.
  2. Make protocol functions with reactor.callFromThread().
  3. Start the reactor in the first thread and put clients in other threads and use reactor.callInThread() to create them.

Solution

  • Your main_thread creates new threads running wsthread. wsthread uses Twisted APIs. The first wsthread becomes the reactor thread. All subsequent threads are different and it is undefined what happens if you use a Twisted API from them.

    You should almost certainly remove the use of threads from your application. For dealing with console input in a Twisted-based application, take a look at twisted.conch.stdio (not the best documented part of Twisted, alas, but just what you want).