Would like to implement Python code to read and write (i.e. copy) a file. The goal is to read and write concurrently, so it reduces the time to execute the file copy. This is a learning exercise for me to get familiar with the async/await paradigm.
Here's my current implementation, but the code errors out. The second or third file read operation throws "Bad File Descriptor". The input file does exist, and I am able to read it normally using <file_object>.read()
Wondering what's going wrong here? Is it some unexpected side-effect of using async files?
import asyncio
import queue
from aiofile import async_open
async def copy_file(input_fname, output_fname):
CHUNK_SIZE = 4096
chunk_queue = queue.Queue()
read_complete = False
SLEEP_DURATION = 1
async def read_chunks(file_object):
nonlocal CHUNK_SIZE, chunk_queue, read_complete
while True:
chunk = await file_object.read(CHUNK_SIZE)
chunk_queue.put(chunk)
if len(chunk) < CHUNK_SIZE: # Detect last chunk
read_complete = True
return
async def write_chunks(file_object):
nonlocal CHUNK_SIZE, chunk_queue, read_complete, SLEEP_DURATION
while True:
status = chunk_queue.empty()
if not status:
chunk = chunk_queue.get()
await file_object.write(chunk)
else:
await asyncio.sleep(SLEEP_DURATION)
async with async_open(input_fname, "rb") as input_file:
async with async_open(output_fname, "wb+") as output_file:
asyncio.create_task(read_chunks(input_file))
asyncio.create_task(write_chunks(output_file))
print("Copy complete")
async def main():
await copy_file("input.bin", "output.bin")
if __name__ == '__main__':
asyncio.run(main())
Here is a full stack trace:
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<copy_file.<locals>.write_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:21> exception=ValueError('I/O operation on closed file')>
Traceback (most recent call last):
File "/Users/kosa/PycharmProjects/copyFile/main.py", line 27, in write_chunks
await file_object.write(chunk)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 217, in write
await operation
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 243, in write_bytes
data[written:], self.fileno(), offset + written,
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 173, in fileno
return self.__file_obj.fileno()
ValueError: I/O operation on closed file
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<copy_file.<locals>.read_chunks() done, defined at /Users/kosa/PycharmProjects/copyFile/main.py:12> exception=SystemError('Bad file descriptor')>
Traceback (most recent call last):
File "/Users/kosa/PycharmProjects/copyFile/main.py", line 15, in read_chunks
chunk = await file_object.read(CHUNK_SIZE)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 211, in read
return await self.__read(length)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/utils.py", line 205, in __read
data = await self.file.read_bytes(length, self._offset)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/aiofile/aio.py", line 202, in read_bytes
return await self.__context.read(size, self.fileno(), offset)
File "/Users/kosa/PycharmProjects/copyFile/venv/lib/python3.8/site-packages/caio/asyncio_base.py", line 88, in submit
return op.get_value()
SystemError: Bad file descriptor
Copy complete
Process finished with exit code 0
Aha, I found the error here. The problem is that we started tasks, but we closed the files before the tasks can complete. Instead, we need to await completion, like so:
async with async_open(input_fname, "rb") as input_file:
async with async_open(output_fname, "wb+") as output_file:
reads = asyncio.create_task(read_chunks(input_file))
writes = asyncio.create_task(write_chunks(output_file))
await reads
await writes
print("Copy complete")
The above code copies the file as expected 👍