
Watchdog compatibility: A workaround for "CancelIoEx"


While using the Python watchdog library to watch file system events, I noticed that under Windows Server 2003 it falls back to "polling mode". It stops using asynchronous OS notifications, which heavily degrades system performance when a large number of files change.

I traced the problem to the watchdog/observers/winapi.py file, where the CancelIoEx system call is used to unblock the pending ReadDirectoryChangesW call when the user wants to stop monitoring the watched directory or file:

(winapi.py)

CancelIoEx = ctypes.windll.kernel32.CancelIoEx
CancelIoEx.restype = ctypes.wintypes.BOOL
CancelIoEx.errcheck = _errcheck_bool
CancelIoEx.argtypes = (
    ctypes.wintypes.HANDLE,  # hObject
    ctypes.POINTER(OVERLAPPED)  # lpOverlapped
)

...
...
...

def close_directory_handle(handle):
    try:
        CancelIoEx(handle, None)  # force ReadDirectoryChangesW to return
    except WindowsError:
        return

The problem is that CancelIoEx is not available before Windows Server 2008: http://msdn.microsoft.com/en-us/library/windows/desktop/aa363792(v=vs.85).aspx
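
One way to detect this at runtime, as a minimal sketch: ctypes raises AttributeError when a function is not exported by the DLL, so the lookup itself can serve as the check. The helper name has_cancel_io_ex is hypothetical and not part of watchdog; on non-Windows platforms it simply returns False.

```python
import ctypes
import sys


def has_cancel_io_ex():
    """Return True if kernel32 exports CancelIoEx on this system."""
    if not sys.platform.startswith("win"):
        return False  # ctypes.windll only exists on Windows
    try:
        # Attribute lookup resolves the symbol; a missing export raises AttributeError.
        ctypes.windll.kernel32.CancelIoEx
    except AttributeError:
        return False
    return True


print("CancelIoEx available:", has_cancel_io_ex())
```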

One possible alternative is to change close_directory_handle so that it creates a mock file inside the monitored directory, which unblocks the thread waiting for ReadDirectoryChangesW to return.
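
A minimal sketch of that mock-file idea, assuming the watcher reports file creation or deletion events. The function name poke_directory and the file prefix are hypothetical; this only shows how the change could be generated, not how watchdog would filter the resulting event out.

```python
import os
import tempfile


def poke_directory(path):
    """Create and immediately delete a throwaway file inside `path`.

    Both operations are file system changes, so a thread blocked in
    ReadDirectoryChangesW on that directory should get something to return.
    """
    fd, name = tempfile.mkstemp(prefix=".wakeup-", dir=path)
    os.close(fd)
    os.remove(name)
    return name
```

A real implementation would also have to ignore the event produced by the throwaway file, and it needs write permission in the monitored directory.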

However, I noticed that the CancelIo system call is in fact available in Windows Server 2003. Its documentation says:

Cancels all pending input and output (I/O) operations that are issued by the calling thread for the specified file. The function does not cancel I/O operations that other threads issue for a file handle. To cancel I/O operations from another thread, use the CancelIoEx function.

So calling CancelIo from another thread won't affect the waiting thread.

Do you have any idea how to solve this problem? Maybe threading.enumerate() could be used to send a signal to each thread, with CancelIo being called from the signal handlers?


Solution

  • The natural approach is to implement a completion routine and call ReadDirectoryChangesW in overlapped mode. The following example shows how to do that:

    RDCW_CALLBACK_F = ctypes.WINFUNCTYPE(None, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.POINTER(OVERLAPPED))
    

    First, create a WINFUNCTYPE factory, which is used to generate C-like functions (callable from the Windows API) from Python functions. In this case the function has no return value and three parameters, corresponding to the

    VOID CALLBACK FileIOCompletionRoutine(
      _In_     DWORD dwErrorCode,
      _In_     DWORD dwNumberOfBytesTransfered,
      _Inout_  LPOVERLAPPED lpOverlapped
    );
    

    FileIOCompletionRoutine header.

    The callback reference, as well as the overlapped structure, needs to be added to the ReadDirectoryChangesW argument list:

    ReadDirectoryChangesW = ctypes.windll.kernel32.ReadDirectoryChangesW
    
    ReadDirectoryChangesW.restype = ctypes.wintypes.BOOL
    ReadDirectoryChangesW.errcheck = _errcheck_bool
    ReadDirectoryChangesW.argtypes = (
        ctypes.wintypes.HANDLE,  # hDirectory
        LPVOID,  # lpBuffer
        ctypes.wintypes.DWORD,  # nBufferLength
        ctypes.wintypes.BOOL,  # bWatchSubtree
        ctypes.wintypes.DWORD,  # dwNotifyFilter
        ctypes.POINTER(ctypes.wintypes.DWORD),  # lpBytesReturned
        ctypes.POINTER(OVERLAPPED),  # lpOverlapped
        RDCW_CALLBACK_F  # FileIOCompletionRoutine # lpCompletionRoutine
    )
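
The snippets above use OVERLAPPED without showing its definition. A minimal ctypes sketch of the structure, following the field layout documented for the Windows API, could look like the following. Plain ctypes types stand in for ULONG_PTR, DWORD and HANDLE so that the definition also imports on other platforms, and the Pointer member that shares a union with Offset/OffsetHigh is omitted for brevity. The real winapi.py may define it differently.

```python
import ctypes


class OVERLAPPED(ctypes.Structure):
    _fields_ = [
        ("Internal", ctypes.c_size_t),      # ULONG_PTR, reserved for the system
        ("InternalHigh", ctypes.c_size_t),  # ULONG_PTR, reserved for the system
        ("Offset", ctypes.c_uint32),        # DWORD, low part of the file offset
        ("OffsetHigh", ctypes.c_uint32),    # DWORD, high part of the file offset
        ("hEvent", ctypes.c_void_p),        # HANDLE of an optional event object
    ]


# A zero-initialised instance is what the overlapped call below passes by reference.
overlapped = OVERLAPPED()
print(ctypes.sizeof(overlapped))
```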
    

    From here, we are ready to perform the overlapped system call. This is a simple callback, useful only to test that everything works:

    def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
         print("dir_change_callback! PID:" + str(os.getpid()))
         print("CALLBACK THREAD: " + str(threading.currentThread()))
    

    Prepare and perform the call:

    event_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
    nbytes = ctypes.wintypes.DWORD()
    overlapped_read_dir = OVERLAPPED()
    call2pass = RDCW_CALLBACK_F(dir_change_callback)
    
    hand = get_directory_handle(os.path.abspath("/test/"))
    
    def docall():
        ReadDirectoryChangesW(hand, ctypes.byref(event_buffer),
                              len(event_buffer), False,
                              WATCHDOG_FILE_NOTIFY_FLAGS,
                              ctypes.byref(nbytes), 
                              ctypes.byref(overlapped_read_dir), call2pass)
    
    print("Waiting!")
    docall()
    

    If you load and run all this code in a DreamPie interactive shell, you can check that the system call is made and that the callback executes, printing the thread and PID after the first change under the c:\test directory. You will also notice that they are the same as those of the main thread and process: although the event is raised by a separate thread, the callback runs in the same process and thread as the main program. That leads to undesired behaviour:

    lck = threading.Lock()
    
    def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
         print("dir_change_callback! PID:" + str(os.getpid()))
         print("CALLBACK THREAD: " + str(threading.currentThread()))
    
    ...
    ...
    ...
    
    lck.acquire()
    print("Waiting!")
    docall()
    lck.acquire()
    

    This program blocks the main thread, and the callback never executes. I tried many synchronization tools, even Windows API semaphores, and always got the same behaviour. So, finally, I decided to implement the asynchronous call using the synchronous configuration of ReadDirectoryChangesW inside a separate process, managed and synchronized with the multiprocessing Python library.

    Calls to get_directory_handle no longer return the handle number given by the Windows API but one managed by the winapi library. For that I implemented a handle generator:
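
A note on why the callback may never fire in the locked variant, offered as a reading of the Windows documentation rather than something verified against this code: the completion routine passed to ReadDirectoryChangesW is documented to run only while the calling thread is in an alertable wait state, and blocking on a Python lock is presumably not such a wait. A minimal sketch of an alertable wait through SleepEx follows. The helper name alertable_sleep is hypothetical, and on non-Windows platforms it simply returns False.

```python
import ctypes
import sys

WAIT_IO_COMPLETION = 0xC0  # SleepEx return value when a completion routine ran


def alertable_sleep(timeout_ms):
    """Sleep in an alertable state; return True if a completion routine ran."""
    if not sys.platform.startswith("win"):
        return False  # SleepEx only exists on Windows
    import ctypes.wintypes
    sleep_ex = ctypes.windll.kernel32.SleepEx
    sleep_ex.restype = ctypes.wintypes.DWORD
    sleep_ex.argtypes = (ctypes.wintypes.DWORD, ctypes.wintypes.BOOL)
    # bAlertable=True lets queued completion routines run on this thread.
    return sleep_ex(timeout_ms, True) == WAIT_IO_COMPLETION
```

Under that assumption, replacing the second lck.acquire() with a loop around alertable_sleep would give the callback a chance to run, but it would still run on the thread that issued the call.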

    class FakeHandleFactory():
        _hl = threading.Lock()
        _next = 0
        @staticmethod
        def next():
            FakeHandleFactory._hl.acquire()
            ret = FakeHandleFactory._next
            FakeHandleFactory._next += 1
            FakeHandleFactory._hl.release()
            return ret
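
For comparison, the same idea can be sketched more compactly with a `with` block and itertools.count. This is an illustrative variant, not the code used in the rest of this answer, and the class name HandleCounter is hypothetical.

```python
import itertools
import threading


class HandleCounter(object):
    _lock = threading.Lock()
    _counter = itertools.count()  # yields 0, 1, 2, ...

    @classmethod
    def next(cls):
        # The lock is released automatically, even if an exception is raised.
        with cls._lock:
            return next(cls._counter)
```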
    

    Each generated handle has to be globally associated with a file system path:

    handle2file = {}
    

    Each call to read_directory_changes now creates a ReadDirectoryRequest object (derived from multiprocessing.Process):

    class ReadDirectoryRequest(multiprocessing.Process):
    
        def _perform_and_wait4request(self, path, recursive, event_buffer, nbytes):
            hdl = CreateFileW(path, FILE_LIST_DIRECTORY, WATCHDOG_FILE_SHARE_FLAGS,
                           None, OPEN_EXISTING, WATCHDOG_FILE_FLAGS, None)
            #print("path: " + path)
            aux_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
            aux_n = ctypes.wintypes.DWORD()
            #print("_perform_and_wait4request! PID:" + str(os.getpid()))
            #print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
            try:
                ReadDirectoryChangesW(hdl, ctypes.byref(aux_buffer),
                                  len(event_buffer), recursive,
                                  WATCHDOG_FILE_NOTIFY_FLAGS,
                                  ctypes.byref(aux_n), None, None)
            except WindowsError as e:
                print("!" + str(e))
                # On any error (including ERROR_OPERATION_ABORTED) report zero
                # bytes. The shared nbytes/event_buffer must not be rebound here.
                aux_n.value = 0
            # Copy the result into the shared objects (range works on Python 2 and 3)
            nbytes.value = aux_n.value
            for i in range(aux_n.value):
                event_buffer[i] = aux_buffer[i]
            CloseHandle(hdl)
            try:
                self.lck.release()
            except:
                pass
    
    
    
        def __init__(self, handle, recursive):
            buffer = ctypes.create_string_buffer(BUFFER_SIZE)
            self.event_buffer = multiprocessing.Array(ctypes.c_char, buffer)
            self.nbytes = multiprocessing.Value(ctypes.wintypes.DWORD, 0)
            targetPath = handle2file.get(handle, None)
            super(ReadDirectoryRequest, self).__init__(target=self._perform_and_wait4request, args=(targetPath, recursive, self.event_buffer, self.nbytes))
            self.daemon = True
            self.lck = multiprocessing.Lock()
            self.result = None
            try:
                self.int_class = long
            except NameError:
                self.int_class = int
            if targetPath is None:
                self.result = ([], -1)
    
        def CancelIo(self):
            try:
                self.result = ([], 0)
                self.lck.release()
            except:
                pass
    
        def read_changes(self):
            #print("read_changes! PID:" + str(os.getpid()))
            #print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
            if self.result is not None:
                raise Exception("ReadDirectoryRequest object can be used only once!")
            self.lck.acquire()
            self.start()
            self.lck.acquire()
            self.result = (self.event_buffer, self.int_class(self.nbytes.value))
            return self.result
    

    This class specializes Process: it provides a process that performs the system call and waits until either:

    • A change event has been raised.
    • The main thread cancels the request by calling the CancelIo method of the ReadDirectoryRequest object.
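
The "wait until a change arrives or somebody cancels" handshake can be illustrated in isolation. The sketch below uses threads instead of processes purely to stay short and portable, so it is not a drop-in replacement for ReadDirectoryRequest. OneShotRequest and its method names are hypothetical.

```python
import threading


class OneShotRequest(object):
    def __init__(self, work):
        self._work = work
        self._done = threading.Lock()
        self._done.acquire()           # held until a result is available
        self._state = threading.Lock()
        self.result = None

    def _finish(self, result):
        with self._state:
            if self.result is None:    # the first finisher wins
                self.result = result
                self._done.release()

    def cancel(self):
        self._finish(("cancelled", None))

    def wait(self):
        worker = threading.Thread(
            target=lambda: self._finish(("changed", self._work())))
        worker.daemon = True
        worker.start()
        self._done.acquire()           # blocks until _finish() releases it
        return self.result
```

Guarding the release with a state lock avoids releasing an unlocked lock, which is what the bare try/except blocks in the class above are working around.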

    Note that the roles of the following functions are now to manage requests:

    • get_directory_handle
    • close_directory_handle
    • read_directory_changes

    For that, thread locks and auxiliary data structures are needed:

    rqIndexLck = threading.Lock() # Protects the access to `rqIndex`
    rqIndex = {} # Maps handles to request objects sets.
    

    get_directory_handle

    def get_directory_handle(path):
        rqIndexLck.acquire()
        ret = FakeHandleFactory.next()
        handle2file[ret] = path
        rqIndexLck.release()
        return ret
    

    close_directory_handle

    def close_directory_handle(handle):
        rqIndexLck.acquire()
        rqset4handle = rqIndex.get(handle, None)
        if rqset4handle is not None:
            for rq in rqset4handle:
                rq.CancelIo()
            del rqIndex[handle]
        if handle in handle2file:
            del handle2file[handle]
        rqIndexLck.release()
    

    And last but not least: read_directory_changes

    def read_directory_changes(handle, recursive):
        rqIndexLck.acquire()
        rq = ReadDirectoryRequest(handle, recursive)
        set4handle = None
        if handle in rqIndex:
            set4handle = rqIndex[handle]
        else:
            set4handle = set()
            rqIndex[handle] = set4handle
        set4handle.add(rq)
        rqIndexLck.release()
        ret = rq.read_changes()
        rqIndexLck.acquire()
        if rq in set4handle:
            set4handle.remove(rq)
        rqIndexLck.release()
        return ret