What specifically needs to change in the Python 3.12 code below in order for each and every one of the calls to the write_to_file(linesBuffer)
function to run in parallel instead of running sequentially?
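In other words, the program should continue without waiting for each call to write_to_file(linesBuffer)
to return, while still ensuring that every call to write_to_file(linesBuffer)
does eventually return. Each call to write_to_file(linesBuffer)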
should start at a different time, and return after whatever different duration might be required in order for each call to successfully complete its work. And there should never be delays waiting for one call to write_to_file(linesBuffer)
to complete before the next call to write_to_file(linesBuffer)
is initiated.
When we remove await from the await write_to_file(linesBuffer) line, none of the print statements inside the write_to_file(linesBuffer) function are ever executed. So we cannot simply change await write_to_file(linesBuffer) to write_to_file(linesBuffer).
The problem in the code is that the many sequential calls to the await write_to_file(linesBuffer)
function cause the program to become very slow.
Here is the code:
import os
import platform
import asyncio
# Number of buffered lines that triggers a write to newFile.txt.
numLines = 10
def get_source_file_path() -> str:
    """Return the platform-specific path of the source file to follow.

    Returns:
        The hard-coded Windows path when ``platform.system()`` reports
        ``'Windows'``, otherwise the hard-coded POSIX-style path.
    """
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    return '/path/to/sourceFile.txt'
async def write_to_file(linesBuffer):
    """Append the buffered lines to ``newFile.txt`` and then empty the buffer.

    Args:
        linesBuffer: Mutable list of already-formatted text lines. It is
            cleared in place once its contents have been written.

    After writing, the coroutine prints a progress message once per second
    for two seconds, so a caller that awaits it is suspended for roughly two
    seconds per call.
    """
    print("inside Writing to file...")
    # NOTE: open()/writelines() are ordinary blocking calls; nothing else on
    # the event loop can run while they execute.
    with open('newFile.txt', 'a') as new_destination_file:
        new_destination_file.writelines(linesBuffer)
    # Report the directory in which newFile.txt is located.
    directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
    print("directory_name: ", directory_name)
    linesBuffer.clear()
    # Print every 1 second for 2 seconds.
    for _ in range(2):
        print("HI HO, HI HO. IT'S OFF TO WORK WE GO...")
        await asyncio.sleep(1)
    print("inside done Writing to file...")
async def read_source_file():
    """Follow the source file and copy newly appended lines in batches.

    Starts at the current end of the file, polls for new lines, prefixes each
    one with a running counter and buffers it. The buffer is handed to
    ``write_to_file`` when it holds ``numLines`` lines or when the read
    position equals the recorded file size. The loop never returns.

    Every hand-off is awaited, so no further line is read until
    ``write_to_file`` has finished: the batches are written strictly one
    after another.
    """
    source_file_path = get_source_file_path()
    linesBuffer = []
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    # Record the size of the source file so that truncation can be detected.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    with open(source_file_path, 'r') as source_file:
        # Skip the existing content; only lines appended from now on are read.
        source_file.seek(0, os.SEEK_END)
        while True:
            line = source_file.readline()
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                # Start over from the beginning of the (now shorter) file.
                source_file.seek(0, os.SEEK_SET)
                file_size = new_file_size
                linesBuffer.clear()
                counter = 0
                print("new_file_size: ", new_file_size)
            if not line:
                # Nothing new yet: yield to the event loop, then poll again.
                await asyncio.sleep(0.1)
                continue
            new_line = str(counter) + " line: " + line
            print(new_line)
            linesBuffer.append(new_line)
            print("len(linesBuffer): ", len(linesBuffer))
            if len(linesBuffer) >= numLines:
                print("Writing to file...")
                # Awaiting here suspends the reader until the write finishes.
                await write_to_file(linesBuffer)
                print("awaiting Writing to file...")
                linesBuffer.clear()
            counter += 1
            print("counter: ", counter)
            # Detect whether the present line is the last line in the file;
            # if so, write out whatever has been buffered.
            # NOTE(review): file_size is reassigned only in the truncation
            # branch, so once the file has grown this comparison uses the
            # size recorded earlier - confirm this is intended.
            if source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND. Writing to file...")
                await write_to_file(linesBuffer)
                print("awaiting Writing to file...")
                linesBuffer.clear()
                counter = 0
async def main() -> None:
    """Run the source-file follower; returns only if that coroutine ends."""
    await read_source_file()
if __name__ == '__main__':
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())
A couple of points:
First, as has already been pointed out in the comments, the file I/O you are doing is not asynchronous, and asyncio
itself does not provide asynchronous file I/O. For this I suggest installing the aiofiles
module from the PyPI repository.
Second, you have the following:
await write_to_file(linesBuffer) #When we remove await from this line, the function never runs.
Actually, without the await
the body of write_to_file
never runs. The expression write_to_file(linesBuffer)
merely creates and returns a coroutine object; the coroutine only executes once it is awaited (or wrapped in a task), as you are currently doing. But awaiting it directly is effectively sequential: the caller is suspended, the coroutine runs, and only once it completes and returns a value (the implicit None
if there is no return
statement) does the caller resume, with await write_to_file(linesBuffer)
evaluating to that return value.
But you want write_to_file
to run concurrently with your read_source_file
coroutine. For that, you need to create a separate task. See asyncio.create_task
for details. Pay particular attention to saving a reference to the task instance returned by this call: the event loop keeps only weak references to tasks, so a task that is not referenced anywhere else may be garbage-collected before it finishes.
So basically your modified code would be as follows (I have not verified that its overall logic is correct):
import os
import platform
import asyncio
import aiofiles
# Number of buffered lines that triggers the creation of a write task.
numLines = 10
def get_source_file_path() -> str:
    """Return the platform-specific path of the source file to follow.

    Returns:
        The hard-coded Windows path when ``platform.system()`` reports
        ``'Windows'``, otherwise the hard-coded POSIX-style path.
    """
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    return '/path/to/sourceFile.txt'
async def write_to_file(linesBuffer):
    """Append one batch of lines to ``newFile.txt`` using aiofiles.

    Args:
        linesBuffer: List of already-formatted text lines. It is cleared in
            place after its contents have been written.

    After writing, the coroutine prints a progress message once per second
    for two seconds before finishing.
    """
    print("inside Writing to file...")
    # Snapshot the batch before the first await so that changes the caller
    # makes to the list while this coroutine is suspended cannot alter what
    # gets written.
    payload = "".join(linesBuffer)
    async with aiofiles.open('newFile.txt', 'a') as new_destination_file:
        # One awaited write per batch instead of one per line.
        await new_destination_file.write(payload)
    # Report the directory in which newFile.txt is located.
    directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
    print("directory_name: ", directory_name)
    linesBuffer.clear()
    # Print every 1 second for 2 seconds.
    for _ in range(2):
        print("HI HO, HI HO. IT'S OFF TO WORK WE GO...")
        await asyncio.sleep(1)
    print("inside done Writing to file...")
async def read_source_file():
    """Follow the source file and hand batches of new lines to write tasks.

    Starts at the current end of the file, polls for appended lines, prefixes
    each one with a running counter and buffers it. A batch is handed to a
    new ``write_to_file`` task when it holds ``numLines`` lines or when the
    line just read is the last one currently in the file. The reader does not
    wait for those tasks, so several writes may be in flight at once. The
    loop never returns.
    """
    source_file_path = get_source_file_path()
    linesBuffer = []
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    # Record the size of the source file so that truncation can be detected.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    # Keep strong references to running tasks so they are not
    # garbage-collected before they finish.
    background_tasks = set()

    def start_write(batch):
        """Schedule write_to_file(batch) as a background task."""
        task = asyncio.create_task(write_to_file(batch))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    async with aiofiles.open(source_file_path, 'r') as source_file:
        # Skip the existing content; only lines appended from now on are read.
        await source_file.seek(0, os.SEEK_END)
        while True:
            line = await source_file.readline()
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                await source_file.seek(0, os.SEEK_SET)
                linesBuffer = []
                counter = 0
                print("new_file_size: ", new_file_size)
            # Always track the current size so the last-line test below does
            # not compare against a stale value after the file has grown.
            file_size = new_file_size
            if not line:
                # Nothing new yet: yield to the event loop, then poll again.
                await asyncio.sleep(0.1)
                continue
            new_line = str(counter) + " line: " + line
            print(new_line)
            linesBuffer.append(new_line)
            print("len(linesBuffer): ", len(linesBuffer))
            if len(linesBuffer) >= numLines:
                print("Writing to file...")
                start_write(linesBuffer)
                # The task has not started running yet, so the list it was
                # given must not be cleared; begin a fresh list instead.
                linesBuffer = []
            counter += 1
            print("counter: ", counter)
            # Detect whether the present line is the last line in the file;
            # if so, write out whatever has been buffered.
            # NOTE(review): tell() on a text-mode file is compared with a
            # byte size here - confirm this holds for the file's encoding.
            if await source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND. Writing to file...")
                if linesBuffer:
                    start_write(linesBuffer)
                    linesBuffer = []
                counter = 0
async def main() -> None:
    """Run the source-file follower; returns only if that coroutine ends."""
    await read_source_file()
if __name__ == '__main__':
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())
Update
Maybe you want something like this. This code essentially checks whether lines have been appended to an input file; if so, they are accumulated into a batch, and each batch is sent to a separate task that appends it to an output file.
If you see that the input file has been truncated, you execute:
await source_file.seek(0, os.SEEK_SET)
That positions you to the beginning of the file. Is that what you really want? I don't get it. Since you clearly know what it is you want, you will be in a better position to make adjustments to this code. If it's not even in the ballpark, then I surrender.
import os
import platform
import asyncio
import aiofiles
# Number of lines collected before a batch is queued for the writer task.
BATCH_SIZE = 10
def get_source_file_path() -> str:
    """Return the platform-specific path of the source file to follow.

    Returns:
        The hard-coded Windows path when ``platform.system()`` reports
        ``'Windows'``, otherwise the hard-coded POSIX-style path.
    """
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    return '/path/to/sourceFile.txt'
async def write_to_file(queue):
    """Consume batches from *queue* forever and append them to ``newFile.txt``.

    Args:
        queue: ``asyncio.Queue`` whose items are lists of text lines.

    The output file is opened once and kept open. The coroutine never returns
    on its own; it ends only when it is cancelled or an error propagates.
    """
    async with aiofiles.open('newFile.txt', 'a') as new_destination_file:
        while True:
            # Queue.get() is a coroutine: without ``await`` we would get a
            # coroutine object instead of the batch.
            lines = await queue.get()
            try:
                print("Writing to file...")
                await new_destination_file.write("".join(lines))
                # The file stays open across batches, so flush explicitly to
                # make each batch visible on disk as soon as it is written.
                await new_destination_file.flush()
                print("Done Writing to file...")
            finally:
                # Mark the item as processed so queue.join() can complete.
                queue.task_done()
async def read_source_file():
    """Follow the source file and queue batches of new lines for the writer.

    Starts a single ``write_to_file`` task fed through an ``asyncio.Queue``,
    then polls the source file from its current end. Each appended line is
    prefixed with a running counter and buffered; a batch is queued when it
    reaches ``BATCH_SIZE`` lines or when the line just read is the last one
    currently in the file. The loop never returns on its own.
    """
    source_file_path = get_source_file_path()
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    # Record the size of the source file so that truncation can be detected.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    queue = asyncio.Queue()
    # create_task() needs a coroutine object, i.e. the function must be
    # called with its queue argument.
    write_task = asyncio.create_task(write_to_file(queue))
    try:
        async with aiofiles.open(source_file_path, 'r') as source_file:
            # Skip the existing content; only appended lines are read.
            await source_file.seek(0, os.SEEK_END)
            linesBuffer = []
            while True:
                # Always make sure that file_size is the current size:
                old_file_size = file_size
                file_size = os.path.getsize(source_file_path)
                if file_size < old_file_size:
                    print("The file has been truncated.")
                    await source_file.seek(0, os.SEEK_SET)
                    # Allocate a new list instead of clearing the current one
                    linesBuffer = []
                    counter = 0
                    print("new_file_size: ", file_size)
                    continue
                line = await source_file.readline()
                if not line:
                    # Nothing new yet: yield to the event loop, then re-poll.
                    await asyncio.sleep(0.1)
                    continue
                new_line = str(counter) + " line: " + line
                print(new_line)
                linesBuffer.append(new_line)
                print("len(linesBuffer): ", len(linesBuffer))
                if len(linesBuffer) == BATCH_SIZE:
                    print("Writing batch to file...")
                    await queue.put(linesBuffer)
                    # The queued list now belongs to the writer; start anew.
                    linesBuffer = []
                counter += 1
                print("counter: ", counter)
                # Detect whether the present line is the last line in the
                # file. If it is, write whatever batch we have even if it is
                # not complete.
                if await source_file.tell() == file_size:
                    print("LAST LINE IN FILE FOUND.")
                    if linesBuffer:
                        # Write even though it's not a full batch:
                        await queue.put(linesBuffer)
                        linesBuffer = []
                    counter = 0
    finally:
        # The writer loops forever; stop it when the reader exits.
        write_task.cancel()
async def main() -> None:
    """Run the source-file follower; returns only if that coroutine ends."""
    await read_source_file()
if __name__ == '__main__':
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())
If you are actually sending your batches to an API and you want these calls to run concurrently, then:
import os
import platform
import asyncio
import aiofiles
# Number of lines collected before a batch is sent to the API.
BATCH_SIZE = 10
def get_source_file_path() -> str:
    """Return the platform-specific path of the source file to follow.

    Returns:
        The hard-coded Windows path when ``platform.system()`` reports
        ``'Windows'``, otherwise the hard-coded POSIX-style path.
    """
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    return '/path/to/sourceFile.txt'
async def send_to_api(linesBuffer):
    """Send one batch of lines to the API (placeholder, not implemented).

    Args:
        linesBuffer: List of formatted text lines that make up the batch.
    """
    ...
async def read_source_file():
    """Follow the source file and send batches of new lines to the API.

    Polls the source file from its current end. Each appended line is
    prefixed with a running counter and buffered; a batch is passed to a new
    ``send_to_api`` task when it reaches ``BATCH_SIZE`` lines or when the
    line just read is the last one currently in the file. The reader does not
    wait for those tasks, so several sends may be in flight at once. The loop
    never returns.
    """
    source_file_path = get_source_file_path()
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    # Record the size of the source file so that truncation can be detected.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    # Keep strong references to running tasks so they are not
    # garbage-collected before they finish.
    background_tasks = set()

    def dispatch(batch):
        """Schedule send_to_api(batch) as a background task."""
        task = asyncio.create_task(send_to_api(batch))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    async with aiofiles.open(source_file_path, 'r') as source_file:
        # Skip the existing content; only lines appended from now on are read.
        await source_file.seek(0, os.SEEK_END)
        linesBuffer = []
        while True:
            # Always make sure that file_size is the current size:
            old_file_size = file_size
            file_size = os.path.getsize(source_file_path)
            if file_size < old_file_size:
                print("The file has been truncated.")
                await source_file.seek(0, os.SEEK_SET)
                # Allocate a new list instead of clearing the current one
                linesBuffer = []
                counter = 0
                print("new_file_size: ", file_size)
                continue
            line = await source_file.readline()
            if not line:
                # Nothing new yet: yield to the event loop, then poll again.
                await asyncio.sleep(0.1)
                continue
            new_line = str(counter) + " line: " + line
            print(new_line)
            linesBuffer.append(new_line)
            print("len(linesBuffer): ", len(linesBuffer))
            if len(linesBuffer) == BATCH_SIZE:
                print("sending to api...")
                dispatch(linesBuffer)
                # Do not clear the buffer; allocate a new one:
                linesBuffer = []
            counter += 1
            print("counter: ", counter)
            # Detect whether the present line is the last line in the file.
            # If it is, send whatever batch we have even if it is not
            # complete.
            if await source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND.")
                if linesBuffer:
                    # Send even though it's not a full batch:
                    dispatch(linesBuffer)
                    linesBuffer = []
                counter = 0
async def main() -> None:
    """Run the source-file follower; returns only if that coroutine ends."""
    await read_source_file()
if __name__ == '__main__':
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())