Tags: python, twisted, twisted.internet

Lazy Deferred List reaching maximum recursion depth


I have a large list of documents to upsert into MongoDB (possibly n > 100000). I don't want to create 100000 deferreds all at once, but I also don't want to run the queries one at a time and wait for each, because I have a connection pool to MongoDB and want to use it fully. So I have a generator function that yields deferreds to be consumed by a DeferredLazyList.

def generate_update_deferreds(collection, many_docs):
    for doc in many_docs:
        d = collection.update({'_id': doc['_id']}, doc, upsert=True)
        yield d

This is the code that connects the generator of deferred upserts to the DeferredLazyList:

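from twisted.internet import defer

@defer.inlineCallbacks
def update_docs(collection, many_docs, pool_size):
    gen_deferreds = generate_update_deferreds(collection, many_docs)
    # Keep at most pool_size upserts in flight at once.
    results = yield DeferredLazyList(gen_deferreds, count=pool_size, consume_errors=True)
    defer.returnValue(results)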

DeferredLazyList is similar to DeferredList, but instead of a list of deferreds to wait for, it accepts an iterator. Deferreds are pulled from the iterator so that at most count of them are active at once. Because each deferred is created only when the generator yields it, this effectively batches the queries.

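from twisted.internet import defer

NO_RESULT = object()  # Placeholder for results that have not arrived yet.

class DeferredLazyList(defer.Deferred):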
    """
    The ``DeferredLazyList`` class is used for collecting the results of
    many deferreds. This is similar to ``DeferredList``
    (``twisted.internet.defer.DeferredList``) but works with an iterator
    yielding deferreds. This will only maintain a certain number of
    deferreds simultaneously. Once one of the deferreds finishes, another
    will be obtained from the iterator.
    """

    def __init__(self, deferreds, count=None, consume_errors=None):
        defer.Deferred.__init__(self)

        if count is None:
            count = 1

        self.__consume_errors = bool(consume_errors)

        self.__iter = enumerate(deferreds)
        self.__results = []
        self.__active = 0  # Deferreds obtained but not yet finished.
        for _i in xrange(count):
            # Start specified number of simultaneous deferreds.
            if not self.called:
                self.__next_save_result(None, None, None)
            else:
                break

    def __next_save_result(self, result, success, index):
        """
        Called when a deferred completes.
        """
        # Make sure we can save result at index.
        if index is not None:
            self.__active -= 1
            results_len = len(self.__results)
            if results_len <= index:
                self.__results += [NO_RESULT] * (index - results_len + 1)
            # Save result.
            self.__results[index] = (success, result)

        # Get next deferred.
        try:
            i, d = self.__iter.next()
            self.__active += 1
            d.addCallbacks(self.__next_save_result, self.__next_save_result,
                           callbackArgs=(True, i), errbackArgs=(False, i))

        except StopIteration:
            # Iterator is exhausted. Callback self with the results only once
            # the last outstanding deferred has finished, and only once.
            if self.__active == 0 and not self.called:
                self.callback(self.__results)

        # Pass through result.
        return result if success or not self.__consume_errors else None

The problem is that when the deferreds are yielded from generate_update_deferreds(), their .called attribute is already True, which causes DeferredLazyList to call itself recursively.

What's happening is:

  1. In DeferredLazyList.__init__(), self.__next_save_result() is called count times (say 5).

  2. Each call to self.__next_save_result() consumes one deferred from self.__iter and adds itself to that deferred as a callback.

  3. Because the yielded deferred already has .called set to True, d.addCallbacks(self.__next_save_result, ...) immediately calls self.__next_save_result(), which consumes the next deferred, and so on. Each already-fired deferred adds more stack frames until a RuntimeError is raised because the maximum recursion depth has been reached.
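The three steps above can be reproduced without Twisted or MongoDB. The following is a toy model, written for Python 3, under loudly stated assumptions: FiredDeferred is a hypothetical stand-in (not Twisted's Deferred) whose addCallback runs the callback immediately, the way an already-called Deferred does, and recursive_collect mirrors __next_save_result with count=1. Every already-fired deferred adds stack frames, so a long enough iterator hits the recursion limit (RecursionError on Python 3, which is a subclass of RuntimeError).

```python
class FiredDeferred:
    """Hypothetical stand-in (NOT Twisted's Deferred) for a deferred whose
    result is already available: addCallback runs the callback at once, on
    the caller's stack, as Twisted does when .called is already True."""

    def __init__(self, result):
        self.called = True
        self.result = result

    def addCallback(self, fn, *args):
        self.result = fn(self.result, *args)
        return self


def recursive_collect(deferreds):
    """Mirrors DeferredLazyList.__next_save_result with count=1: each
    completion saves its result, pulls the next deferred and registers
    itself on it."""
    it = enumerate(deferreds)
    results = []

    def next_save_result(result, index):
        if index is not None:
            results.append(result)
        try:
            i, d = next(it)
        except StopIteration:
            return result
        d.addCallback(next_save_result, i)   # already fired: one level deeper
        return result

    next_save_result(None, None)
    return results


def overflows(n):
    """True if collecting n already-fired deferreds exhausts the stack."""
    try:
        recursive_collect(FiredDeferred(k) for k in range(n))
    except RuntimeError:                     # RecursionError on Python 3
        return True
    return False


print(recursive_collect(FiredDeferred(k) for k in range(5)))  # [0, 1, 2, 3, 4]
print(overflows(10), overflows(100000))                        # False True
```

Small inputs work, which is why the bug only shows up with a large number of documents whose deferreds have already fired.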

I printed a stack trace just before the recursion limit was reached to confirm that this is the cause of the problem:

File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/server.py", line 937, in update_many_docs
    results = yield DeferredLazyList(gen_deferreds, count=self.mongo_connections, consume_errors=True, return_results=True)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 157, in __init__
    self.__next_save_result(None, None, None)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
    self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
    self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
# Repeated until the RuntimeError
exceptions.RuntimeError: maximum recursion depth exceeded

Any help would be greatly appreciated. By the way, I am running Python 2.7.3 with Twisted 12.1.0; the MongoDB details are only there to give context.


I wanted the result from each deferred, but cooperate() doesn't return them, so I added a callback to each deferred before yielding it to the CooperativeTasks:

from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.internet.task import cooperate

NO_RESULT = object()

def generate_update_deferreds(collection, many_docs, save_results):
    for i, doc in enumerate(many_docs):
        d = collection.update({'_id': doc['_id']}, doc, upsert=True)
        d.addBoth(save_result, i, save_results)  # Save result (or Failure) at index i
        yield d

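def save_result(result, i, save_results):
    save_results[i] = result
    # Returning None replaces the result, so a Failure is consumed here
    # instead of stopping the cooperative task that yielded this deferred.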

@inlineCallbacks
def update_docs(collection, many_docs, count):
    save_results = [NO_RESULT] * len(many_docs)
    gen_deferreds = generate_update_deferreds(collection, many_docs, save_results)
    # count tasks share one generator, so at most count upserts run at once.
    workers = [cooperate(gen_deferreds).whenDone() for _i in xrange(count)]
    yield DeferredList(workers)
    # Handle save_results...
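Recording each result at its own index keeps save_results in document order even though the upserts finish in whatever order MongoDB replies, regardless of which worker pulled which deferred. A minimal Python 3 sketch under stated assumptions: ToyDeferred is a hypothetical stand-in for a pending Deferred (not Twisted's class), fire() simulates a reply arriving, and this generate_update_deferreds variant replaces collection.update(...) with a ToyDeferred and records it in a hypothetical in_flight list so the demo can fire it later.

```python
NO_RESULT = object()


class ToyDeferred:
    """Hypothetical stand-in for a pending Deferred (NOT Twisted's class)."""

    def __init__(self):
        self._callbacks = []

    def addBoth(self, fn, *args):
        self._callbacks.append((fn, args))
        return self

    def fire(self, result):
        """Simulates the MongoDB reply arriving: runs the callback chain."""
        for fn, args in self._callbacks:
            result = fn(result, *args)
        return result


def save_result(result, i, save_results):
    save_results[i] = result      # returns None, like the version above


def generate_update_deferreds(many_docs, save_results, in_flight):
    """Hypothetical variant: ToyDeferred stands in for collection.update()."""
    for i, doc in enumerate(many_docs):
        d = ToyDeferred()
        d.addBoth(save_result, i, save_results)
        in_flight.append((d, doc))
        yield d


docs = [{'_id': k} for k in range(4)]
save_results = [NO_RESULT] * len(docs)
in_flight = []
for _d in generate_update_deferreds(docs, save_results, in_flight):
    pass                          # a real driver would wait on each _d
for d, doc in reversed(in_flight):
    d.fire('upserted %d' % doc['_id'])   # replies arrive in reverse order
print(save_results)
# ['upserted 0', 'upserted 1', 'upserted 2', 'upserted 3']
```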

Solution

  • There are some tools in Twisted that will help you do this more easily. For example, cooperate:

    from twisted.internet.defer import DeferredList
    from twisted.internet.task import cooperate
    
    def generate_update_deferreds(collection, many_docs):
        for doc in many_docs:
            d = collection.update({'_id': doc['_id']}, doc, upsert=True)
            yield d
    
    work = generate_update_deferreds(...)
    worker_tasks = []
    # count: number of upserts allowed in flight at once (e.g. the pool size)
    for i in range(count):
        task = cooperate(work)
        worker_tasks.append(task)
    
    all_done_deferred = DeferredList([task.whenDone() for task in worker_tasks])
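Why this avoids the recursion: each cooperative task pulls one item from the shared iterator, and if the item is a Deferred the task pauses until it fires. A Deferred that has already fired simply resumes the task, and the scheduling loop, not a nested callback, pulls the next item. The Python 3 toy model below illustrates that idea; ToyDeferred, generate_toy_deferreds and run_tasks are hypothetical names, and run_tasks is a simplified round-robin loop, not Twisted's Cooperator. It also reports the peak number of unfired deferreds in flight, which stays at or below count.

```python
from collections import deque


class ToyDeferred:
    """Hypothetical stand-in for a Deferred (NOT Twisted's class).
    Callbacks added after callback() has been called run immediately."""

    def __init__(self):
        self.called = False
        self.result = None
        self._callbacks = []

    def addCallback(self, fn):
        if self.called:
            fn(self.result)              # already fired: run right now
        else:
            self._callbacks.append(fn)
        return self

    def callback(self, result):
        self.called = True
        self.result = result
        callbacks, self._callbacks = self._callbacks, []
        for fn in callbacks:
            fn(result)


def generate_toy_deferreds(n, io_queue, already_fired):
    """Yields n toy deferreds; pending ones are queued so that a fake
    'database' can fire them on a later tick."""
    for i in range(n):
        d = ToyDeferred()
        if already_fired:
            d.callback(i)                # .called is True before yielding
        else:
            io_queue.append(d)
        yield d


def run_tasks(work, count, io_queue):
    """Simplified round-robin model of `count` cooperate(work) tasks sharing
    one iterator. Returns the peak number of unfired deferreds in flight."""
    paused = [False] * count
    finished = [False] * count
    peak = 0
    while not all(finished):
        for t in range(count):
            if finished[t] or paused[t]:
                continue
            try:
                d = next(work)
            except StopIteration:
                finished[t] = True
                continue
            paused[t] = True

            def resume(_result, t=t):
                paused[t] = False        # only flips a flag; no nested pull
            d.addCallback(resume)
        peak = max(peak, len(io_queue))
        if io_queue:
            io_queue.popleft().callback("ok")   # one simulated reply per tick
    return peak


q = deque()
print(run_tasks(generate_toy_deferreds(100000, q, True), 5, q))   # 0
q = deque()
print(run_tasks(generate_toy_deferreds(50, q, False), 5, q))      # 5
```

With 100000 already-fired deferreds the stack depth stays constant, and with pending deferreds no more than five are outstanding at any time.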