Why do twisted.internet.inotify.INotify() events happen outside the test in a twisted.trial based test?


I am new to creating tests with twisted.trial. I am trying to understand how the twisted.internet.inotify.INotify class is interacting with my twisted.trial unit test code.

Test Code:

import sys
import os
from twisted.trial import unittest
from twisted.internet import inotify
from twisted.python import filepath

STORAGE_PATH = '/tmp/_fake_worker'
CONVERSION_FNAME = "cloud_stream_conv.txt"
TEST_PATH = os.path.join(STORAGE_PATH, CONVERSION_FNAME)


class FileWatcher(object):

    def __init__(self, test_path=TEST_PATH):
        self.test_path = test_path
        self.notifier = inotify.INotify()

    def start(self):
        self.log("Starting FileWatcher")
        self.notifier.startReading()
        self.notifier.watch(
            filepath.FilePath(self.test_path),
            callbacks=[self.notify]
        )

    def stop(self):
        self.log("Stopping FileWatcher")
        self.notifier.loseConnection()

    def log(self, format_spec, *args):
        sys.stderr.write(format_spec % args)
        sys.stderr.write("\n")

    def notify(self, ignored, filepath, mask):
        try:
            self.log(
                "event %s on %s",
                ', '.join(inotify.humanReadableMask(mask)),
                filepath
            )
        except Exception:
            self.log("\nUncaught exception: %s\n", sys.exc_info())


class TestFileWatcher(unittest.TestCase):

    def setUp(self):
        os.makedirs(STORAGE_PATH, exist_ok=True)
        self.test_file = filepath.FilePath(TEST_PATH)
        self.test_file.touch()
        self.fw = FileWatcher()
        self.log_entries = []

    def tearDown(self):
        print("Started tearDown for test %s\n" % self.id)
        self.test_file.remove()
        print("Finished tearDown for test %s\n" % self.id)

    def fake_log(self, format_spec, *args):
        self.log_entries.append(
            format_spec % args
        )

    def test_activity(self):
        self.patch(self.fw, "log", self.fake_log)
        self.fw.start()
        self.assertTrue(self.log_entries)
        print(
            "\nlog_entries (%d) = %s\n" % (
                len(self.log_entries),
                self.log_entries,
            )
        )

When I run the test, I see this:

evertz@4979b35e5956:~/src$ venv/bin/python -m twisted.trial -e tests.component.test_simple_inotify
tests.component.test_simple_inotify
  TestFileWatcher
    test_activity ... 
log_entries (1) = ['Starting FileWatcher']

Started tearDown for test <bound method TestCase.id of <tests.component.test_simple_inotify.TestFileWatcher testMethod=test_activity>>

Finished tearDown for test <bound method TestCase.id of <tests.component.test_simple_inotify.TestFileWatcher testMethod=test_activity>>

event attrib on FilePath(b'/tmp/_fake_worker/cloud_stream_conv.txt')
event delete_self on FilePath(b'/tmp/_fake_worker/cloud_stream_conv.txt')
                                                     [OK]

-------------------------------------------------------------------------------
Ran 1 tests in 0.015s

PASSED (successes=1)

I also tried this variant for my test:

    def test_activity(self):
        self.patch(self.fw, "log", self.fake_log)

        def delete_file(result):
            self.test_file.remove()

        def report_activity(result):
            self.assertTrue(self.log_entries)
            print(
                "\nlog_entries (%d) = %s\n" % (
                    len(self.log_entries),
                    self.log_entries,
                )
            )

        self.fw.start()
        d = defer.Deferred()
        d.addCallback(delete_file)
        d.addCallback(report_activity)
        reactor.callLater(2, d.callback, None)
        return d

But still got something similar:

evertz@4979b35e5956:~/src$ venv/bin/python -m twisted.trial -e tests.component.test_simple_inotify
tests.component.test_simple_inotify
  TestFileWatcher
    test_activity ... 
log_entries (1) = ['Starting FileWatcher']

Started tearDown for test <bound method TestCase.id of <tests.component.test_simple_inotify.TestFileWatcher testMethod=test_activity>>

Finished tearDown for test <bound method TestCase.id of <tests.component.test_simple_inotify.TestFileWatcher testMethod=test_activity>>

event attrib on FilePath(b'/tmp/_fake_worker/cloud_stream_conv.txt')
event delete_self on FilePath(b'/tmp/_fake_worker/cloud_stream_conv.txt')
                                                     [OK]

-------------------------------------------------------------------------------
Ran 1 tests in 2.006s

PASSED (successes=1)

I was expecting that the messages starting with event [...] would have been inside the self.log_entries variable in my test code and that the events would actually happen sometime before/during the test, instead of after tearDown(). How do I write a test that will make the INotify() instance trigger its callbacks so that I can test it properly?

Solution based on Jean-Paul Calderone's advice:

import sys
import os
from twisted.trial import unittest
from twisted.internet import inotify
from twisted.internet import reactor
from twisted.internet import defer
from twisted.python import filepath

STORAGE_PATH = '/tmp/_fake_worker'
CONVERSION_FNAME = "cloud_stream_conv.txt"
TEST_PATH = os.path.join(STORAGE_PATH, CONVERSION_FNAME)


