Tags: python, python-3.x, asynchronous, python-asyncio, sequence-generators

asynchronous python itertools chain multiple generators


UPDATED QUESTION FOR CLARITY:

Suppose I have two generator functions that do some processing:

def gen1(): # just an example;
  yield 1   # in my real code each yield
  yield 2   # carries a different
  yield 3   # computational cost

def gen2():
  yield 4
  yield 5
  yield 6

I can chain them with itertools:

from itertools import chain

mix = chain(gen1(), gen2())

and then I can wrap the chain in another generator function:

def mix_yield():
   for item in mix:
      yield item

or, if I just want to call next(mix), that works too.

My question is, how can I do the equivalent in asynchronous code?

Because I need it to:

  • yield items one by one, or work with a next-style call
  • yield whichever item resolves first (async)

PREV. UPDATE:

After experimenting and researching, I found the aiostream library, which describes itself as an async version of itertools. Here is what I tried:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

but I still can't call next(a_mix):

TypeError: 'merge' object is not an iterator

nor next(await a_mix):

raise StreamEmpty()

I can still turn it into a list, though:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

So one goal is met, with one more to go:

  • yield items one by one, or work with a next-style call (still open)
  • yield whichever item resolves first (async) (done)


Solution

  • Python's next built-in function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. Python 3.10 added anext (and aiter) as built-in functions; on earlier versions there is no such built-in (the aiostream library provides one), but it is easy to write:

    async def anext(aiterator):
        return await aiterator.__anext__()
    

    The saving is so small, though, that in the rare situations where this is needed one may as well invoke __anext__ directly. The async iterator is in turn obtained from an async iterable by calling its __aiter__ method (analogous to the __iter__ method of regular iterables). Async iteration driven manually looks like this:

    a_iterator = obj.__aiter__()          # regular method
    elem1 = await a_iterator.__anext__()  # async method
    elem2 = await a_iterator.__anext__()  # async method
    ...
    

    __anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators one should use async for.
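The manual protocol described above can be sketched with the standard library alone. The countdown generator here is a hypothetical stand-in for any async iterable. The sketch pulls one element by hand, hands the rest to async for, and then checks that a further __anext__ call raises StopAsyncIteration:

```python
import asyncio

async def countdown(n):
    # Hypothetical async generator, used only for this illustration.
    while n > 0:
        await asyncio.sleep(0)
        yield n
        n -= 1

async def demo():
    it = countdown(3).__aiter__()     # regular method call
    first = await it.__anext__()      # async method, must be awaited
    rest = [x async for x in it]      # async for consumes what is left
    try:
        await it.__anext__()          # nothing left to produce
        exhausted = False
    except StopAsyncIteration:
        exhausted = True
    return first, rest, exhausted

result = asyncio.run(demo())
print(result)
```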

    Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:

    async def main():
        a_mix = stream.combine.merge(gen1(), gen2())
        async with a_mix.stream() as streamer:
            mix_iter = streamer.__aiter__()    
            print(await mix_iter.__anext__())
            print(await mix_iter.__anext__())
            print('remaining:')
            async for x in mix_iter:
                print(x)
    
    asyncio.run(main())
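If pulling in aiostream is not an option, a similar "fastest first" merge can be approximated with only asyncio. What follows is a minimal sketch, not aiostream's implementation: the names merge, drain and ticker are made up for this example, and it assumes an unbounded queue is acceptable (no backpressure). Each source is drained by its own task into a shared queue, and the merged generator yields items in the order they arrive:

```python
import asyncio

async def merge(*sources):
    """Yield items from several async iterables as soon as each is ready."""
    queue = asyncio.Queue()   # unbounded: producers never wait (assumption)
    done = object()           # sentinel: one source has finished

    async def drain(source):
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(done)

    tasks = [asyncio.create_task(drain(s)) for s in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
        await asyncio.gather(*tasks)   # re-raise any error from a source
    finally:
        for task in tasks:             # stop producers if the consumer quits early
            task.cancel()

async def ticker(name, delay, count):
    # Hypothetical source whose items take `delay` seconds each.
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"

async def main():
    merged = merge(ticker("slow", 0.1, 2), ticker("fast", 0.01, 3))
    first = await merged.__anext__()   # one item, next-style
    rest = [x async for x in merged]   # the remainder, via async for
    return first, rest

first, rest = asyncio.run(main())
print(first, rest)
```

Because merge is an ordinary async generator, it supports both a single __anext__ call and async for, which covers the two requirements in the question. With an unbounded queue, fast producers may run arbitrarily far ahead of a slow consumer; passing a maxsize to asyncio.Queue would bound that.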