I am using the watchdog API to keep checking for changes in a folder on my filesystem. Whatever files change in that folder, I pass them to a particular function, which starts a thread for each file I pass to it.
But watchdog, like every other filesystem-watcher API I know of, notifies the user file by file, i.e. it reports each file as it comes in. I would like it to notify me about a whole batch of files at once, so that I can pass that list to my function and make use of multithreading. Currently, watchdog notifies me one file at a time, so I can only pass that single file to my function.
One solution that comes to mind: when you copy a bunch of files into a folder, the OS shows you a progress bar. If I could be notified when that progress bar finishes, that would solve my problem perfectly. But I don't know whether that is possible.
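The "notify me once the copy is finished" idea can be approximated without any OS support by treating a quiet period with no new events as the end of a batch (debouncing). Below is a minimal sketch of that approach. `BatchCollector`, `flush_callback` and the 2-second default are hypothetical names and values, not part of watchdog, and a quiet period is only a heuristic for "the copy is done", not a signal from the OS.

```python
import threading


class BatchCollector:
    """Collect items and flush them as one list after `quiet` seconds without new items.

    Hypothetical helper, not a watchdog class.
    """

    def __init__(self, flush_callback, quiet=2.0):
        self._flush_callback = flush_callback
        self._quiet = quiet
        self._lock = threading.Lock()
        self._items = []
        self._timer = None
        self._generation = 0  # identifies the most recently started timer

    def add(self, item):
        with self._lock:
            self._items.append(item)
            # Restart the countdown every time something new arrives.
            if self._timer is not None:
                self._timer.cancel()
            self._generation += 1
            self._timer = threading.Timer(self._quiet, self._flush,
                                          args=(self._generation,))
            self._timer.daemon = True
            self._timer.start()

    def _flush(self, generation):
        with self._lock:
            if generation != self._generation:
                return  # a newer item arrived; its own timer will flush later
            batch, self._items = self._items, []
            self._timer = None
        if batch:
            # Runs on the timer's thread, outside the lock.
            self._flush_callback(batch)
```

In a watchdog handler you would call `collector.add(event.src_path)` from `on_any_event` and pass your existing function as `flush_callback`, so it receives the whole list of paths at once. Keep in mind that the callback runs on the timer's thread, not the observer's.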
I also know that watchdog is a polling API, and that an ideal filesystem-watching API would be interrupt-driven, like pyinotify. But I couldn't find any interrupt-driven API that is also cross-platform. iWatch is good, but it is Linux-only, and I want something that works on all operating systems. If you can suggest any other API, please let me know.
Thanks.
Instead of accumulating filesystem events, you could spawn a pool of worker threads that take tasks from a common queue. The watchdog observer could then put tasks on the queue as filesystem events occur. Done this way, a worker thread can start working as soon as an event occurs.
For example,
```python
import logging
import queue
import threading
import time

import watchdog.events as events
import watchdog.observers as observers

logger = logging.getLogger(__name__)


class MyEventHandler(events.FileSystemEventHandler):
    def __init__(self, event_queue):
        super().__init__()
        self.queue = event_queue

    def on_any_event(self, event):
        super().on_any_event(event)
        # Hand the event to the worker pool instead of processing it here,
        # so the observer thread is never blocked by slow work.
        self.queue.put(event)


def process(event_queue):
    # Each worker blocks on the shared queue and handles events as they arrive.
    while True:
        event = event_queue.get()
        logger.info(event)
        event_queue.task_done()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    event_queue = queue.Queue()
    num_workers = 4
    pool = [threading.Thread(target=process, args=(event_queue,))
            for i in range(num_workers)]
    for t in pool:
        t.daemon = True  # let the program exit even though workers loop forever
        t.start()

    event_handler = MyEventHandler(event_queue)
    observer = observers.Observer()
    observer.schedule(
        event_handler,
        path='/tmp/testdir',
        recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```
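The daemon workers above are simply killed when the program exits. If you would rather have them finish their current task and stop cleanly, a common pattern is to put one sentinel value per worker on the queue and then join the threads. Here is a minimal, watchdog-free sketch of that pattern; `run_pool`, `worker` and the string items are illustrative stand-ins for your own per-file work.

```python
import queue
import threading

SENTINEL = None  # marker telling a worker to exit


def worker(task_queue, results, lock):
    while True:
        item = task_queue.get()
        try:
            if item is SENTINEL:
                return  # the finally clause still calls task_done()
            # Hypothetical per-file work; here we just record the item.
            with lock:
                results.append(item)
        finally:
            task_queue.task_done()


def run_pool(items, num_workers=4):
    task_queue = queue.Queue()
    results = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(task_queue, results, lock))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        task_queue.put(item)
    # One sentinel per worker: a worker stops at its first sentinel,
    # so each thread consumes exactly one.
    for _ in threads:
        task_queue.put(SENTINEL)
    for t in threads:
        t.join()
    return results
```

With the observer you would do the same after `observer.join()`: drop `t.daemon = True`, put one `SENTINEL` per worker on the queue, then `join()` each thread.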
Running

```
% mkdir /tmp/testdir
% script.py
```

yields output like

```
[14:48:31 Thread-1] <FileCreatedEvent: src_path=/tmp/testdir/.#foo>
[14:48:32 Thread-2] <FileModifiedEvent: src_path=/tmp/testdir/foo>
[14:48:32 Thread-3] <FileModifiedEvent: src_path=/tmp/testdir/foo>
[14:48:32 Thread-4] <FileDeletedEvent: src_path=/tmp/testdir/.#foo>
[14:48:42 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/foo>
[14:48:47 Thread-2] <FileCreatedEvent: src_path=/tmp/testdir/.#bar>
[14:48:49 Thread-4] <FileCreatedEvent: src_path=/tmp/testdir/bar>
[14:48:49 Thread-4] <FileModifiedEvent: src_path=/tmp/testdir/bar>
[14:48:49 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/.#bar>
[14:48:54 Thread-2] <FileDeletedEvent: src_path=/tmp/testdir/bar>
```
Doug Hellmann has written an excellent set of tutorials (since edited into a book) that should help you get started:
I didn't actually end up using a multiprocessing Pool or ThreadPool as discussed in the last two links, but you may find them useful anyway.
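If you do want a ready-made pool, the standard library's `concurrent.futures.ThreadPoolExecutor` (an alternative to the `multiprocessing` Pool/ThreadPool mentioned above) can replace the hand-rolled queue and worker threads: the handler just submits each event, and the executor queues it internally. This is a sketch under assumptions: `ExecutorEventHandler` and its `work` parameter are hypothetical names, and the `object` fallback exists only so the snippet can run without watchdog installed.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

try:
    from watchdog.events import FileSystemEventHandler
except ImportError:
    # Fallback so the sketch can be exercised without watchdog installed.
    FileSystemEventHandler = object

logger = logging.getLogger(__name__)


def _log_failure(future):
    # Exceptions raised in a pool thread are stored on the future and
    # would otherwise go unnoticed, so log them here.
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        logger.error('event handler failed: %r', exc)


class ExecutorEventHandler(FileSystemEventHandler):
    def __init__(self, executor, work):
        super().__init__()
        self.executor = executor
        self.work = work  # callable run on a pool thread for every event

    def on_any_event(self, event):
        future = self.executor.submit(self.work, event)
        future.add_done_callback(_log_failure)
```

To use it, create the executor with `with ThreadPoolExecutor(max_workers=4) as executor:`, schedule `ExecutorEventHandler(executor, your_function)` on the observer, and run the observer loop inside that `with` block; leaving the block waits for any events still queued.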