mongodb, python-asyncio, tornado-motor

Empty iterator when using TaskGroup with Motor Aggregate


I am receiving a StopAsyncIteration error when iterating over the result of an aggregate inside a TaskGroup, but not otherwise. I have created a minimal example to show the error:

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


def create_aggregate():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    collection = client["test"]["test"]

    return collection.aggregate([])

async def get_next(agg):
    item = await agg.next()
    print(item)

async def run_1_success():
    agg = create_aggregate()
    
    await get_next(agg)

async def run_2_success():
    agg = create_aggregate()
    await get_next(agg)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(get_next(agg))
        tg.create_task(get_next(agg))

async def run_3_error():
    agg = create_aggregate()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(get_next(agg))
        tg.create_task(get_next(agg))

When running asyncio.run(run_1_success()) or asyncio.run(run_2_success()) I can successfully iterate over the trivial aggregation, even using TaskGroup. But when running asyncio.run(run_3_error()) I receive a StopAsyncIteration error.

Environment:

  • Python 3.12.2 (tags/v3.12.2:6abddd9, Feb 6 2024, 21:26:36) [MSC v.1937 64 bit (AMD64)]
  • Motor 3.4.0
  • PyMongo 4.6.2
  • Windows 10 64 bit.

Thank you.


Solution

  • From the docs for MotorCollection.aggregate (the key phrase is "lazily runs the aggregate command when first iterated"):

    Note that this method returns a MotorCommandCursor which lazily runs the aggregate command when first iterated. In order to run an aggregation with $out or $merge the application needs to iterate the cursor.

    In run_2_success, the initial await get_next(agg) executes the aggregation and waits until the cursor is returned, so the cursor is ready to be iterated inside the TaskGroup's async with block.

    But in run_3_error, both tasks share the same agg object, and it has not been started yet. While one task is still waiting for the cursor to be returned, the other is already trying to iterate over it (in whichever order the tasks happen to run), so the cursor is not ready for that second next() call.

    If I add an await asyncio.sleep(1) between the two create_task calls in run_3_error, it works.
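To make the race concrete without a MongoDB server, here is a minimal sketch built on a toy cursor. ToyLazyCursor is a hypothetical stand-in that starts lazily on the first next() call; it is not Motor's implementation and only mimics the behaviour described above. asyncio.gather with return_exceptions=True is used in place of a TaskGroup so that both outcomes can be inspected side by side.

```python
import asyncio


class ToyLazyCursor:
    """Hypothetical stand-in for a lazily started cursor (not Motor's code)."""

    def __init__(self, items):
        self._source = list(items)
        self._buffer = []
        self._started = False

    async def next(self):
        if not self._started:
            # The first caller kicks off the "command" and waits for the reply.
            self._started = True
            await asyncio.sleep(0.01)  # simulated network round trip
            self._buffer.extend(self._source)
        if not self._buffer:
            # Empty buffer: either the data is exhausted, or another
            # caller's start is still in flight.
            raise StopAsyncIteration
        return self._buffer.pop(0)


async def demo():
    # Two concurrent next() calls on a cursor that has not been started.
    cold = ToyLazyCursor(["notebook", "paper", "postcard"])
    cold_results = await asyncio.gather(
        cold.next(), cold.next(), return_exceptions=True
    )

    # Start the cursor with one awaited next(), then go concurrent.
    warm = ToyLazyCursor(["notebook", "paper", "postcard"])
    first = await warm.next()
    warm_results = await asyncio.gather(
        warm.next(), warm.next(), return_exceptions=True
    )
    return cold_results, first, warm_results


cold_results, first, warm_results = asyncio.run(demo())
print(cold_results, first, warm_results)
```

In this toy model the second call on the cold cursor ends with StopAsyncIteration, while the warmed-up cursor serves both concurrent calls. That mirrors the difference between run_3_error and run_2_success, under the assumption that the real cursor behaves similarly while it is starting.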

    Also, you don't need a get_next(agg) helper. Per the docs, MotorCollection.aggregate:

    Returns a MotorCommandCursor that can be iterated like a cursor from find():

    Side note: you should not iterate over the results of a single cursor from several tasks concurrently. If you want to process the results asynchronously, do something like:

    async def do_something(res):
        # Single quotes inside the f-strings keep this valid before Python 3.12.
        print(f"doing something with: {res['item']}")
        await asyncio.sleep(.1)
        print(f"done with: {res['item']}")
    
    
    async def run_4_okay():
        agg = create_aggregate()
        
        async with asyncio.TaskGroup() as tg:
            async for result in agg:
                tg.create_task(do_something(result))
    

    (Append the created tasks to a list if you want to read their results later.)

    Output, using this MongoDB docs collection:

    doing something with: notebook
    doing something with: paper
    doing something with: postcard
    done with: paper
    done with: postcard
    done with: notebook