Tags: python, loops, python-asyncio, python-multiprocessing, python-multithreading

Create and call a function that "asynchronously" updates a file in a loop until the second function that is started in parallel is done


I'm new to multiprocessing / threading and asyncio in Python, and I'd like to parallelise two function calls so that the first function updates a healthcheck text file in an endless loop at a 5-minute interval until the second function is done. After that, the loop in the first function should stop.

Here is an attempt, which is not working:

import multiprocessing
import time

done_event = multiprocessing.Event()

# Function to update the healthcheck text file
def update_healthcheck():
    interval = 5 * 60  # 5 minutes interval in seconds
    while not done_event.is_set():
        with open("healthcheck.txt", "w") as f:
            f.write("Health is okay.")
        time.sleep(interval)

# Function that simulates the second task
def second_task():
    time.sleep(20)  # Simulating some work
    done_event.set()  # Set the event to signal the first function to stop

if __name__ == "__main__":
    # Start the first function in a separate process
    healthcheck_process = multiprocessing.Process(target=update_healthcheck)
    healthcheck_process.start()
    
    # Start the second function in the main process
    second_task()
    
    # Wait for the healthcheck process to finish
    healthcheck_process.join()
    
    print("Both tasks completed.")

What would be a correct and better implementation of that snippet?

Thank you!


Solution

  • Yes - that is the idea.

    Just switch from "multiprocessing" to "threading". You do not detail what is "not working" there, but it is because the code, written this way, ends up with an independent Event instance in the subprocess; it is likely that simply passing done_event as an argument to update_healthcheck would make that version work (a minimal sketch of that fix follows).
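
    For reference, here is a minimal sketch of that multiprocessing fix, assuming a "spawn" start method (where the module-level Event is not shared); the Event is passed explicitly so both processes use the same object, and the other names are reused from your snippet:

    import multiprocessing
    import time

    # Receives the shared Event explicitly instead of relying on a module-level global
    def update_healthcheck(done_event):
        interval = 5 * 60  # 5 minutes interval in seconds
        while not done_event.is_set():
            with open("healthcheck.txt", "w") as f:
                f.write("Health is okay.")
            # wait() instead of time.sleep() so the loop exits promptly
            # once the event is set, instead of waiting out the full interval
            done_event.wait(interval)

    def second_task(done_event):
        time.sleep(20)  # Simulating some work
        done_event.set()  # Signal the healthcheck loop to stop

    if __name__ == "__main__":
        done_event = multiprocessing.Event()
        healthcheck_process = multiprocessing.Process(
            target=update_healthcheck, args=(done_event,)
        )
        healthcheck_process.start()
        second_task(done_event)
        healthcheck_process.join()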

    But there is no reason to use multiprocessing in this case, and it brings up some edge cases around shared objects, like the one you have hit. On the other hand, if there is a chance your main task may fail and the program stop running, the "healthcheck" subprocess would die along with it: if process independence is needed, you should keep the health checking in the main process and delegate the main task to the subprocess, possibly passing it a multiprocessing.Queue so that it can post messages telling the main process it is still alive, as sketched below.
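
    A rough sketch of that inverted layout, just to illustrate the idea (the heartbeat messages and the simulated work are made up for the example):

    import multiprocessing
    import queue
    import time

    # The main task runs in a subprocess and posts heartbeats to the queue
    def main_task(heartbeat_queue):
        for _ in range(20):  # Simulating some work
            time.sleep(1)
            heartbeat_queue.put("alive")
        heartbeat_queue.put("done")

    if __name__ == "__main__":
        heartbeat_queue = multiprocessing.Queue()
        worker = multiprocessing.Process(target=main_task, args=(heartbeat_queue,))
        worker.start()
        # The healthcheck stays in the main process and only writes
        # while the worker keeps reporting that it is alive
        while True:
            try:
                message = heartbeat_queue.get(timeout=5 * 60)
            except queue.Empty:
                print("No heartbeat received - worker may have died or hung.")
                break
            if message == "done":
                break
            with open("healthcheck.txt", "w") as f:
                f.write("Health is okay.")
        worker.join()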

    Otherwise, if such a check is redundant and you only want to record when the task is done, the very code you wrote will work if you just change it to use threads:

    import threading
    import time, datetime
    
    done_event = threading.Event()
    
    # Function to update the healthcheck text file
    def update_healthcheck():
        interval = 5 * 60  # 5 minutes interval in seconds
        while not done_event.is_set():
            # append timestamped log entries:
            with open("healthcheck.txt", "at") as f:
                f.write(f"{datetime.datetime.now()} - Health is okay.\n")
            time.sleep(interval)
    
    # Function that simulates the second task
    def second_task():
        time.sleep(20)  # Simulating some work
        done_event.set()  # Set the event to signal the first function to stop
    
    
    def main():
        # bring code inside functions for better
        # readability/maintainability

        # Start the first function in a separate thread
        healthcheck_thread = threading.Thread(target=update_healthcheck)
        healthcheck_thread.start()

        # Start the second function in the main thread
        second_task()

        # Wait for the healthcheck thread to finish
        healthcheck_thread.join()

        print("Both tasks completed.")
        
    if __name__ == "__main__":
        # the guarded code should ideally be short -
        # by moving the logic to a function, we need a single line:
        main()
    
    

    Just a note, since you also tagged the question "asyncio": although asyncio could be used in this case, it requires that all your code is designed with asyncio in mind. Your main task (second_task) would have to be written to cooperate with the asyncio event loop, yielding to it in a timely fashion so that the monitoring function can run - this means you would have to ensure that expressions like await asyncio.sleep(0) are executed from time to time inside your task. Also, if it is a CPU-intensive task, there would be no gain at all, and you would likely have to run it in a secondary thread from a wrapper async function, needed just to get things working.
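
    For completeness, a rough asyncio version might look like the sketch below - note how second_task had to be rewritten around await asyncio.sleep() (a real task would need equivalent await points), which is exactly the kind of redesign mentioned above:

    import asyncio
    import datetime

    async def update_healthcheck(done_event):
        interval = 5 * 60  # 5 minutes interval in seconds
        while not done_event.is_set():
            with open("healthcheck.txt", "at") as f:
                f.write(f"{datetime.datetime.now()} - Health is okay.\n")
            try:
                # wait for the event, but at most "interval" seconds,
                # yielding control to the event loop in the meantime
                await asyncio.wait_for(done_event.wait(), timeout=interval)
            except asyncio.TimeoutError:
                pass

    async def second_task(done_event):
        await asyncio.sleep(20)  # Simulating some work - it must await, not block
        done_event.set()

    async def main():
        done_event = asyncio.Event()
        await asyncio.gather(update_healthcheck(done_event), second_task(done_event))

    if __name__ == "__main__":
        asyncio.run(main())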

    None of that is needed with the multi-threading approach in this case: even though, due to Python's "GIL" design, you can't get better performance by running two or more CPU-intensive functions with multi-threading alone, the interpreter will always switch to your other threads that have work to do (in this case, the monitoring function), without most of the race conditions and data-integrity issues one would get in an environment without an automatic lock like the GIL.