python-3.x, filesystemwatcher, python-watchdog

Python: notify when all files have been transferred


I am using the watchdog API to watch a folder on my filesystem for changes. Whenever files change in that folder, I pass them to a function that starts a thread for each file it receives.

But watchdog, like every other filesystem-watcher API I know of, notifies the user file by file, i.e. as each file arrives. I would like to be notified of a whole batch of files at once, so that I can pass that list to my function and take advantage of multithreading. Currently watchdog notifies me of one file at a time, so I can only pass that single file to my function, whereas I want to pass it many files at once so the work actually runs in parallel.

One solution that comes to mind: when you copy a batch of files into a folder, the OS shows a progress bar. If I could be notified when that progress bar finishes, that would solve my problem perfectly. But I don't know whether that is possible.
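watchdog has no access to the OS copy dialog, but a common way to approximate "tell me when the copy is finished" is to debounce the events: collect the paths watchdog reports and only hand them over once no new event has arrived for a short quiet period. Below is a minimal sketch under that assumption; `BatchCollector`, `add` and `quiet_period` are hypothetical names, not part of watchdog. The quiet period is only a heuristic: a slow copy (e.g. over a network share) that pauses longer than the timeout will be delivered as more than one batch.

```python
import threading
import time


class BatchCollector:
    """Hypothetical helper: deliver collected paths as one batch after a quiet period.

    Every add() restarts a timer; when no new path arrives for `quiet_period`
    seconds, `on_batch` is called once with all paths gathered so far.
    """

    def __init__(self, on_batch, quiet_period=2.0):
        self.on_batch = on_batch
        self.quiet_period = quiet_period
        self._paths = set()          # a set, so repeated events for one file count once
        self._timer = None
        self._lock = threading.Lock()

    def add(self, path):
        with self._lock:
            self._paths.add(path)
            # Restart the countdown: the batch is only "done" once events stop.
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(self.quiet_period, self._flush)
            self._timer.daemon = True
            self._timer.start()

    def _flush(self):
        with self._lock:
            # A newer timer has replaced this one, so this flush is stale.
            if threading.current_thread() is not self._timer:
                return
            batch = sorted(self._paths)
            self._paths.clear()
            self._timer = None
        if batch:
            self.on_batch(batch)


if __name__ == "__main__":
    received = []
    collector = BatchCollector(received.append, quiet_period=0.5)
    for name in ["a.txt", "b.txt", "a.txt"]:  # simulated burst of events
        collector.add(name)
    time.sleep(1.0)  # wait longer than the quiet period
    print(received)  # [['a.txt', 'b.txt']]
```

In a watchdog handler you would call `collector.add(event.src_path)` from `on_any_event`, and the callback would receive the whole list, ready to be passed to the threaded function.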

Also, as far as I know, watchdog is a polling API, whereas an ideal filesystem watcher would be an event-driven (interrupt-driven) API like pyinotify. But I haven't found an event-driven API that is also cross-platform. iWatch is good, but it only runs on Linux, and I need something that works on every OS. So if you can suggest any other API, please let me know.
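For reference, watchdog's default `Observer` picks a native, event-driven backend where one exists (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows, kqueue on BSD) and only falls back to polling otherwise. To make the distinction concrete, here is a rough sketch of what a polling watcher does: rescan the tree on an interval and diff two snapshots. It is only an illustration of the idea, loosely comparable to watchdog's `DirectorySnapshot`/`DirectorySnapshotDiff`, not their implementation; `snapshot` and `diff` are hypothetical names.

```python
import os
import tempfile


def snapshot(root):
    """Map every file path under `root` to its modification time (ns)."""
    mtimes = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                mtimes[path] = os.stat(path).st_mtime_ns
            except FileNotFoundError:
                # The file vanished between listing and stat; skip it.
                continue
    return mtimes


def diff(old, new):
    """Return (created, deleted, modified) path lists between two snapshots."""
    created = sorted(new.keys() - old.keys())
    deleted = sorted(old.keys() - new.keys())
    modified = sorted(p for p in new.keys() & old.keys() if new[p] != old[p])
    return created, deleted, modified


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        before = snapshot(root)
        with open(os.path.join(root, "new.txt"), "w") as f:
            f.write("hello")
        created, deleted, modified = diff(before, snapshot(root))
        print([os.path.basename(p) for p in created])  # ['new.txt']
```

The cost of this approach is that every poll walks the whole tree, which is why native backends are preferred when the platform offers one.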

Thanks.


Solution

  • Instead of accumulating filesystem events, you could spawn a pool of worker threads that take tasks from a shared queue. The watchdog observer thread then puts a task on the queue each time a filesystem event occurs, so a worker can start working as soon as an event arrives.

    For example,

    import logging
    import queue
    import threading
    import time
    import watchdog.observers as observers
    import watchdog.events as events
    
    logger = logging.getLogger(__name__)
    
    SENTINEL = None  # put on the queue to tell a worker thread to exit
    
    class MyEventHandler(events.FileSystemEventHandler):
        def __init__(self, event_queue):
            super().__init__()
            self.event_queue = event_queue
    
        def on_any_event(self, event):
            super().on_any_event(event)
            # Only enqueue here; the worker threads do the actual processing,
            # so the observer thread is never blocked.
            self.event_queue.put(event)
    
    def process(event_queue):
        # Worker loop: handle events until the sentinel arrives.
        while True:
            event = event_queue.get()
            if event is SENTINEL:
                break
            logger.info(event)
    
    
    if __name__ == '__main__':
        logging.basicConfig(level=logging.DEBUG,
                            format='[%(asctime)s %(threadName)s] %(message)s',
                            datefmt='%H:%M:%S')
    
        event_queue = queue.Queue()
        num_workers = 4
        pool = [threading.Thread(target=process, args=(event_queue,))
                for _ in range(num_workers)]
        for t in pool:
            t.daemon = True
            t.start()
    
        event_handler = MyEventHandler(event_queue)
        observer = observers.Observer()
        observer.schedule(
            event_handler,
            path='/tmp/testdir',
            recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
        # Stop the workers cleanly: one sentinel per thread, then wait for them.
        for _ in pool:
            event_queue.put(SENTINEL)
        for t in pool:
            t.join()
    

    Running

    % mkdir /tmp/testdir
    % python3 script.py
    

    yields output like

    [14:48:31 Thread-1] <FileCreatedEvent: src_path=/tmp/testdir/.#foo>
    [14:48:32 Thread-2] <FileModifiedEvent: src_path=/tmp/testdir/foo>
    [14:48:32 Thread-3] <FileModifiedEvent: src_path=/tmp/testdir/foo>
    [14:48:32 Thread-4] <FileDeletedEvent: src_path=/tmp/testdir/.#foo>
    [14:48:42 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/foo>
    [14:48:47 Thread-2] <FileCreatedEvent: src_path=/tmp/testdir/.#bar>
    [14:48:49 Thread-4] <FileCreatedEvent: src_path=/tmp/testdir/bar>
    [14:48:49 Thread-4] <FileModifiedEvent: src_path=/tmp/testdir/bar>
    [14:48:49 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/.#bar>
    [14:48:54 Thread-2] <FileDeletedEvent: src_path=/tmp/testdir/bar>
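If you still want each worker to receive several events at once, as the question asks, one option is to block for the first item and then drain whatever else is already queued without waiting. A minimal sketch, assuming a standard `queue.Queue`; `get_batch` and `max_items` are hypothetical names.

```python
import queue


def get_batch(q, max_items=100):
    """Block until one item is available, then grab any others already queued."""
    batch = [q.get()]  # wait for at least one event
    while len(batch) < max_items:
        try:
            batch.append(q.get_nowait())  # take what is already there, no waiting
        except queue.Empty:
            break
    return batch


if __name__ == "__main__":
    q = queue.Queue()
    for path in ["a.txt", "b.txt", "c.txt"]:
        q.put(path)
    print(get_batch(q))  # ['a.txt', 'b.txt', 'c.txt']
    q.put("d.txt")
    print(get_batch(q, max_items=2))  # ['d.txt']
```

In the worker loop you would call `get_batch` in place of the single `get()` call; if you also use a sentinel for shutdown, check whether it appears anywhere in the returned batch.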
    

    Doug Hellmann has written an excellent set of tutorials (now edited into a book) that should help you get started:

    I didn't actually end up using a multiprocessing Pool or ThreadPool as discussed in the last two links, but you may find them useful anyway.
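For comparison, here is a minimal sketch of the same idea using `multiprocessing.pool.ThreadPool` instead of hand-made threads. `handle_file` and `process_batch` are hypothetical names; `handle_file` stands in for whatever per-file work you do, and the list of paths could be a batch collected from watchdog events.

```python
from multiprocessing.pool import ThreadPool


def handle_file(path):
    """Hypothetical per-file work; here it just reports the path length."""
    return path, len(path)


def process_batch(paths, num_workers=4):
    """Process a batch of paths concurrently on a pool of worker threads."""
    with ThreadPool(processes=num_workers) as pool:
        # map() spreads the list across the threads and returns results in input order.
        return pool.map(handle_file, paths)


if __name__ == "__main__":
    print(process_batch(["/tmp/testdir/foo", "/tmp/testdir/bar"]))
    # [('/tmp/testdir/foo', 16), ('/tmp/testdir/bar', 16)]
```

Creating the pool once and reusing it for every batch would avoid the start-up cost; the `with` block is just the simplest correct form. Threads help mainly with I/O-bound work; for CPU-bound processing, `multiprocessing.Pool` sidesteps the GIL at the cost of pickling arguments.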