python, io, python-asyncio

How to use asyncio properly for a generator function?


I'm reading in several thousand files at once, and for each file I need to perform some operations before yielding its rows. To increase performance, I thought I could use asyncio to perform operations on files (and yield their rows) while waiting for new files to be read in.

However, from print statements I can see that all the files are opened and gathered first, and only then is each file iterated over (the same as would occur without asyncio).

I feel like I'm missing something quite obvious here that is making my asynchronous attempt synchronous.

import asyncio

async def open_files(file):
    with open(file) as file:
        # do stuff
        print('opening files')
        return x

async def async_generator():
    file_outputs = await asyncio.gather(*[open_files(file) for file in files])

    for file_output in file_outputs:
        print('using open file')
        for row in file_output:
            # Do stuff to row
            yield row

async def main():
    async for yield_value in async_generator():
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Output:

opening files
opening files
.
.
.
using open file
using open file

EDIT

Using the code supplied by @user4815162342, I noticed that, although it was 3x quicker, the set of rows yielded from the generator was slightly different from what I get without concurrency. I'm not yet sure whether this is because some rows were missed from each file, or because the files were somehow re-ordered. So I made the following changes to the code from user4815162342 and passed a lock into pool.submit().

I should have mentioned when first asking that the ordering of the rows within each file, and of the files themselves, is required.

import concurrent.futures
import multiprocessing

def open_files(file, lock):
    with open(file) as file:
        # do stuff
        print('opening files')
        return x

def generator():
    m = multiprocessing.Manager()
    lock = m.Lock()
    pool = concurrent.futures.ThreadPoolExecutor()
    file_output_futures = [pool.submit(open_files, file, lock) for file in files]
    for fut in concurrent.futures.as_completed(file_output_futures):
        file_output = fut.result()
        print('using open file')
        for row in file_output:
            # Do stuff to row
            yield row

def main():
    for yield_value in generator():
        pass

if __name__ == '__main__':
    main()

This way my non-concurrent and concurrent approaches yield the same values each time; however, I have just lost all the speed gained from using concurrency.


Solution

  • I feel like I'm missing something quite obvious here that is making my asynchronous attempt synchronous.

    There are two issues with your code. The first one is that asyncio.gather() by design waits for all the futures to complete in parallel, and only then returns their results. So the processing you do in the generator is not interspersed with the IO in open_files as was your intention, but only begins after all the calls to open_files have returned. To process async calls as they are done, you should be using something like asyncio.as_completed.
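
    For illustration, here is a minimal sketch of that pattern, reusing the open_files and files placeholders from your code. Note that as_completed yields results in completion order, not submission order, and that it only helps once open_files genuinely awaits something (see the second issue below):

    import asyncio

    async def async_generator():
        # schedule every coroutine, then consume each result as soon as it is ready
        for coro in asyncio.as_completed([open_files(file) for file in files]):
            file_output = await coro
            for row in file_output:
                yield row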

    The second and more fundamental issue is that, unlike threads which can parallelize synchronous code, asyncio requires everything to be async from the ground up. It's not enough to add async to a function like open_files to make it async. You need to go through the code and replace any blocking calls, such as calls to IO, with equivalent async primitives. For example, connecting to a network port should be done with open_connection, and so on. If your async function doesn't await anything, as appears to be the case with open_files, it will execute exactly like a regular function and you won't get any benefits of asyncio.
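
    For instance, a coroutine that genuinely awaits something would look like the following sketch (the host, port, and the idea of reading one line are made-up illustrations, not part of your code):

    import asyncio

    async def read_greeting(host, port):
        # open_connection suspends this coroutine until the connection is made,
        # so the event loop can run other tasks in the meantime
        reader, writer = await asyncio.open_connection(host, port)
        line = await reader.readline()
        writer.close()
        await writer.wait_closed()
        return line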

    Since you use IO on regular files, and operating systems don't expose a portable async interface for regular files, you are unlikely to profit from asyncio. There are libraries like aiofiles that use threads under the hood, but they are as likely to make your code slower as to speed it up, because their nice-looking async APIs involve a lot of internal thread synchronization. To speed up your code, you can use a classic thread pool, which Python exposes through the concurrent.futures module. For example (untested):

    import concurrent.futures
    
    def open_files(file):
        with open(file) as file:
            # do stuff
            print('opening files')
            return x
    
    def generator():
        pool = concurrent.futures.ThreadPoolExecutor()
        file_output_futures = [pool.submit(open_files, file) for file in files]
        # iterate in submission order: result() blocks only until that particular
        # future is done, while the pool keeps opening the remaining files
        for fut in file_output_futures:
            file_output = fut.result()
            print('using open file')
            for row in file_output:
                # Do stuff to row
                yield row
    
    def main():
        for yield_value in generator():
            pass
    
    if __name__ == '__main__':
        main()