Tags: python, multithreading, file-io, generator

How to use a generator function in parallel with the main loop in Python


Expanding on Jeff Bauer's answer https://stackoverflow.com/a/5420116/11306525. My use case is to read a file continuously, save entries in a list if they match a certain regex, and perform operations on each saved entry one by one.

I can do this sequentially, but I fear I would not be able to tail the file at all times like this.

# Generator function to read the file and yield matching lines as they are found
def read_file(keyword):
    with open("file.txt", "r") as file:
        for line in file:
            if keyword in line:
                # Yield matching line
                yield line.strip()

# Main loop to perform operations on matching lines yielded by the generator
keyword = "example"
for matching_line in read_file(keyword):
    # Perform a lot of operations on matching line
    print(matching_line)

So, I do not want to wait for the generator function to find a match (of course, waiting for the first match is unavoidable), nor wait for the operations to finish before I can resume reading the file. Ideally, both should continue in parallel.

Threading the two functions comes to mind as the first approach, but what other approaches could be applied here?


Solution

  • One thing to realize is that Python has a global interpreter lock (GIL), which prevents concurrent execution of Python bytecode, so the only advantage of using threading here is making progress during sleep time (if there is any) or while the OS is reading the file from disk; blocking I/O releases the GIL.

    With that in mind, you can have one thread read the file and put the results into a queue while the main thread works on the data from that queue (a producer-consumer pattern), as follows:

    import threading
    import queue
    
    # Worker function: read the file and put matching lines on the queue as they are found
    def read_file(keyword, results_queue: queue.Queue):
        try:
            with open("file.txt", "r") as file:
                for line in file:
                    if keyword in line:
                        # put matching line
                        results_queue.put(line.strip())
        except Exception:
            import traceback
            traceback.print_exc()
            print(flush=True)
        finally:
            results_queue.put(None)  # sentinel so the consumer loop exits
    
    keyword = "example"
    results_queue = queue.Queue()
    threading.Thread(target=read_file, args=(keyword, results_queue), daemon=True).start()
    # Main loop to perform operations on matching lines in queue
    for matching_line in iter(results_queue.get, None):  # exit on None
        # Perform a lot of operations on matching line
        print(matching_line)
    

    For the iteration over the queue, refer to the following answer for a detailed explanation:

    How to iterate through a Python Queue.Queue with a for loop instead of a while loop?
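    In short, iter(callable, sentinel) calls the callable repeatedly and stops as soon as it returns the sentinel. A minimal, self-contained sketch (no threads involved):

```python
import queue

q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)
q.put(None)  # sentinel marking the end of the stream

# iter(q.get, None) calls q.get() repeatedly and stops
# as soon as it returns the sentinel value None
collected = list(iter(q.get, None))
print(collected)  # ['a', 'b', 'c']
```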

    The extra try/finally is necessary: without it, an exception in the reader would skip the sentinel and leave the main loop blocked forever on results_queue.get (a deadlock).


    True parallelism

    In order to get around the GIL limitation you can use multiprocessing instead of threading, with two obvious disadvantages:

    1. Worker spawn time and IPC (inter-process communication) overhead.
    2. Code structure requirements: the entry point must be guarded by if __name__ == "__main__":, and everything passed through the queue must be picklable.

    Just replacing the queue and the thread with their multiprocessing counterparts should do what you want:

    import multiprocessing
    
    # Worker function: read the file and put matching lines on the queue as they are found
    def read_file(keyword, results_queue: multiprocessing.Queue):
        try:
            with open("file.txt", "r") as file:
                for line in file:
                    if keyword in line:
                        # put matching line
                        results_queue.put(line.strip())
        except Exception:
            import traceback
            traceback.print_exc()
            print(flush=True)
        finally:
            results_queue.put(None)  # sentinel so the consumer loop exits
    
    if __name__ == "__main__":
        keyword = "example"
        results_queue = multiprocessing.Queue()
        multiprocessing.Process(target=read_file, args=(keyword, results_queue), 
                                daemon=True).start()
        # Main loop to perform operations on matching lines in queue
        for matching_line in iter(results_queue.get, None):  # exit on None
            # Perform a lot of operations on matching line
            print(matching_line)
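    One caveat for your tailing requirement: in both versions, for line in file stops at end-of-file, so the reader exits once the existing contents have been read; it does not keep following the file. To get tail -f behaviour, the reader can poll for newly appended lines instead. A minimal sketch (the follow helper and its poll_interval are my own names, not stdlib APIs):

```python
import time

def follow(file, poll_interval=0.5):
    """Yield lines appended to an open file object, like tail -f."""
    while True:
        line = file.readline()
        if not line:
            # no new data yet; wait a bit before polling again
            time.sleep(poll_interval)
            continue
        yield line.rstrip("\n")
```

    The reader worker would then loop over follow(file) instead of file directly; the queue and sentinel machinery stays the same, though with an endless tail you need some shutdown signal other than end-of-file.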