Search code examples
python-3.xmultithreadingunit-testingpython-unittestpython-watchdog

How can I write Unit Tests for classes that run until Keyboard Interrupt and don't return anything?


I am working on a Python service that monitors a directory in the filesystem. When it sees that a file has been created or moved there, it sends the path of the file into a Kafka queue. I have the service working exactly like I need it to, but my problem is that I am supposed to have at least 90% coverage with unit tests. I am relatively new to Python, and I haven't ever used unit testing in any language before, so I feel really out of my depth. I just can't wrap my head around how I would go about testing these classes.

This is the class that monitors the file system, I'm using the watchdog library.

I added the handler=FileHandler parameter to init because I figured I could use that to pass the class a fake handler I could use for the tests, but that feels like it's unnecessarily complicated.

class FileSystemMonitor:

    def __init__(self, target_path, kafka_queue, handler=FileHandler):
        self.path = target_path
        self.queue = kafka_queue
        self.handler = handler(self.queue)

    def start(self):
        observer = Observer()
        observer.schedule(self.handler, self.path, recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

def parse_args():
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    queue = sys.argv[2] if len(sys.argv) > 2 else 'default'
    return path, queue

if __name__ == "__main__":
    path, queue = parse_args()
    monitor = FileSystemMonitor(path, queue)
    monitor.start()

This is the class I made which handles the events thrown by the monitor, and passes the path to the Kafka Queue.

class FileHandler(PatternMatchingEventHandler):

    def __init__(self, queue):
        super(FileHandler, self).__init__(ignore_patterns=["*/.DS_Store"], ignore_directories=True)
        self.queue = queue

    def on_any_event(self, event):
        super(FileHandler, self).on_any_event(event)
        #print(event, self.queue)
        result = kafkaProducer.send_msg(self.queue, event.src_path, event.event_type)
        print("Handler:", result)
        return result

I've written some tests for the kafkaProducer class, and I didn't have a very hard time with that, because it actually returns a value that I could test.

FileSystemMonitor runs infinitely and just waits for a keyboard interrupt, and when it does end, it doesn't return anything, so how do I write the unit tests for it?

As for the FileHandler class it depends on events being triggered by the monitor class, so how would I isolate the Handler class to test it?


Solution

  • FileSystemMonitor.start is very hard to test since it blocks until an external event happens but the test can't easily make the event happen because of the block. I guess you could do some trick with multithreading or multiprocessing or maybe just a timer, but this would add some indeterminism to your test which I don't like.

    A more explicit approach is allow the caller to specify what happens inside the while loop so that an exception can be raised in the test while time.sleep will be called in the production code.

    class FileSystemMonitor:
        def __init__(self, target_path, kafka_queue, handler=FileHandler):
            self.path = target_path
            self.queue = kafka_queue
            self.handler = handler(self.queue)
    
        def start(self, loop_action):
            observer = Observer()
            observer.schedule(self.handler, self.path, recursive=True)
            observer.start()
            try:
                while True:
                    loop_action()
            except KeyboardInterrupt:
                observer.stop()
            observer.join()
    

    This is what your test would look like:

    def fake_loop_action():
        raise KeyboardInterrupt
    
    def test_FileSystemMonitor():
        # Initialize target_path, kafka_queue and handler here.
        # You might want to use test doubles.
        monitor = FileSystemMonitor(target_path, kafka_queue, handler)
        monitor.start(loop_action=fake_loop_action)
    

    And in the production code you would use time.sleep instead. You can even specify the delay in the call now.

    monitor.start(loop_action=lambda: time.sleep(1))