python, python-asyncio, asyncpg

How do I call async code from `__iter__` method?


New to async.

I had a class called PGServer that you could iterate to get a list of database names:

class PGServer:
    # ...connection code omitted...
    def _databases( self ):
        curr = self.connection().cursor()
        curr.execute( "SELECT datname FROM pg_database ORDER BY datname" )
        return [ row[ 0 ] for row in curr.fetchall() ]

    def __iter__( self ):
        return iter( self._databases() )

dbs = PGServer( ...connection params... )
for db in dbs:
    print( db )

It worked fine with psycopg. Then I switched to asyncpg:

class PGServer:
    # ...connection code omitted...
    async def _databases( self ):
        conn = await self.connection()
        rows = await conn.fetch( "SELECT datname FROM pg_database ORDER BY datname" )
        return [ row[ 0 ] for row in rows ]

dbs = PGServer( ...connection params... )
for db in await dbs._databases():
    print( db )

It works when I invoke _databases() directly, but how do I get __iter__ working again? I can't make it async because that violates the protocol. I tried implementing __aiter__ instead, but couldn't figure out how to make that work.

Some implementations that I tried:

async def __aiter__( self ):
    #return self._databases()
    #return await self._databases()
    #return aiter( self._databases() )
    return aiter( await self._databases() )

Those all generated the following error:

TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine

UPDATE

I just created an implementation that seems to work:

async def __aiter__( self ):
    for db in await self._databases():
        yield db

I don't know if that's optimal or idiomatic, though.

UPDATE 2

Unless someone can come up with something better, I'm just going to give up on having an __iter__ and saying for db in dbs:, and instead just be more explicit:

for db in await dbs.databases():
    ...

(I dropped the underscore because in this new context databases() is now the public API.)


Solution

  • TL;DR Just use __aiter__ and __anext__

    This won't solve the underlying problem (i.e., the code not being properly asynchronous), but it should make it viable to mix it with an asynchronous interface.

    The problem here is that you're trying to wrap a method that is defined as asynchronous (the new async _databases, which still returns a plain list once awaited) with a synchronous method, __iter__. As you've stated, the solution is to implement the asynchronous counterpart of that method.

    __aiter__ should return the asynchronous iterator itself, so, since your previous code simply forwarded the result of _databases as an iterator, something like:

    ...
    class PGServer:
        def __init__(self, *args, **kwargs):
            # other __init__ stuff
            self.results = None  # holds the fetched rows while an iteration is in progress

        async def _databases( self ):
            conn = await self.connection()
            rows = await conn.fetch( "SELECT datname FROM pg_database ORDER BY datname" )
            return [ row[ 0 ] for row in rows ]

        def __aiter__( self ):  # the object acts as its own asynchronous iterator
            return self

        async def __anext__(self):
            if self.results is None:  # first call: fetch all rows at once
                self.results = await self._databases()
            if not self.results:  # exhausted, or the query returned no rows
                self.results = None  # reset so the object can be iterated again
                raise StopAsyncIteration
            return self.results.pop(0)
    ...
    

    should work fine.
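
    To try the pattern without a database, here is a minimal self-contained sketch. It makes some assumptions: the names argument and the list_databases helper are hypothetical, and asyncio.sleep(0) merely stands in for the real await conn.fetch(...) call. It checks for an empty result right after fetching, so a query that returns no rows ends the loop instead of failing on pop.

```python
import asyncio


class PGServer:
    """Stand-in for the class above; no real database is involved."""

    def __init__(self, names):
        self._names = list(names)  # hypothetical canned query result
        self.results = None

    async def _databases(self):
        await asyncio.sleep(0)  # stands in for `await conn.fetch(...)`
        return list(self._names)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.results is None:  # first call: fetch everything
            self.results = await self._databases()
        if not self.results:  # nothing left (or nothing fetched)
            self.results = None
            raise StopAsyncIteration
        return self.results.pop(0)


async def list_databases(server):
    # `async for` is only valid inside a coroutine or an async generator.
    return [db async for db in server]


dbs = PGServer(["postgres", "template0", "template1"])
found = asyncio.run(list_databases(dbs))
```

    Note that the loop has to live inside a coroutine, so the calling code becomes async as well.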


    A word on asynchronous iterators

    Further explanation

    To make an asynchronous iterator in Python you need to define both __aiter__ and __anext__.

    __aiter__ is called when you use the async for loop, and it should return the iterator itself.

    So if PGServer is the actual iterator it should look like:

    ...
        def __aiter__( self ):
            return self
    ...
    

    But let's look at it more closely: does it look like it's really iterating asynchronously, or is it merely being called asynchronously?

    As it is, the code you've provided only awaits the fetch method of the db interface you're using. So all results from the query are returned at once as a SYNCHRONOUS iterable (a list); the only asynchronous part is the wait for the db to resolve.

    As mentioned, the __aiter__ method only forwards the iterator itself; for the iterator to yield asynchronously you need to define an __anext__ method. That will be called each time a new element of the iterator is requested. It should return the next element of the iterator, or raise StopAsyncIteration if the iteration has finished.

    So, to use _databases as an async iterator in this case, you'd first need to define some method that constructs a properly async iterator/generator (i.e., one that returns or yields the requested values asynchronously) and then define an __anext__ somewhat like:
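
    To make the division of labour concrete, the sketch below drives an async iterator by hand, which is roughly what an async for loop does. Countdown and drain_manually are hypothetical names invented for this illustration, and asyncio.sleep(0) stands in for real I/O.

```python
import asyncio


class Countdown:
    """Hypothetical async iterator that produces n, n-1, ..., 1."""

    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # a real iterator would await I/O here
        self.n -= 1
        return self.n + 1


async def drain_manually(iterable):
    # Roughly what `async for item in iterable:` does.
    iterator = iterable.__aiter__()  # plain call, not awaited
    items = []
    while True:
        try:
            item = await iterator.__anext__()  # awaited once per element
        except StopAsyncIteration:
            break
        items.append(item)
    return items


manual = asyncio.run(drain_manually(Countdown(3)))
```

    The key point is that __aiter__ is called synchronously, while every __anext__ call is awaited separately.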

    ...
        async def __anext__(self):
            if ITERATOR_IS_EXHAUSTED:
                raise StopAsyncIteration
            #pop item from iterator/generator and return/yield it
    ...
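
    An async generator is one way to get both methods without writing __anext__ by hand. This also explains the TypeError in the question: calling an async def __aiter__ that only uses return produces a coroutine, which has no __anext__, whereas one that contains a yield produces an async generator object, which does. The sketch below is an illustration only: the names argument and the collect helper are hypothetical, and asyncio.sleep(0) stands in for the database call.

```python
import asyncio
import inspect


class PGServer:
    """Stand-in class; no real database is involved."""

    def __init__(self, names):
        self._names = list(names)  # hypothetical canned query result

    async def _databases(self):
        await asyncio.sleep(0)  # stands in for `await conn.fetch(...)`
        return list(self._names)

    async def __aiter__(self):
        # The `yield` turns this into an async generator function, so
        # calling it returns an object that already implements __anext__.
        for db in await self._databases():
            yield db


async def collect(server):
    return [db async for db in server]


server = PGServer(["postgres", "template1"])
is_async_generator = inspect.isasyncgen(server.__aiter__())
names = asyncio.run(collect(server))
```

    The rows are still fetched in one go; only the iteration interface is asynchronous.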
    

    The solution you've arrived at works because you've made a properly async generator from the synchronous iterable (i.e., the list) that _databases returns. But the underlying data is still produced synchronously. I'd recommend you check: