
Terminate previous autobahn websocket call when a new message arrives


I am designing a feature that gives autocomplete-like search results through a websocket. When the user types fast, their previous query often becomes outdated because they are already asking for new information.

Is there any way to recognize when a new query is coming in and terminate the previous one? I tried checking whether a query was still being processed when a new message arrived, but it seems the new message is only handled after the previous query (the one I would like to cancel) has completed. I'm also unsure how this would work when multiple users are searching at the same time.

from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
import json
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue

running = 'no'

def main_search(query): # May take up to 400ms to process
    ...


class SearchServerProtocol(WebSocketServerProtocol):

    @inlineCallbacks
    def onMessage(self, payload, isBinary):
        if not isBinary:
            x = json.loads(payload.decode('utf8'))
            global running
            try:
                print(running)
                running = 'yes'
                res = yield main_search(x['query'])
                running = 'no'
                print(x['query'])
            except Exception as e:
                print(e)
                self.sendClose(1000)
            else:
                self.sendMessage(json.dumps(res).encode('utf8'))


if __name__ == '__main__':

    import sys

    from twisted.python import log
    from twisted.internet import reactor

    log.startLogging(sys.stdout)

    factory = WebSocketServerFactory("ws://104.236.31.77:8080", debug=False)
    factory.protocol = SearchServerProtocol

    reactor.listenTCP(8080, factory)
    reactor.run()

The value of running printed in onMessage is always 'no'.

Thanks!


Solution

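  • I think you need some kind of context object that represents your user session (or search session). In that context you can store a latestSearchId that you increment on every search, and then pass the resulting searchId to main_search as an extra parameter. Assuming your search runs in a loop or in distinct stages at which it can be aborted, you can check at each step whether the current search's searchId is still the latest by comparing it to the session's latestSearchId. Twisted creates a separate protocol instance for every connection, so if you keep this context on the protocol instance (instead of in a module-level global like running), concurrent users do not interfere with each other.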
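    A minimal sketch of this idea in plain Python, assuming your search can be split into stages. SearchSession, start_search, is_latest and the stages list are hypothetical names, not part of Autobahn or Twisted; in the server you would keep one SearchSession per connection, for example on the protocol instance. The check only helps if the newer message can actually be read between stages, which requires that the reactor is not blocked.

```python
class SearchSession:
    """Hypothetical per-user (per-connection) search state."""

    def __init__(self):
        self.latest_search_id = 0

    def start_search(self):
        # Each new query gets a larger id; older ids become stale
        self.latest_search_id += 1
        return self.latest_search_id

    def is_latest(self, search_id):
        return search_id == self.latest_search_id


def main_search(query, search_id, session, stages):
    """Run `stages` (hypothetical callables) one after another, abandoning
    the search as soon as a newer one has started on the same session."""
    results = []
    for stage in stages:
        if not session.is_latest(search_id):
            return None  # outdated: a newer query superseded this one
        results = stage(query, results)
    return results


session = SearchSession()


def add_prefix_matches(query, results):
    return results + [query + "thon"]


def simulate_new_message(query, results):
    session.start_search()  # pretend a newer query arrived between stages
    return results


first = session.start_search()
print(main_search("py", first, session, [add_prefix_matches]))  # ['python']
second = session.start_search()
print(main_search("py", second, session,
                  [simulate_new_message, add_prefix_matches]))  # None
```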

    A different approach (for example, if you can't abort your searches) is to wait a few milliseconds before computing the search and check whether a newer search has arrived in the meantime.
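    This is usually called debouncing. A minimal sketch, assuming a scheduler function shaped like Twisted's reactor.callLater, which returns a handle with active() and cancel(). Debouncer, FakeCall and FakeClock are hypothetical helpers; FakeClock only exists so the example runs without a reactor (if Twisted is installed, twisted.internet.task.Clock is a ready-made fake clock with callLater and advance). In the server you would pass reactor.callLater instead of clock.call_later and your search function instead of searched.append.

```python
class Debouncer:
    """Hypothetical helper: run fn(query) only after `delay` seconds pass
    without a newer query.

    call_later(delay, fn, *args) must return a handle with active() and
    cancel(), as the IDelayedCall returned by Twisted's reactor.callLater does.
    """

    def __init__(self, call_later, delay, fn):
        self.call_later = call_later
        self.delay = delay
        self.fn = fn
        self.pending = None

    def submit(self, query):
        # A newer query supersedes one that is still waiting: cancel it.
        # Checking active() first avoids cancelling a call that already ran.
        if self.pending is not None and self.pending.active():
            self.pending.cancel()
        self.pending = self.call_later(self.delay, self.fn, query)


class FakeCall:
    """Minimal delayed-call handle for the demo below."""

    def __init__(self, when, fn, args):
        self.when, self.fn, self.args = when, fn, args
        self.cancelled = self.called = False

    def active(self):
        return not (self.cancelled or self.called)

    def cancel(self):
        self.cancelled = True


class FakeClock:
    """Hypothetical manual clock so the example runs without a reactor."""

    def __init__(self):
        self.now = 0.0
        self.calls = []

    def call_later(self, delay, fn, *args):
        call = FakeCall(self.now + delay, fn, args)
        self.calls.append(call)
        return call

    def advance(self, seconds):
        # Move time forward and run every pending call that is now due
        self.now += seconds
        for call in list(self.calls):
            if call.active() and call.when <= self.now:
                call.called = True
                call.fn(*call.args)


clock = FakeClock()
searched = []
debouncer = Debouncer(clock.call_later, 0.05, searched.append)
debouncer.submit("p")
clock.advance(0.01)      # user keeps typing within 50 ms
debouncer.submit("py")
clock.advance(0.01)
debouncer.submit("pyt")
clock.advance(0.1)       # user pauses; only the last query runs
print(searched)  # ['pyt']
```

    The trade-off is a fixed extra latency (the delay) on every query, in exchange for never starting searches that are already outdated.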

    EDIT based on your comment.

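    The problem is that main_search blocks the reactor loop, and you should never block the reactor loop. While main_search is running, the reactor cannot read the next message from the socket, which is why the new message is only handled after the previous query has finished. Note that yielding the result of main_search inside @inlineCallbacks does not make it asynchronous: main_search(x['query']) has already run to completion before yield sees its return value. What you need to do is chop main_search into pieces so that you return control to the reactor loop between them.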

    Would it be possible for you to do something like:

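    def resume_search(position):
        # process one small chunk of the search starting at position,
        # then schedule the next chunk the same way if work remains
        ...

    reactor.callLater(0, resume_search, current_position)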
    

    reactor.callLater schedules your function to be called as soon as the reactor has finished its current work. You can think of the reactor loop as a big while True loop that basically does three things:

    • check for incoming IO
    • execute your stuff (which is handling events)
    • send outgoing IO

    As long as your code keeps running, the reactor never gets to the other two steps. When you call reactor.callLater, the reactor keeps running its cycles until the delay (which in our case can be 0) has passed, and then calls your function. Note that there is no guarantee it will call your function exactly on time, because something else may be running (blocking the reactor) for longer than the interval you specified. So think of reactor.callLater(0, fun) as "call me when you're not busy".
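    The following sketch combines chunking with the latestSearchId check so you can see the interleaving. To keep it runnable without Twisted, MiniReactor is a hypothetical stand-in for the reactor that simply queues calls in FIFO order, which roughly mimics callLater(0, ...); SearchConnection, the arguments of resume_search and the three-chunk split are assumptions as well. In a real server, on_message would be onMessage, call_later would be reactor.callLater and send would wrap self.sendMessage.

```python
from collections import deque


class MiniReactor:
    """Hypothetical stand-in for Twisted's reactor, for illustration only.

    call_later ignores the delay and queues the call in FIFO order, which
    roughly mimics reactor.callLater(0, ...): "call me when you're not busy".
    """

    def __init__(self):
        self.queue = deque()

    def call_later(self, delay, fn, *args):
        self.queue.append((fn, args))

    def run(self):
        # Process queued events until there is nothing left to do
        while self.queue:
            fn, args = self.queue.popleft()
            fn(*args)


class SearchConnection:
    """Hypothetical per-connection handler (one per websocket client)."""

    def __init__(self, reactor, send, chunks=3):
        self.reactor = reactor
        self.send = send          # stands in for self.sendMessage(...)
        self.chunks = chunks      # assumption: the search splits into N pieces
        self.latest_search_id = 0

    def on_message(self, query):
        self.latest_search_id += 1
        # Don't run the whole search here: schedule the first chunk and return
        self.reactor.call_later(0, self.resume_search, query,
                                self.latest_search_id, 0, [])

    def resume_search(self, query, search_id, position, results):
        if search_id != self.latest_search_id:
            return  # a newer query arrived in between; drop this one
        results.append(f"{query}:chunk{position}")  # one small piece of work
        if position + 1 < self.chunks:
            # Give the reactor a chance to handle new messages before continuing
            self.reactor.call_later(0, self.resume_search, query,
                                    search_id, position + 1, results)
        else:
            self.send(results)


loop = MiniReactor()
sent = []
conn = SearchConnection(loop, sent.append)
conn.on_message("py")                        # first query starts
loop.call_later(0, conn.on_message, "pyt")   # second message arrives meanwhile
loop.run()
print(sent)  # [['pyt:chunk0', 'pyt:chunk1', 'pyt:chunk2']]
```

    Because resume_search hands control back after every chunk, the second message is processed right after the first query's first chunk, and the stale query stops at its next check instead of running to completion.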