
Skip steps in fsevents queue


I'm currently monitoring a folder using fsevents. Every time a file is added, some code is executed on that file. A new file is added to the folder every second.

from fsevents import Observer, Stream

def file_event_callback(event):
    # code 256 for adding file to folder
    if event.mask == 256:
        fileChanged = event.name
        # do stuff with fileChanged file

if __name__ == "__main__":
    observer = Observer()   
    observer.start()
    stream = Stream(file_event_callback, 'folder', file_events=True)
    observer.schedule(stream)
    observer.join()

This works quite well. The only problem is that the library builds a queue with an entry for every file added to the folder. The code executed within file_event_callback can take more than a second. When that happens, the other items in the queue should be skipped so that only the newest one is used.

How can I skip items in the queue so that, once the previous file has been processed, only the latest addition to the folder is used?

I tried using watchdog first, but as this has to run on a Mac I had some trouble making it work the way I wanted.


Solution

  • I don't know exactly what library you're using, and when you say "this is building a queue…" I have no idea what "this" you're referring to… but an obvious answer is to stick your own queue in front of whatever it's using, so you can manipulate that queue directly. For example:

    import queue
    import threading
    
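    def skip_get(q):
        # Block until at least one item is available.
        value = q.get(block=True)
        try:
            # Drain the rest without blocking, keeping only the newest item.
            while True:
                value = q.get(block=False)
        except queue.Empty:
            return value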
    
    q = queue.Queue()
    
    def file_event_callback(event):
        # code 256 for adding file to folder
        if event.mask == 256:
            fileChanged = event.name
            q.put(fileChanged)
    
    def consumer():
        while True:
            fileChanged = skip_get(q)
            if fileChanged is None:
                return
            # do stuff with fileChanged
    

    Now, before you start up the observer, do this:

    t = threading.Thread(target=consumer)
    t.start()
    

    And at the end:

    observer.join()
    q.put(None)
    t.join()
    

    So, how does this work?

    First, let's look at the consumer side. When you call q.get(), it pops the first item off the queue. But what if nothing is there? That's what the block argument is for. If it's false and the queue is empty, get raises a queue.Empty exception. If it's true, get waits forever (in a thread-safe way) until there's something to pop. So by blocking once, we handle the case where there's nothing to read yet. By then looping without blocking, we consume everything else on the queue, which handles the case where too many items have piled up. Because we keep reassigning value to whatever we popped, we end up with the last item that was put on the queue.
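    To see the draining behavior in isolation, here is a single-threaded sketch that reuses skip_get from above unchanged; the file names are hypothetical placeholders for what file_event_callback would put on the queue.

```python
import queue


def skip_get(q):
    # Block until at least one item is available.
    value = q.get(block=True)
    try:
        # Drain the rest without blocking, keeping only the newest item.
        while True:
            value = q.get(block=False)
    except queue.Empty:
        return value


q = queue.Queue()
# Hypothetical file names, as if three events arrived while the consumer was busy.
for name in ["a.txt", "b.txt", "c.txt"]:
    q.put(name)

latest = skip_get(q)
print(latest)      # c.txt
print(q.empty())   # True: the older entries were consumed and discarded
```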

    Now, let's look at the producer side. When you call q.put(value), that just puts value on the queue. Unless you've put a size limit on the queue (which I haven't), put never blocks, so you don't have to worry about any of that. But how do you signal the consumer thread that you're finished? It will be waiting in q.get(block=True) forever; the only way to wake it up is to give it some value to pop. By pushing a sentinel value (here None works, because it's never a valid filename) and making the consumer quit when it sees None, we get a clean way to shut down. (And because we never push anything after the None, there's no chance of accidentally skipping it.) So we can push None and be sure that, barring other bugs, the consumer thread will eventually quit, which means we can call t.join() to wait for it without fear of deadlock.
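    Putting the two sides together, here is a self-contained sketch that runs without fsevents: a loop with time.sleep stands in for the observer calling file_event_callback (an assumption, purely for illustration), and another sleep stands in for the slow per-file work. The file names and timings are hypothetical.

```python
import queue
import threading
import time


def skip_get(q):
    # Block for one item, then drain the rest and keep only the newest.
    value = q.get(block=True)
    try:
        while True:
            value = q.get(block=False)
    except queue.Empty:
        return value


q = queue.Queue()
processed = []


def consumer():
    while True:
        name = skip_get(q)
        if name is None:        # sentinel: the producer is finished
            return
        time.sleep(0.05)        # hypothetical stand-in for slow per-file work
        processed.append(name)


t = threading.Thread(target=consumer)
t.start()

# Simulated producer: hypothetical files arriving faster than they are processed.
produced = ["file%02d.txt" % i for i in range(20)]
for name in produced:
    q.put(name)
    time.sleep(0.01)

q.put(None)   # wake the consumer and tell it to quit
t.join()
print(processed)
```

    Exactly which files end up in processed depends on thread timing, but they always come out in the order they were added, intermediate files are skipped whenever the consumer falls behind, and the None sentinel lets t.join() return. If files are still queued when None arrives, skip_get drains those too, so the last few files can be skipped at shutdown.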


    You could also do this more simply with a Condition. If you think about how a queue actually works, it's just a list (or deque, or whatever) protected by a condition: the consumer waits on the condition until something is available, and the producer makes something available by adding it to the list and notifying the condition. If you only ever want the last value, there's really no need for the list, so you can do this:

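    class OneQueue(object):
        def __init__(self):
            self.value = None                    # None means "nothing available yet"
            self.condition = threading.Condition()
            self.sentinel = object()             # unique marker put by close()
        def get(self):
            with self.condition:
                # Wait until a producer has stored a value.
                while self.value is None:
                    self.condition.wait()
                # Take the value and reset the slot to empty.
                value, self.value = self.value, None
                return value
        def put(self, value):
            with self.condition:
                # Overwrite any value the consumer hasn't taken yet.
                self.value = value
                self.condition.notify()
        def close(self):
            self.put(self.sentinel)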
    

    (Because I'm now using None to signal that nothing is available, I had to create a separate sentinel to signal that we're done.)

    The problem with this design is that if the producer puts multiple values while the consumer is too busy to handle them, the consumer misses all but the last one. In this case, though, that "problem" is exactly what you were looking for.
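    Here's a sketch of both effects with OneQueue: several puts collapse into a single get, and a consumer loop recognizes the shutdown marker by identity (value is oq.sentinel) rather than by checking for None. The class is repeated unchanged so the snippet runs on its own; the file names are hypothetical.

```python
import threading


class OneQueue(object):
    def __init__(self):
        self.value = None                      # None means "nothing available yet"
        self.condition = threading.Condition()
        self.sentinel = object()               # unique marker put by close()

    def get(self):
        with self.condition:
            while self.value is None:
                self.condition.wait()
            value, self.value = self.value, None
            return value

    def put(self, value):
        with self.condition:
            self.value = value                 # overwrite any untaken value
            self.condition.notify()

    def close(self):
        self.put(self.sentinel)


oq = OneQueue()
for name in ["a.txt", "b.txt", "c.txt"]:
    oq.put(name)          # each put overwrites the previous value
latest = oq.get()
print(latest)             # c.txt

handled = []


def consumer():
    while True:
        value = oq.get()
        if value is oq.sentinel:   # shutdown marker, compared by identity
            return
        handled.append(value)


t = threading.Thread(target=consumer)
t.start()
oq.put("d.txt")
oq.close()
t.join()
print(handled)
```

    handled ends up as either [] or ["d.txt"], depending on whether close() overwrote "d.txt" before the consumer picked it up: the same skip-what-you-missed behavior applies to the shutdown signal too.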

    Still, lower-level tools always leave more room for mistakes. That's especially dangerous with thread synchronization, whose problems are hard to reason about and hard to debug even once you understand them. So you might be better off using a Queue anyway.
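    For comparison, here is a rough sketch of the "list protected by a condition" idea mentioned above: an unbounded FIFO built from a deque and a threading.Condition. CondQueue is a hypothetical name for illustration; it is not a copy of the standard library's queue.Queue, which also handles maxsize, timeouts, and task_done()/join().

```python
import threading
from collections import deque


class CondQueue:
    """Hypothetical minimal FIFO queue: a deque guarded by a Condition."""

    def __init__(self):
        self.items = deque()
        self.condition = threading.Condition()

    def put(self, value):
        with self.condition:
            self.items.append(value)   # make something available...
            self.condition.notify()    # ...and wake one waiting consumer

    def get(self):
        with self.condition:
            # Re-check after every wakeup, as the threading docs recommend.
            while not self.items:
                self.condition.wait()
            return self.items.popleft()


cq = CondQueue()
results = []


def consume(n):
    for _ in range(n):
        results.append(cq.get())


t = threading.Thread(target=consume, args=(3,))
t.start()
for v in (1, 2, 3):
    cq.put(v)
t.join()
print(results)  # [1, 2, 3]
```

    Replacing the deque with a single slot that put overwrites is exactly the step that turns this into OneQueue.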