Tags: python, python-multithreading, contextmanager

How to use a thread in a context manager?


Consider this threading.Thread class:

import threading


class Sleeper(threading.Thread):
    def __init__(self, sleep=5.0):
        threading.Thread.__init__(self)
        self.event = threading.Event()
        self.sleep = sleep

    def run(self):
        while self.sleep > 0 and not self.event.is_set():
            self.event.wait(1.0)
            self.sleep -= 1

    def stop(self):
        self.event.set()

It sleeps for a certain amount of time and exits or is stopped before reaching that amount.

I use it as:

sleeper = Sleeper()
try:
    sleeper.start()
    ...  # do stuff here
except Exception:
    ...  # handle possible exceptions here
finally:
    sleeper.stop()

And I would much rather use it like a context manager:

with Sleeper():
    ...  # do stuff here

and then the thread is stopped when exiting the with block.

I have tried adding __enter__ and __exit__ methods and it seems to work but I'm not sure this is the way to go:

def __enter__(self):
    self.start()
    return self

and

def __exit__(self, exc_type, exc_value, traceback):
    self.stop()

But I'm really not sure what I'm doing here. How should it be done properly?


Solution

  • I don't fully understand your question, since I lack the background on your AWS-related problem, but it is doable with a context manager, just as you mentioned.

    import threading
    import time
    
    
    class Sleeper(threading.Thread):
        def __init__(self, sleep=5.0):
            threading.Thread.__init__(self, name='Sleeper')
            self.stop_event = threading.Event()
            self.sleep = sleep
    
        def run(self):
            print('Thread {thread} started'.format(thread=threading.current_thread()))
            while self.sleep > 0 and not self.stop_event.is_set():
                # wait() returns as soon as stop() sets the event, unlike time.sleep()
                self.stop_event.wait(1.0)
                self.sleep -= 1
            print('Thread {thread} ended'.format(thread=threading.current_thread()))
    
        def stop(self):
            self.stop_event.set()
    
        def __enter__(self):
            self.start()
            return self
    
        def __exit__(self, *args, **kwargs):
            self.stop()
            print('Force set Thread Sleeper stop_event')
    
    
    with Sleeper(sleep=2.0) as sleeper:
        time.sleep(5)
    
    print('Main Thread ends')
    

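    You can test two cases: (1) the main thread sleeps longer than the Sleeper, and (2) the Sleeper thread has the larger sleep parameter. They produce different results: in the first, the thread finishes on its own before the with block exits; in the second, __exit__ sets stop_event and the thread stops early.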
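
Note that an __exit__ which only sets the event does not wait for the thread to finish, so the code after the with block can run while the thread is still winding down. Below is a minimal sketch of one way to close that gap, by also calling join() in __exit__. The class name JoiningSleeper and the 0.2 second stand-in for real work are made up for illustration; treat this as one possible variant, not the only correct one.

```python
import threading
import time


class JoiningSleeper(threading.Thread):
    """Hypothetical variant of Sleeper that waits for the thread on exit."""

    def __init__(self, sleep=5.0):
        super().__init__(name='JoiningSleeper')
        self.stop_event = threading.Event()
        self.sleep = sleep

    def run(self):
        while self.sleep > 0 and not self.stop_event.is_set():
            # Event.wait() returns early once stop() sets the event.
            self.stop_event.wait(1.0)
            self.sleep -= 1

    def stop(self):
        self.stop_event.set()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()
        self.join()   # block until run() has returned
        return False  # do not suppress exceptions raised in the with block


start = time.monotonic()
with JoiningSleeper(sleep=30.0) as sleeper:
    time.sleep(0.2)  # stand-in for real work (assumption for the demo)
elapsed = time.monotonic() - start

# The thread has already finished by the time the with block is left.
print(sleeper.is_alive(), elapsed < 5)
```

Returning False from __exit__ lets any exception raised inside the with block propagate after the thread has been stopped, which matches what the try/finally version in the question does.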

    If you still want the main thread to interact with the Sleeper thread, your code should look like this:

    with Sleeper(sleep=2.0) as sleeper:
        cnt = 15
    
        while cnt > 0 and sleeper.is_alive():
            print(cnt)
            cnt -= 1
            time.sleep(1)
    

    You will see the main thread print only a few numbers, because the sleeper has ended and is no longer alive.
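
If you would rather not subclass threading.Thread, the same start-on-enter, stop-on-exit pattern can probably be sketched with contextlib.contextmanager instead. The function names countdown and sleeper below are hypothetical and not part of the original code; this is just one way to arrange it.

```python
import threading
import time
from contextlib import contextmanager


def countdown(stop_event, seconds):
    """Worker: wait up to `seconds`, one second at a time, unless stopped."""
    while seconds > 0 and not stop_event.is_set():
        stop_event.wait(1.0)
        seconds -= 1


@contextmanager
def sleeper(seconds=5.0):
    """Start the worker thread on entry; stop and join it on exit."""
    stop_event = threading.Event()
    thread = threading.Thread(
        target=countdown, args=(stop_event, seconds), name='Sleeper'
    )
    thread.start()
    try:
        yield thread
    finally:
        # Runs on normal exit and also when the with block raises.
        stop_event.set()
        thread.join()


with sleeper(seconds=30.0) as worker:
    alive_inside = worker.is_alive()
    time.sleep(0.2)  # stand-in for real work (assumption for the demo)

alive_after = worker.is_alive()
print(alive_inside, alive_after)  # prints: True False
```

The try/finally around the yield plays the role of __exit__: the thread is stopped whether the with block finishes normally or raises.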