Search code examples
python python-3.x asynchronous async-await python-asyncio

Chunks of async_generator


I can get chunks of an iterator as follows:

def get_chunks_it(l, n):
    """Lazily split the iterable `l` into consecutive chunks of up to `n` items.

    Args:
        l (Iterable[Any]): the iterable to split; ``iter()`` is called on it.
        n (int): maximum number of items per chunk.

    Returns:
        Generator[Iterator[Any]]: one lazy iterator per chunk. All chunks
        read from the same underlying iterator.
    """
    source = iter(l)
    while True:
        try:
            head = next(source)
        except StopIteration:
            # nothing left to start another chunk with
            return
        # The head item proves the chunk is non-empty; the remaining
        # n - 1 items are pulled from the shared iterator on demand.
        tail = itertools.islice(source, n - 1)
        yield itertools.chain([head], tail)

Now let's say I have an asynchronous generator (python 3.6):

async def generator():
    """Asynchronously yield the integers 0 through 9, sleeping one second after each."""
    for number in range(10):
        yield number
        # hand control back to the event loop before producing the next value
        await asyncio.sleep(1)

How could I get chunks (let's say of size 3 that would yield [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]) of the resulting async_generator so that I could write:

# Desired usage: each `chunk` should hold up to 3 consecutive items of the
# async generator. Note that `async for` is only valid inside an `async def`.
async for chunk in get_chunk_it_async(generator(), 3):
    print(chunk)

Solution

  • This is slightly complicated by the lack of an aiter() function in Python 3.6 (a built-in aiter() was only added later, in Python 3.10, after returning an awaitable from __aiter__ had been deprecated). There are no async versions of the itertools objects in the standard library either.

    Define your own:

    try:
        aiter
    except NameError:
        # not yet a built-in, define our own shim for now
        # The alias keeps the name `_isasync` so the shim's signature is
        # unchanged, but it has to be `isawaitable`: the check below is applied
        # to the *object returned by* __aiter__, and `iscoroutinefunction` is
        # always False for a coroutine object.
        from inspect import isawaitable as _isasync

        def aiter(ob, _isasync=_isasync):
            """Return an async iterator for the async iterable `ob`.

            Args:
                ob: an object whose type defines ``__aiter__``.
                _isasync: private; predicate bound at definition time so the
                    module-level alias can be deleted afterwards.

            Returns:
                Whatever ``type(ob).__aiter__(ob)`` returns.

            Raises:
                TypeError: if the type has no ``__aiter__``, or if ``__aiter__``
                    returns an awaitable instead of an async iterator.
            """
            obtype = type(ob)  # magic methods are looked up on the type
            if not hasattr(obtype, '__aiter__'):
                raise TypeError(f'{obtype.__name__!r} object is not async iterable')
            async_iter = obtype.__aiter__(ob)
            if _isasync(async_iter):
                # PEP 492 allowed for __aiter__ to be a coroutine, but 525 reverses this again
                raise TypeError(f'{obtype.__name__!r} object is not async iterable')
            return async_iter
        del _isasync
    

    Next, you need to define async islice and chain objects:

    class achain():
        """Chain over multiple async iterators"""
        def __init__(self, *async_iterables):
            self._source = iter(async_iterables)
            self._active = None
        def __aiter__(self):
            return self
        async def __anext__(self):
            if self._source is None:
                # we are all done, nothing more to produce
                raise StopAsyncIteration
            if self._active is None:
                # get next async iterable to loop over
                ait = next(self._source, None)
                if ait is None:
                    # we are all done, nothing more to produce
                    self._source = None
                    raise StopAsyncIteration
                self._active = aiter(ait)
            try:
                return await type(ait).__anext__(ait)
            except StopAsyncIteration:
                # exhausted, recurse
                self._active = None
                return await self.__anext__()
    
    class aslice():
        """Slice an async iterator, in the manner of ``itertools.islice()``.

        Args:
            ait: an async iterator (its type must define ``__anext__``).
            start: index of the first item to produce, or the stop index when
                `stop` is omitted.
            stop: index at which to stop (exclusive). When it is ``None`` the
                call is read as ``aslice(ait, stop)``, so ``aslice(ait, None)``
                produces every item.
            step: distance between the indices of produced items.
        """
        def __init__(self, ait, start, stop=None, step=1):
            if stop is None:
                # single-bound form: the one bound given is the stop index
                start, stop = 0, start
            self._ait = ait
            # _next is the source index of the next item to hand out
            self._next, self._stop, self._step = start, stop, step
            self._cnt = 0  # number of source items consumed so far

        def __aiter__(self):
            return self

        async def __anext__(self):
            """Return the next selected item.

            Raises:
                StopAsyncIteration: when `stop` is reached or the source is
                    exhausted; the source reference is dropped at that point.
            """
            ait, stop = self._ait, self._stop
            if ait is None:
                raise StopAsyncIteration
            fetch = type(ait).__anext__  # magic methods are looked up on the type
            try:
                # discard items that sit before the next selected index
                while self._cnt < self._next:
                    await fetch(ait)
                    self._cnt += 1
                if stop is not None and self._cnt >= stop:
                    raise StopAsyncIteration
                item = await fetch(ait)
            except StopAsyncIteration:
                self._ait = None
                raise
            self._cnt += 1
            self._next += self._step
            if stop is not None and self._next > stop:
                # Clamp so that the skip loop on the final call cannot consume
                # source items lying beyond `stop`.
                self._next = stop
            return item
    

    With those in place, simply add async in the right places:

    async def get_chunks_it(l, n):
        """Split the async iterable `l` into consecutive chunks of up to `n` items.

        Args:
            l (AsyncIterable[Any]): the async iterable to split; ``aiter()`` is
                called on it.
            n (int): maximum number of items per chunk.

        Returns:
            AsyncGenerator[AsyncIterator[Any]]: one async iterator per chunk
            (not a list). All chunks read from the same underlying iterator.
        """
        async def single(item):
            # `item` is bound when single() is called, so a chunk keeps its own
            # first element even after the loop variable has moved on.
            yield item

        iterator = aiter(l)
        async for first in iterator:
            # achain() needs async iterables, so pass the generator object
            # returned by the call, not the generator function itself.
            yield achain(single(first), aslice(iterator, n - 1))