Search code examples
pythonasynchronousasync-awaitpython-asynciopython-aiofiles

Async file copy - why is the file descriptor bad?


Would like to implement Python code to read and write (i.e. copy) a file. The goal is to read and write concurrently, so it reduces the time to execute the file copy. This is a learning exercise for me to get familiar with the async/await paradigm.

Here's my current implementation, but the code errors out. The second or third file read operation throws "Bad File Descriptor". The input file does exist, and I am able to read it normally using <file_object>.read()

Wondering what's going wrong here? Is it some unexpected side-effect of using async files?

import asyncio
import queue
from aiofile import async_open


async def copy_file(input_fname, output_fname):
    CHUNK_SIZE = 4096
    chunk_queue = queue.Queue()
    read_complete = False
    SLEEP_DURATION = 1

    async def read_chunks(file_object):
        nonlocal CHUNK_SIZE, chunk_queue, read_complete
        while True:
            chunk = await file_object.read(CHUNK_SIZE)
            chunk_queue.put(chunk)
            if len(chunk) < CHUNK_SIZE: # Detect last chunk
                read_complete = True
                return

    async def write_chunks(file_object):
        nonlocal CHUNK_SIZE, chunk_queue, read_complete, SLEEP_DURATION
        while True:
            status = chunk_queue.empty()
            if not status:
                chunk = chunk_queue.get()
                await file_object.write(chunk)
            else:
                await asyncio.sleep(SLEEP_DURATION)

    async with async_open(input_fname, "rb") as input_file:
        async with async_open(output_fname, "wb+") as output_file:
            asyncio.create_task(read_chunks(input_file))
            asyncio.create_task(write_chunks(output_file))
    print("Copy complete")


async def main():
    await copy_file("input.bin", "output.bin")

if __name__ == '__main__':
    asyncio.run(main())


Here is a full stack trace:

Task exception was never retrieved
future: <Task finished name='Task-4' coro=<copy_file.<locals>.write_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:21> exception=ValueError('I/O operation on closed file')>
Traceback (most recent call last):
  File "/Users/kosa/PycharmProjects/copyFile/main.py", line 27, in write_chunks
    await file_object.write(chunk)
  File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 217, in write
    await operation
  File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 243, in write_bytes
    data[written:], self.fileno(), offset + written,
  File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 173, in fileno
    return self.__file_obj.fileno()
ValueError: I/O operation on closed file
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<copy_file.<locals>.read_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:12> exception=SystemError('Bad file descriptor')>
Traceback (most recent call last):
  File "/Users/kosa/PycharmProjects/copyFile/main.py", line 15, in read_chunks
    chunk = await file_object.read(CHUNK_SIZE)
  File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 211, in read
    return await self.__read(length)
  File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 205, in __read
    data = await self.file.read_bytes(length, self._offset)
  File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 202, in read_bytes
    return await self.__context.read(size, self.fileno(), offset)
  File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/caio/asyncio_base.py", line 88, in submit
    return op.get_value()
SystemError: Bad file descriptor
Copy complete

Process finished with exit code 0

Solution

  • Aha, I found the error here. The problem is that we started tasks, but we closed the files before the tasks can complete. Instead, we need to await completion, like so:

        async with async_open(input_fname, "rb") as input_file:
            async with async_open(output_fname, "wb+") as output_file:
                reads = asyncio.create_task(read_chunks(input_file))
                writes = asyncio.create_task(write_chunks(output_file))
                await reads
                await writes
        print("Copy complete")
    

    The above code copies the file as expected 👍