I have a large list of documents to upsert into MongoDB (possibly n > 100000). I don't want to create 100000 deferreds all at once, but I don't want to execute and wait for each query sequentially because I have a connection pool to MongoDB and I want to utilize it fully. So I have a generator function that will yield deferreds to be consumed by a DeferredLazyList
.
def generate_update_deferreds(collection, many_docs):
for doc in many_docs:
d = collection.update({'_id': doc['_id']}, doc, upsert=True)
yield d
This is the code linking the generation of the deferred upserts, and the DeferredLazyList
.
@defer.inlineCallbacks
def update_docs(collection, many_docs):
gen_deferreds = generate_update_deferreds(collection, many_docs)
results = yield DeferredLazyList(gen_deferreds, count=pool_size, consume_errors=True)
The DeferredLazyList
is similar to DeferredList, but instead of accepting a list of deferreds to wait for it accepts an iterator. The deferreds are retrieved from the iterator while only having count
deferreds active simultaneously. This is used to effectively batch deferreds because they are created as they are yielded.
class DeferredLazyList(defer.Deferred):
"""
The ``DeferredLazyList`` class is used for collecting the results of
many deferreds. This is similar to ``DeferredList``
(``twisted.internet.defer.DeferredList``) but works with an iterator
yielding deferreds. This will only maintain a certain number of
deferreds simultaneously. Once one of the deferreds finishes, another
will be obtained from the iterator.
"""
def __init__(self, deferreds, count=None, consume_errors=None):
defer.Deferred.__init__(self)
if count is None:
count = 1
self.__consume_errors = bool(consume_errors)
self.__iter = enumerate(deferreds)
self.__results = []
for _i in xrange(count):
# Start specified number of simultaneous deferreds.
if not self.called:
self.__next_save_result(None, None, None)
else:
break
def __next_save_result(self, result, success, index):
"""
Called when a deferred completes.
"""
# Make sure we can save result at index.
if index is not None:
results_len = len(self.__results)
if results_len <= index:
self.__results += [NO_RESULT] * (index - results_len + 1)
# Save result.
self.__results[index] = (success, result)
# Get next deferred.
try:
i, d = self.__iter.next()
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
except StopIteration:
# Iterator is exhausted, callback self with results.
self.callback(self.__results)
# Pass through result.
return result if success or not self.__consume_errors else None
The problem is when the deferreds are yielded from generate_update_deferreds()
their .called
is already set to True
which is causing DeferredLazyList
to recursively call itself.
What's happening is:
In DeferredLazyList.__init__()
, self.__next_save_result()
is called count
times (say 5).
Each call to self.__next_save_result()
consumes 1 deferred from self.__iter
, and itself is added as a callback.
Because the yielded deferred has .called
set to True
, d.addCallbacks(self.__next_save_result, ...)
immediately calls self.__next_save_result()
and this loop continues until a RuntimeError
is raised because recursion depth has been reached.
I've printed a stacktrace before the recursion limit was reached to confirm that this is the cause of the problem:
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/server.py", line 937, in update_many_docs
results = yield DeferredLazyList(gen_deferreds, count=self.mongo_connections, consume_errors=True, return_results=True)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 157, in __init__
self.__next_save_result(None, None, None)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
# Repeated until the RuntimeError
exceptions.RuntimeError: maximum recursion depth exceeded
Any help would be greatly appreciated. By the way, I am running Python 2.7.3 with Twisted 12.1.0 and the MongoDB stuff is really only relevant to understand the context.
I wanted the result from each deferred, but cooperate()
doesn't return those so I added a callback to each deferred before yielding them to the CooperativeTask
s:
from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.internet.task import cooperate
NO_RESULT = object()
def generate_update_deferreds(collection, many_docs, save_results):
for i, doc in enumerate(update_docs):
d = collection.update({'_id': doc['_id']}, doc, upsert=True)
d.addBoth(save_result, i, save_results) # Save result
yield d
def save_result(result, i, save_results):
save_results[i] = result
@inlineCallbacks
def update_docs(collection, many_docs):
save_results = [NO_RESULT] * len(many_docs)
gen_deferreds = generate_update_deferreds(collection, many_docs, save_results))
workers = [cooperate(gen_deferreds).whenDone() for _i in xrange(count)]
yield defer.DeferredList(workers)
# Handle save_results...
There are some tools in Twisted that will help you do this more easily. For example, cooperate:
from twisted.internet.task import cooperate
def generate_update_deferreds(collection, many_docs):
for doc in update_docs:
d = collection.update({'_id': doc['_id']}, doc, upsert=True)
yield d
work = generate_update_deferreds(...)
worker_tasks = []
for i in range(count):
task = cooperate(work)
worker_tasks.append(task)
all_done_deferred = DeferredList([task.whenDone() for task in worker_tasks])