multithreading, mongodb, cursor

MongoCursor rewind in separate threads


I have a situation in which several search procedures are parallelized into different threads. Each thread gets the same pymongo.cursor.Cursor objects to look through for its results, but each thread performs different processing. I created a demo procedure that looks like this:

from threading import Thread

class SearchProcedure(Thread):

    weight = 0.1

    def __init__(self,weight=None):
        if weight:
            self.weight = float(weight)
        Thread.__init__(self)

    def start(self,query):
        self.query = query
        Thread.start(self)

    def run(self):
        if hasattr(self,'places'):
            for p in self.places.values():
                print p.name

        if hasattr(self,'posts'):
            for s in self.posts.values():
                s.rewind()
                print [(sh['name'],sh['description']) for sh in s]

    def attach_eligible(self,queue,**kwargs):
        self.queue = queue
        for (name,value) in kwargs.items():
            setattr(self,name,value)

The attach_eligible method is where the places and posts properties are added to the procedure object. Again, the posts property is a collection of MongoDB cursors that can be iterated over. I use rewind before running through the results to reset each cursor to its original state in case another thread has already unwound it. The idea is that each thread will use different criteria to search the objects and then emit its results into the queue property for use in the calling/instantiating scope.
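For context, the calling scope wires things up roughly as in the sketch below. The client, database and collection names, sort keys, weights, and query string are placeholders invented for illustration; the important point is that every thread is handed the same cursor objects.

from Queue import Queue          # Python 2, matching the traceback above
from pymongo import MongoClient

# Hypothetical setup: a local MongoDB instance and a "posts" collection
# are assumed here purely for illustration.
client = MongoClient()
db = client.demo_db

results = Queue()
cursors = {'recent': db.posts.find().sort('created', -1),
           'popular': db.posts.find().sort('score', -1)}

threads = [SearchProcedure(weight=w) for w in (0.3, 0.7)]
for t in threads:
    # every thread receives the *same* cursor objects
    t.attach_eligible(results, posts=cursors)
    t.start('coffee')  # the query argument is a placeholder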

The first thread runs through fine, but all subsequent threads that utilize the cursor output the following error:

  File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 668, in __send_message
    assert response["starting_from"] == self.__retrieved
AssertionError

Rewinding, it would seem, has no effect. Is this because the cursors are passed in by reference? Would they too need to be contained within a queue to preserve lock status? Is it even possible to do something like this with MongoDB cursors? It would be nice if I could, since processing the search criteria concurrently would make it considerably more performant.


Solution

  • You can't freely share any object with changing state between threads unless it's specifically designed to allow it. A pymongo cursor is not designed for this.

    What you can do is clone the cursor and provide each thread with its own cloned copy; see cursor.clone(). A sketch of that approach follows.
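
    Reusing the hypothetical wiring from the question above, the per-thread clones would look roughly like this:

        for t in threads:
            # Cursor.clone() returns an unevaluated copy with the same query
            # and options, so each thread iterates independently and no
            # rewind() is needed in run().
            own_cursors = dict((name, c.clone()) for (name, c) in cursors.items())
            t.attach_eligible(results, posts=own_cursors)
            t.start('coffee')  # placeholder query, as above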