I have a situation in which several search procedures are parallelized into different threads. Each thread gets the same pymongo.cursor.Cursor objects to look through for its results, but each thread performs different processing. I created a demo procedure that just looks like this:

    from threading import Thread

    class SearchProcedure(Thread):
        weight = 0.1

        def __init__(self, weight=None):
            if weight:
                self.weight = float(weight)
            Thread.__init__(self)

        def start(self, query):
            self.query = query
            Thread.start(self)

        def run(self):
            if hasattr(self, 'places'):
                for p in self.places.values():
                    print(p.name)
            if hasattr(self, 'posts'):
                for s in self.posts.values():
                    # reset the shared cursor before iterating it
                    s.rewind()
                    print([(sh['name'], sh['description']) for sh in s])

        def attach_eligible(self, queue, **kwargs):
            # store the result queue and attach places/posts as attributes
            self.queue = queue
            for (name, value) in kwargs.items():
                setattr(self, name, value)

The attach_eligible method is where the places and posts properties are added to the procedure object. Again, the posts property is a set of mongo cursors that can be iterated through. I call rewind before running through the results to reset each cursor to its original state in case another thread has already unwound it. The idea is that each thread will use different criteria to search the objects and then emit its results into the queue property for use in the calling/instantiating scope.
The first thread runs through fine, but all subsequent threads that use the cursor output the following error:

      File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 668, in __send_message
        assert response["starting_from"] == self.__retrieved
    AssertionError

Rewinding, it would seem, has no effect. Is this because the cursors are passed in by reference? Would they too need to be contained within a queue to preserve lock status? Is it even possible to do something like this with mongo cursors? It would be nice if I could, since it would make the processing of the search criteria much more performant to have it executed concurrently.
You can't freely share any object with changing state between threads unless it's specifically designed to allow it. A pymongo cursor is not designed for this.
What you can do is clone the cursor and then provide each thread with its own cloned copy of the cursor. See cursor.clone.
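
As a rough sketch of the cloning approach, something like the following could hand each thread its own copies. The helper name run_with_cloned_cursors and the idea of passing workers as plain callables are assumptions made for illustration; they are not part of pymongo or of the code in the question. The only pymongo call relied on is Cursor.clone(), which returns a new, unevaluated cursor with the same query and options.

```python
import threading


def run_with_cloned_cursors(posts, workers):
    """Run each worker in its own thread with private clones of the cursors.

    posts   -- dict mapping a name to a pymongo cursor (any object with .clone())
    workers -- hypothetical callables that accept such a dict
    """
    threads = []
    for worker in workers:
        # Clone every cursor so that no iteration state is shared between
        # threads; the originals in `posts` are never iterated here.
        own_posts = {name: cur.clone() for name, cur in posts.items()}
        t = threading.Thread(target=worker, args=(own_posts,))
        t.start()
        threads.append(t)
    # Wait for all workers to finish before returning.
    for t in threads:
        t.join()
```

With the SearchProcedure class from the question, the same idea would amount to building a dict of clones per procedure and passing that to attach_eligible as posts, instead of passing the shared cursors. The rewind call in run should then no longer be needed, since each clone starts unevaluated.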