Waiting on events in other requests in Twisted


I have a simple Twisted server that handles requests like this (obviously, asynchronously)

global SomeSharedMemory
if SomeSharedMemory is None:
    SomeSharedMemory = LoadSharedMemory()
return PickSomething(SomeSharedMemory)

Where SomeSharedMemory is loaded from a database.

I want to avoid loading SomeSharedMemory from the database multiple times. Specifically, when the server first starts, and we get two concurrent incoming requests, we might see something like this:

  • Request 1: Check for SomeSharedMemory, don't find it
  • Request 1: Issue database query to load SSM
  • Request 2: Check for SSM, don't find it
  • Request 2: Issue database query to load SSM
  • Request 1: Query returns, store SSM
  • Request 1: Return result
  • Request 2: Query returns, store SSM
  • Request 2: Return result

With more concurrent requests, the database gets hammered. I'd like to do something like this (see http://docs.python.org/library/threading.html#event-objects):

global SomeSharedMemory, SSMEvent
if SomeSharedMemory is None:
    if not SSMEvent.isSet():
        SSMEvent.wait()
    else:
        # assumes that the event is initialized "set"
        SSMEvent.clear()
        SomeSharedMemory = LoadSharedMemory()
        SSMEvent.set()
return PickSomething(SomeSharedMemory)

Such that if one request is loading the shared memory, other requests will wait politely until the query is complete rather than issue their own duplicate database queries.

Is this possible in Twisted?


Solution

  • The way your example is set up, it's hard to see how you could actually have the problem you're describing. If a second request comes in to your Twisted server before the call to LoadSharedMemory issued by the first has returned, then the second request will just wait before being processed. When it is finally handled, SomeSharedMemory will be initialized and there will be no duplication.

    However, I suppose maybe it is the case that LoadSharedMemory is asynchronous and returns a Deferred, so that your code really looks more like this:

    def handleRequest(request):
        if SomeSharedMemory is None:
            d = initSharedMemory()
            d.addCallback(lambda ignored: handleRequest(request))
        else:
            d = PickSomething(SomeSharedMemory)
        return d
    

    In this case, it's entirely possible that a second request might arrive while initSharedMemory is off doing its thing. Then you would indeed end up with two tasks trying to initialize that state.

    The thing to do, of course, is notice this third state that you have. There is not only un-initialized and initializ-ed, but also initializ-ing. So represent that state as well. I'll hide it inside the initSharedMemory function to keep the request handler as simple as it already is:

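    from twisted.internet.defer import Deferred
    
    # None while idle, otherwise the Deferred of the running initialization
    initInProgress = None
    
    def initSharedMemory():
        global initInProgress
        if initInProgress is None:
            # Only the first caller starts the real initialization.
            # Assumes _reallyInit() returns a Deferred that has not fired yet.
            initInProgress = _reallyInit()
            def initialized(result):
                global initInProgress, SomeSharedMemory
                initInProgress = None
                SomeSharedMemory = result
            initInProgress.addCallback(initialized)
        # Every caller gets its own Deferred, fired after initialized() has run
        d = Deferred()
        initInProgress.chainDeferred(d)
        return d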
    

    This is a little gross because of the globals everywhere. Here's a slightly cleaner version:

    from twisted.internet.defer import Deferred, succeed
    
    class SharedResource(object):
        def __init__(self, initializer):
            self._initializer = initializer
            self._value = None
            self._state = "UNINITIALIZED"
            self._waiting = []
    
    
        def get(self):
            if self._state == "INITIALIZED":
                # Return the already computed value
                return succeed(self._value)
    
            # Create a Deferred for the caller to wait on
            d = Deferred()
            self._waiting.append(d)
    
            if self._state == "UNINITIALIZED":
                # Once, run the setup. Change state first, in case the
                # initializer's Deferred has already fired.
                self._state = "INITIALIZING"
                self._initializer().addCallback(self._initialized)
    
            # Initialized or initializing state here
            return d
    
    
        def _initialized(self, value):
            # Save the value, transition to the new state, and tell
            # all the previous callers of get what the result is.
            self._value = value
            self._state = "INITIALIZED"
            waiting, self._waiting = self._waiting, None
            for d in waiting:
                d.callback(value)
    
    
    SomeSharedMemory = SharedResource(initializeSharedMemory)
    
    def handleRequest(request):
        return SomeSharedMemory.get().addCallback(PickSomething)
    

    Three states, nice explicit transitions between them, no global state to update (at least if you give SomeSharedMemory some non-global scope), and handleRequest doesn't know about any of this; it just asks for a value and then uses it.