Tags: python, multithreading, parallel-processing, file-handling, memory-efficient

How to process a massive file in parallel in Python while maintaining order and optimizing memory usage?


I'm working on a Python project where I need to process a very large file (e.g., a multi-gigabyte CSV or log file) in parallel to speed up processing. However, I have three specific requirements that make this task challenging:

  • Order Preservation: The output must strictly maintain the same line order as the input file.
  • Memory Efficiency: The solution must avoid loading the entire file into memory (e.g., by reading it line-by-line or in chunks).
  • Concurrency: The processing should leverage parallelism to handle CPU-intensive tasks efficiently.

My Current Approach

I used concurrent.futures.ThreadPoolExecutor to parallelize the processing, but I encountered the following issues:

  • While executor.map produces results in the correct order, it seems inefficient because tasks must wait for earlier ones to complete even if later tasks finish earlier.
  • Reading the entire file using file.readlines() consumes too much memory, especially for multi-gigabyte files.

Here’s an example of what I tried:

import concurrent.futures

def process_line(line):
    # Simulate a CPU-bound operation
    return line.upper()

with open("large_file.txt", "r") as infile:
    lines = infile.readlines()

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(process_line, lines))

with open("output.txt", "w") as outfile:
    outfile.writelines(results)

While this code works for small files, it fails for larger ones due to memory constraints and potential inefficiencies in thread usage.

Desired Solution

I’m looking for a solution that:

  • Processes lines in parallel to leverage multiple CPU cores or threads.
  • Ensures that output lines are written in the same order as the input file.
  • Reads and processes the file in a memory-efficient way (e.g., streaming or chunk-based processing).

Additionally, I would like to understand:

  • Whether ThreadPoolExecutor or ProcessPoolExecutor is more appropriate for this scenario, considering the potential CPU-bound nature of the tasks.

  • Best practices for buffering and writing results to an output file without consuming too much memory.

Key Challenges

  • How can I assign unique identifiers to each line (or chunk) to maintain order without introducing significant overhead?

  • Are there existing libraries or design patterns in Python that simplify this kind of parallel processing for large files?

Any insights, examples, or best practices to tackle this problem would be greatly appreciated!


Solution

  • No one can really answer whether ThreadPoolExecutor or ProcessPoolExecutor will be faster without knowing exactly what each task does. You need to try both and benchmark the time taken by each to find out which is better.

    The code below can help you figure that out yourself. It is based on this answer, but it uses a bounded queue to limit how many lines are read ahead, so you don't risk having the entire file in memory if the processing is slow. Writing to the output file is also done by its own thread; reading from and writing to files (I/O) releases the GIL, so both can happen in parallel with the processing.

    import concurrent.futures
    import os
    import queue
    import threading
    from io import IOBase
    import time
    from typing import Optional
    
    def process_line(line: str):
        # Simulate some CPU-bound work on the line
        for i in range(int(1e6)):
            pass
        return line.upper()
    
    def writer_task(out_file: IOBase, writer_queue: queue.Queue):
        while True:
            fut: Optional[concurrent.futures.Future] = writer_queue.get()
            if fut is None:
                break
            line = fut.result()
            out_file.write(line)
            print("line written")
    
    # Wrap main script behavior in main function
    def main():
        t1 = time.time()
        with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Bounded queue: caps how many lines/futures are in flight,
                # so memory stays limited even if processing lags behind reading
                writer_queue = queue.Queue(maxsize=os.cpu_count() * 2 + 10)
                writer = threading.Thread(target=writer_task, args=(outfile, writer_queue), daemon=True)
                writer.start()
                for line in infile:
                    print("line read")
                    # Futures are enqueued in read order; the writer consumes them
                    # in that same order, so the output preserves the input order
                    writer_queue.put(executor.submit(process_line, line))
                writer_queue.put(None)  # signal file end
                writer.join()
        t2 = time.time()
        print(f"time taken = {t2-t1}")
    
    # Invoke main function only when run as script, not when imported or invoked
    # as part of spawn-based multiprocessing
    if __name__ == '__main__':
        main()
    

    You can easily swap ThreadPoolExecutor for ProcessPoolExecutor and measure which one is better. You might want to delete the print("line written") call and its counterpart, as they are only there for illustrative purposes.
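
    For reference, here is a condensed sketch of what the swapped version could look like (same file names and overall structure as above, with the prints and timing trimmed); the key constraint is that process_line must stay at module top level so spawn-based multiprocessing can pickle it:

    import concurrent.futures
    import os
    import queue
    import threading

    def process_line(line: str) -> str:
        # Must live at module top level so it can be pickled and sent to
        # worker processes under spawn-based multiprocessing
        return line.upper()

    def writer_task(out_file, writer_queue: queue.Queue) -> None:
        while True:
            fut = writer_queue.get()
            if fut is None:
                break
            out_file.write(fut.result())

    def main() -> None:
        with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
            # Only this line differs from the threaded version above; note that
            # submitting one line per task adds pickling/IPC overhead, which the
            # benchmark will expose for very cheap tasks
            with concurrent.futures.ProcessPoolExecutor() as executor:
                writer_queue = queue.Queue(maxsize=(os.cpu_count() or 1) * 2 + 10)
                writer = threading.Thread(target=writer_task, args=(outfile, writer_queue), daemon=True)
                writer.start()
                for line in infile:
                    writer_queue.put(executor.submit(process_line, line))
                writer_queue.put(None)  # signal end of input
                writer.join()

    if __name__ == "__main__":
        main()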

    For something as small as line.upper, just processing it on the main thread will be faster than either option.
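
    As a point of comparison, here is a minimal purely sequential sketch (same file names as above) that streams the file line by line on the main thread:

    def sequential_main() -> None:
        # No executors at all: read, transform, and write one line at a time,
        # so only a single line is held in memory
        with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
            for line in infile:
                outfile.write(line.upper())

    if __name__ == "__main__":
        sequential_main()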

    FYI: don't use this code in production. If an exception happens in the writer thread, your app will be stuck forever; you need to catch whatever fut.result() could throw.
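
    One possible way to harden the writer (a sketch of one option, not part of the original code): catch exceptions inside the loop and keep draining the queue, so the reading thread never blocks forever on a full queue.

    import concurrent.futures
    import queue
    from typing import Optional, TextIO

    def writer_task(out_file: TextIO, writer_queue: queue.Queue) -> None:
        while True:
            fut: Optional[concurrent.futures.Future] = writer_queue.get()
            if fut is None:
                break
            try:
                out_file.write(fut.result())
            except Exception as exc:
                # Keep consuming futures so the producer never blocks on a full
                # queue; decide per application whether to log, write a
                # placeholder line, or re-raise after cleanup
                print(f"failed to process a line: {exc!r}")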