class FileWatcher(object):

    def __init__(self, parent, test_path=TEST_PATH):
        self.parent = parent
        self.test_path = test_path
        self.notifier = inotify.INotify()
        self.doneCalled = False

    def start(self):
        self.log("Starting FileWatcher")
        self.notifier.startReading()
        self.notifier.watch(
            filepath.FilePath(self.test_path),
            callbacks=[self.notify]
        )

    def stop(self):
        self.log("Stopping FileWatcher")
        self.notifier.loseConnection()

    def log(self, format_spec, *args):
        sys.stderr.write(format_spec % args)
        sys.stderr.write("\n")

    def notify(self, ignored, filepath, mask):
        try:
            self.log(
                "event %s on %s",
                ', '.join(inotify.humanReadableMask(mask)),
                filepath
            )
            if not self.doneCalled:
                reactor.callLater(2, self.parent.done.callback, None)
                self.doneCalled = True
        except Exception:
            self.log("\nUncaught exception: %s\n", sys.exc_info())

class TestFileWatcher(unittest.TestCase):

    def setUp(self):
        os.makedirs(STORAGE_PATH, exist_ok=True)
        self.test_file = filepath.FilePath(TEST_PATH)
        self.test_file.touch()
        self.fw = FileWatcher(self)
        self.log_entries = []

    def tearDown(self):
        try:
            self.test_file.remove()
        except FileNotFoundError:
            pass

    def fake_log(self, format_spec, *args):
        self.log_entries.append(
            format_spec % args
        )

    def test_activity(self):
        self.patch(self.fw, "log", self.fake_log)
        self.done = defer.Deferred()
        self.fw.start()

        self.test_file.remove()

        def report_activity(result):
            self.assertTrue(self.log_entries)
            print(
                "\nlog_entries (%d) = %s\n" % (
                    len(self.log_entries),
                    self.log_entries,
                )
            )

        self.done.addCallback(report_activity)
        return self.done

When I run this, I get the results I expected:

evertz@f24cc5aacff8:~/src$ venv/bin/python -m twisted.trial -e tests.component.test_simple_inotify.TestFileWatcher.test_activity
tests.component.test_simple_inotify
  TestFileWatcher
    test_activity ... 
log_entries (3) = ['Starting FileWatcher', "event attrib on FilePath(b'/tmp/_fake_worker/cloud_stream_conv.txt')", "event delete_self on FilePath(b'/tmp/_fake_worker/cloud_stream_conv.txt')"]

                                                     [OK]

-------------------------------------------------------------------------------
Ran 1 tests in 2.005s

PASSED (successes=1)

I'm still using a mutable state variable here, but I moved it to where the inotify activity happens and used reactor.callLater() to allow extra time so that all the state changes were captured. It's not the most elegant solution, but that will come with experience.


Solution

  • First, a little bit about trial.

    twisted.trial.unittest.TestCase supports "asynchronous" tests, that is, tests which return a Deferred (or a "coroutine", if you have a new enough version of Twisted). If a test method returns a value of one of these types, the test keeps running until the asynchronous result is available, and that result is then interpreted just as a synchronous result would have been. While this is happening, the global reactor is running.

    Additionally, after any twisted.trial.unittest.TestCase test method completes, trial lets the global reactor run for a very short period of time before considering the test complete.

    Finally, recall that Twisted is a cooperative multitasking system. This means that in the usual case, only one piece of your program is running at any given time and the piece that is running only changes when you allow this to happen.

    Now, about Twisted support for inotify. twisted.internet.inotify.INotify uses a reactor to receive information from the Linux inotify system. It needs a running reactor or it cannot operate properly.

    Your first test method has the problem that it doesn't allow the reactor to run until it is finished. You observed the consequence of this - setUp runs, then your test method, then tearDown, then trial runs the reactor a little bit which causes your inotify code to run - seemingly after the test is over.

    Your second test method has a similar problem. It does allow the reactor to run (by returning, but signaling that it isn't done by making its return value a Deferred). However, it doesn't provoke any inotify activity (delete_file) until "instantaneously" before signaling that it is done:

    1. It uses callLater to delay the firing of d by 2 seconds.
    2. After 2 seconds, delete_file runs.
    3. Immediately after, with no chance for the reactor to run, report_activity runs.
    4. Immediately after, with no chance for the reactor to run, trial gets to see d has a result, which causes trial to consider the test finished.
    5. trial runs the reactor a little bit more and the inotify stuff happens.

    So to solve your issue, you need to make sure the inotify pieces can happen before you signal to trial that the test is finished.

    This might look something like:

        ...
    
        def fake_log(self, format_spec, *args):
            self.log_entries.append(
                format_spec % args
            )
            self.done.callback(None)
    
        ...
    
        def test_activity(self):
            self.patch(self.fw, "log", self.fake_log)
            self.done = defer.Deferred()
            self.fw.start()
    
            self.test_file.remove()
    
            def report_activity(result):
                self.assertTrue(self.log_entries)
                print(
                    "\nlog_entries (%d) = %s\n" % (
                        len(self.log_entries),
                        self.log_entries,
                    )
                )
    
            self.done.addCallback(report_activity)
            return self.done
    

    Notice how the inotify activity you want to observe is now in control of when the Deferred that signals the test is finished.

    Also note that I don't love how I've managed self.done here: it's an example of mutable state and action at a distance, which makes code hard to understand and maintain. It might also introduce issues if your inotify watch callback is called more than once (you can only callback a Deferred once). I did it this way because it was the easiest way to stay close to your example code, so I think it answers your specific question better than rearranging everything would.