Tags: python, multithreading, python-asyncio

Asyncio Future ThreadExecutor


I am trying to convert this converter (XML files from S3 to JSON) into a multithreaded application so I can speed up processing of multiple files (I have 985). Since each file will be about 1 GB, I would like to send, say, 8 of these files to be parsed at a time.

Whenever I run this, I get: RuntimeWarning: coroutine 'process_object' was never awaited

Here is the code at a high level:

import asyncio
import concurrent.futures
import time

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed {filename} in {time.time() - start} seconds"

if "__main__" == __name__:
    objects = get_objects(top_n=3) # list of prefixes for S3

    loop = asyncio.get_event_loop()

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            asyncio.wrap_future(future)
            for future in [
                loop.run_in_executor(executor, process_object, url) for url in objects
            ]
        ]
        results = loop.run_until_complete(asyncio.gather(*futures))

    loop.close()

Solution

  • I have modified and simplified your code. I don't know why you are combining thread-pool futures with asyncio; if you want to limit the number of workers, you can use semaphores in asyncio (a minimal sketch of that follows the code below).

    Below is simplified code that works without concurrent.futures, as I can't reproduce the above error exactly on my machine. (The warning most likely appears because run_in_executor calls process_object in a worker thread; calling an async def function only creates a coroutine object, which is then never awaited.)

    Try this:

    import asyncio
    import time

    async def process_object(filename, pid=None):
        start = time.time()
        s3 = S3Client(...)
        opensearch_client = OpenSearchClient(...)
        Parser.DEBUG = True
        parser = Parser(s3, opensearch_client)
        save_file = ...
        s3.download_from_s3(filename, save_file)
        parser.current_prefix = filename
        await parser.parse(save_file)
        print(f"Processed {filename} in {time.time() - start} seconds")
    
    
    async def process_objects_bg(objects):
        resp = await asyncio.gather(*[process_object(url) for url in objects])
        return resp
    
    
    if "__main__" == __name__:
        objects = get_objects(top_n=3)  # list of prefixes for S3
        asyncio.run(process_objects_bg(objects))
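
    If you want to cap how many files are processed at once (you mentioned 8), a minimal sketch using asyncio.Semaphore could look like the following. The limit of 8 comes from the question; process_with_limit and process_objects_limited are just illustrative names, and process_object / get_objects are the functions from the code above:

    async def process_with_limit(semaphore, filename):
        # At most `max_concurrency` tasks can hold the semaphore at once;
        # any additional tasks wait here until a slot frees up.
        async with semaphore:
            return await process_object(filename)


    async def process_objects_limited(objects, max_concurrency=8):
        semaphore = asyncio.Semaphore(max_concurrency)
        return await asyncio.gather(
            *[process_with_limit(semaphore, url) for url in objects]
        )


    if "__main__" == __name__:
        objects = get_objects(top_n=3)  # list of prefixes for S3
        asyncio.run(process_objects_limited(objects))

    Note that the semaphore only limits how many coroutines run at once; if s3.download_from_s3 is an ordinary blocking call, it will still block the event loop while it runs, and wrapping it with something like asyncio.to_thread would be needed to keep the loop responsive during downloads.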