python multithreading contextmanager

Context manager and daemon thread


I am starting a daemon thread from a context manager. The thread should send a heartbeat every second, but because it runs in its own thread, an exception in it does not terminate the context manager. How do I raise an exception in the context manager when the heartbeat stops?

from contextlib import contextmanager
from threading import Thread, Event
from time import sleep


@contextmanager
def plc():
    stop_event = Event()

    try:
        # Send heartbeat every second
        hb_t = Thread(target=heartbeat_task,
                      args=(stop_event,),
                      daemon=True)
        hb_t.start()

        yield
    except Exception:
        raise
    finally:
        stop_event.set()
        hb_t.join()
        print("Heartbeat stopped")


def heartbeat_task(stop_event):

    value = False
    
    while not stop_event.is_set():

        value = not value

        print("Heartbeat: " + str(value))

        sleep(1)


def main():

    with plc():

        while True:

            print("Program running")

            sleep(5)

if __name__ == '__main__':
    main()

I have a hard time finding examples of this.

Thanks for helping out!


Solution

  • Update

    I have modified the code to be more closely aligned with the code you posted. But:

    Your code as presented has an inconsistency. heartbeat_task is passed an event that, when set, causes the function to return. But the event is only set when the with plc(): block in main exits, which never happens. If you are hoping that an exception raised by heartbeat_task will force the context manager to exit and then be caught in plc, what is the point of calling stop_event.set() there? By definition we only arrive at that point if heartbeat_task is no longer running because of an exception.

    So either you want heartbeat_task to run indefinitely until it raises an exception (in which case there is no point in having a "stop" event), or you want to be able to stop heartbeat_task when some condition exists, but there is no code for doing that. For demo purposes I will assume that main is given access to stop_event and sets it under some circumstance. Otherwise, main runs until it detects that heartbeat_task is no longer running, presumably because it raised an exception (it executes an infinite loop, so how else could it terminate if the stop event has not been set?). That leaves the question of why you need a context manager at all. I will present an alternative later on.
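
As a minimal sketch of what a "stop" event is for, the snippet below runs a worker loop until another thread sets the event. The names ticker and ticks are hypothetical and not part of the posted code. It also uses Event.wait(timeout) instead of sleep, an assumption about what you might prefer, so the worker reacts as soon as the event is set instead of finishing a full sleep first.

```python
from threading import Event, Thread
import time


def ticker(stop_event, ticks):
    # Hypothetical worker: record a timestamp on every pass until asked to stop.
    while not stop_event.is_set():
        ticks.append(time.monotonic())
        # Event.wait(timeout) returns early once the event is set,
        # unlike time.sleep, which always sleeps the full interval.
        stop_event.wait(0.05)


stop_event = Event()
ticks = []
worker = Thread(target=ticker, args=(stop_event, ticks), daemon=True)
worker.start()

time.sleep(0.2)        # let the worker run for a moment
stop_event.set()       # ask the worker to stop
worker.join(timeout=1)

print("worker alive:", worker.is_alive())
print("ticks recorded:", len(ticks))
```

Without some code path that calls stop_event.set(), the loop can only end by raising, which is the inconsistency described above.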

    If you use a multithreading pool (we only need one thread in the pool), it becomes simple for the main thread to catch exceptions thrown by a task submitted to the pool. When multiprocessing.pool.ThreadPool.apply_async is called, a multiprocessing.pool.AsyncResult instance is returned that represents a future completion. Calling method get on this instance either returns the return value of the worker function (heartbeat_task) or re-raises any exception the worker function threw. We can also use method wait to wait for either the completion of the submitted task or an elapsed time, and then use method ready to test whether the task actually finished (due to an exception or a return) during that wait. If the task is still running, we can tell it to stop. In this demo I force the task to raise an exception after approximately 7 seconds:

    from contextlib import contextmanager
    from threading import Event
    from multiprocessing.pool import ThreadPool
    from time import sleep
    
    
    @contextmanager
    def plc():
        stop_event = Event()
        pool = ThreadPool(1)
    
        # Send heartbeat every second
        async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
        try:
            yield stop_event, async_result
        finally:
            # Runs on normal exit and when the with block raises:
            # make sure the task is told to stop before waiting on it.
            stop_event.set()
            try:
                # See if task threw an exception and if so, catch it:
                async_result.get()
            except Exception as e:
                print("Got exception:", e)
            finally:
                pool.close()
                pool.join()
                print("Heartbeat stopped")
    
    
    def heartbeat_task(stop_event):
        # For demo purposes, we will force an exception to occur
        # after approximately 7 seconds:
        value = False
    
        n = 0
        while not stop_event.is_set():
            value = not value
            print("Heartbeat: " + str(value))
            sleep(1)
            n += 1
            if n == 7:
                raise Exception('Oops!')
    
    
    def main():
        with plc() as tpl:
            stop_event, async_result = tpl
            # This function could forcibly cause the heartbeat_task
            # to complete by calling stop_event.set()
    
            # Loop while the task is still running
            while not async_result.ready():
                # Optionally stop the task from here, e.g.:
                # if some_condition:
                #     stop_event.set()
                #     break
                print("Program running")
                # Sleep for 5 seconds or until heartbeat_task terminates:
                async_result.wait(5)
    
    if __name__ == '__main__':
        main()
    

    Prints:

    Program running
    Heartbeat: True
    Heartbeat: False
    Heartbeat: True
    Heartbeat: False
    Heartbeat: True
    Program running
    Heartbeat: False
    Heartbeat: True
    Got exception: Oops!
    Heartbeat stopped
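
The AsyncResult behavior that the demo relies on can also be seen in isolation. The following is a small sketch, with a hypothetical failing_task standing in for heartbeat_task and short timings chosen only to keep it quick: wait returns after its timeout while the task is still running, ready reports completion, and get re-raises the worker's exception in the calling thread.

```python
from multiprocessing.pool import ThreadPool
from time import sleep


def failing_task():
    # Hypothetical worker that fails shortly after starting.
    sleep(0.5)
    raise ValueError("task failed")


pool = ThreadPool(1)
async_result = pool.apply_async(failing_task)

# wait() with a timeout returns once the timeout elapses,
# even though the task has not finished yet.
async_result.wait(0.01)
still_running = not async_result.ready()

# wait() without a timeout blocks until the task completes,
# whether it returned or raised.
async_result.wait()
finished = async_result.ready()

try:
    async_result.get()  # re-raises the worker's exception here
    caught = None
except ValueError as e:
    caught = e

pool.close()
pool.join()

print(still_running, finished, repr(caught))
```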
    

    Alternative to Using a Context Manager

    from threading import Event
    from multiprocessing.pool import ThreadPool
    from time import sleep
    
    
    def heartbeat_task(stop_event):
        value = False
    
        n = 0
        while not stop_event.is_set():
            value = not value
            print("Heartbeat: " + str(value))
            sleep(1)
            n += 1
            if n == 7:
                raise Exception('Oops!')
    
    def main():
        stop_event = Event()
        pool = ThreadPool(1)
        async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
    
        # Run as long as heartbeat_task is running:
        while not async_result.ready():
            # Optionally stop the task from here, e.g.:
            # if some_condition:
            #     stop_event.set()
            #     break
            print("Program running")
            # Sleep for 5 seconds or until heartbeat_task terminates:
            async_result.wait(5)
    
        # Any exception thrown in heartbeat_task will be rethrown and caught here:
        try:
            async_result.get()
        except Exception as e:
            print("Got exception:", e)
        finally:
            pool.close()
            pool.join()
    
    if __name__ == '__main__':
        main()
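
Both versions above catch the heartbeat exception and print it. If you would rather have the with statement itself raise, as the question asks, one possible variation is to let get() propagate out of the context manager. This is only a sketch under assumptions: HeartbeatError, the interval and fail_after parameters, and the return strings are all made up for the demo, and the timings are shortened so it finishes quickly.

```python
from contextlib import contextmanager
from multiprocessing.pool import ThreadPool
from threading import Event
from time import sleep


class HeartbeatError(Exception):
    """Hypothetical exception type signalling a failed heartbeat."""


def heartbeat_task(stop_event, interval=0.01, fail_after=3):
    # interval and fail_after are demo parameters, not part of the posted code.
    n = 0
    while not stop_event.is_set():
        sleep(interval)
        n += 1
        if n == fail_after:
            raise HeartbeatError("heartbeat lost")


@contextmanager
def plc():
    stop_event = Event()
    pool = ThreadPool(1)
    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
    try:
        yield async_result
        # Normal exit from the with block: stop the task, then let get()
        # re-raise whatever the task raised instead of swallowing it.
        stop_event.set()
        async_result.get()
    finally:
        # Always stop the task and release the pool, even if the
        # with block itself raised.
        stop_event.set()
        pool.close()
        pool.join()


def main():
    try:
        with plc() as async_result:
            # Loop while the heartbeat task is still running.
            while not async_result.ready():
                async_result.wait(0.05)
    except HeartbeatError as e:
        return "heartbeat failed: " + str(e)
    return "stopped cleanly"


if __name__ == "__main__":
    print(main())
```

The caller still has to poll ready() (or call wait) inside the with block, because an exception in the worker thread cannot interrupt the main thread on its own. The exception only surfaces when the main thread asks for the result.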