Tags: python, csv, locking, python-asyncio, python-aiofiles

How to Handle Async Appends to a CSV File Without Corruption


I have a large number of asyncio tasks that are consuming data via a queue and writing to separate files. However, some of the files will be written to multiple times via mode a+. I have written some code to simulate some random processing in a similar way to my real-world example.

I am using asyncio.Lock() in the following fashion to protect the file from whatever task takes ownership of writing to it, but am still receiving CSV results that are misaligned and/or corrupted. Also, the header seems to be getting written multiple times even though the size of the file shouldn't be 0 after the header is first written.

What am I missing?

import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone

async def write_csv(item: list,  load_id: str, prefix: str) -> None:

    Path("./test_files").mkdir(parents=True, exist_ok=True)
    file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")

    # Asynchronously write to our file
    async with aiofiles.open(file_path, mode="a+", newline="") as f:

        print(f"INFO: writing file: {Path(file_path).resolve()}")
        w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
        print(f"file size: {await aiofiles.os.path.getsize(file_path)}")

        # If the file is empty, write the header
        if await aiofiles.os.path.getsize(file_path) == 0:
            print("file was empty! writing header")

            # Write the header
            async with asyncio.Lock():
                await w.writerow([
                    "response",
                    "load_id",
                    "last_updated_timestamp_utc"
                ])

        # do something special for specific file name
        # I am just trying to simulate more random data processing
        if prefix == "file_one":
            # Shuffle the chunks again
            item = random.shuffle(item)

        # Write the data
        for chunk in item:
            async with asyncio.Lock():
                await w.writerow([
                    json.dumps(chunk),
                    load_id,
                    datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                ])

async def main() -> None:

    # Create fake data
    items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 500

    # Possible file prefixes
    prefixes: list[str] = ["file_one", "file_two"]

    tasks: list = []
    load_id = str(uuid.uuid4())
    for i in items:
        # Randomly assign which file we will write to
        task = asyncio.create_task(write_csv(i, load_id, random.choice(prefixes)))
        tasks.append(task)

    errors = await asyncio.gather(*tasks, return_exceptions=True)
    # print(errors)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())

Solution

  • If you want to serialize access to a resource using an asyncio.Lock instance, then all resource-accessing code must try to acquire the same lock instance. But your code has:

            ...
            # Write the data
            for chunk in item:
                async with asyncio.Lock():
            ...
    

    Consequently, each task repeatedly creates new lock instances, which are not shared with the other tasks.
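
    As a minimal, self-contained sketch of the difference (the worker coroutine and the shared_lock name are illustrative, not part of the original code), compare acquiring one shared lock with creating a fresh lock inside each task:

    import asyncio

    async def worker(shared_lock: asyncio.Lock, n: int) -> None:
        # Correct: every task awaits the same lock object, so access is serialized.
        async with shared_lock:
            print(f"task {n} holds the shared lock")

        # Incorrect: a brand-new lock is always free, so it never blocks any other task.
        async with asyncio.Lock():
            print(f"task {n} holds a lock no other task can see")

    async def main() -> None:
        shared_lock = asyncio.Lock()  # created once, passed to every task
        await asyncio.gather(*(worker(shared_lock, n) for n in range(3)))

    asyncio.run(main())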

    A second problem is that you have:

            ...
            if prefix == "file_one":
                # Shuffle the chunks again
                item = random.shuffle(item)
            ...
    

    random.shuffle shuffles a sequence in place and returns None. You should have instead:

            ...
            if prefix == "file_one":
                # Shuffle the chunks again
                random.shuffle(item)
            ...
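
    As a quick demonstration: random.shuffle mutates its argument and returns None, while random.sample from the standard library (not used in the original code) returns a new shuffled list if you prefer not to mutate the input:

    import random

    item = [["hello", "world"], ["asyncio", "test"]]

    result = random.shuffle(item)  # shuffles item in place
    print(result)                  # prints None -- assigning this back would discard the data

    shuffled_copy = random.sample(item, k=len(item))  # returns a new, shuffled list
    print(shuffled_copy)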
    

    I would also recommend that, if you already have an open file f, you determine its size most simply by seeking to the end (with aiofiles the call must be awaited):

    size = await f.seek(0, 2)  # Seek to the end of the file and return the offset
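
    Equivalently, the whence argument can be spelled with the standard-library constant os.SEEK_END, which reads a little more clearly than the bare 2:

    import os

    size = await f.seek(0, os.SEEK_END)  # Seek to end of file; returns the new offset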
    

    In the following code, each task opens the output CSV once, but before writing a row it makes sure it is positioned correctly by seeking to the current end of the file. The row is then written and the output CSV file flushed before the lock is released:

    import asyncio
    import aiofiles
    import aiofiles.os
    import aiocsv
    import uuid
    import random
    import json
    from pathlib import Path
    from datetime import datetime, timezone
    
    async def write_csv(lock: asyncio.Lock, item: list,  load_id: str, prefix: str) -> None:
    
        Path("./test_files").mkdir(parents=True, exist_ok=True)
        file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")
    
        async with aiofiles.open(file_path, mode="a+", newline="") as f:
            print(f"INFO: writing file: {Path(file_path).resolve()}")
            w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
    
            async with lock:
                # Asynchronously write to our file
                size = await f.seek(0, 2)  # Seek to end
                # If the file is empty, write the header
                if size == 0:
                    print("file was empty! writing header")

                    # Write the header
                    await w.writerow([
                        "response",
                        "load_id",
                        "last_updated_timestamp_utc"
                    ])
                    await f.flush()
            # Release the lock implicitly
    
            # do something special for specific file name
            # I am just trying to simulate more random data processing
            if prefix == "file_one":
                # Shuffle the chunks again
                random.shuffle(item)
    
            # Write the data
            for chunk in item:
                async with lock:
                    size = await f.seek(0, 2)  # Seek to end
                    print(f"{file_path} file size: {size}")
                    await w.writerow([
                        json.dumps(chunk),
                        load_id,
                        datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                    ])
                    await f.flush()
    
    async def main() -> None:
    
        # Create fake data
        items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 500
    
        # Possible file prefixes
        prefixes: list[str] = ["file_one", "file_two"]
    
        tasks: list = []
        load_id = str(uuid.uuid4())
        lock = asyncio.Lock()
        for i in items:
            # Randomly assign which file we will write to
            task = asyncio.create_task(write_csv(lock, i, load_id, random.choice(prefixes)))
            tasks.append(task)
    
        errors = await asyncio.gather(*tasks, return_exceptions=True)
        # print(errors)
    
    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        loop.run_until_complete(main())
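
    Note that the single lock serializes writes to every output file, even when two tasks are appending to different files. If that ever becomes a bottleneck, one possible refinement (a sketch only, not part of the answer above; the _file_locks dictionary and get_lock helper are hypothetical names) is to keep one lock per output path:

    import asyncio
    from pathlib import Path

    # One asyncio.Lock per output path, created lazily. This needs no extra
    # synchronization because get_lock contains no awaits, so no other task can
    # run in the middle of the dictionary update.
    _file_locks: dict[Path, asyncio.Lock] = {}

    def get_lock(file_path: Path) -> asyncio.Lock:
        lock = _file_locks.get(file_path)
        if lock is None:
            lock = asyncio.Lock()
            _file_locks[file_path] = lock
        return lock

    Each write_csv call would then acquire get_lock(file_path) instead of the single shared lock, so writers to file_one no longer wait on writers to file_two.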