I've got this simple script that processes files using the inotify module and multi-threading:
import concurrent.futures
import inotify.adapters
def main():
    """Watch /data and submit every file that finishes writing to a thread pool.

    Iterates over the inotify event generator; for each event flagged
    IN_CLOSE_WRITE, ``process(<decoded file name>)`` is submitted to a
    five-worker ``ThreadPoolExecutor`` and ``future_callback`` is attached
    to the resulting future. The watch on /data is removed when the loop
    exits, whether normally or through an exception.
    """
    watcher = inotify.adapters.Inotify()
    watcher.add_watch(b'/data')

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        try:
            for event in watcher.event_gen():
                # The generator may hand back None instead of an event; skip those.
                if event is None:
                    continue

                _header, type_names, _watch_path, filename = event

                # Test the decoded flag names rather than comparing the raw
                # mask to the literal 8: this is self-describing and still
                # matches if other flag bits are set alongside IN_CLOSE_WRITE.
                if 'IN_CLOSE_WRITE' not in type_names:
                    continue

                future = executor.submit(process, filename.decode('utf-8'))
                future.add_done_callback(future_callback)
        finally:
            watcher.remove_watch(b'/data')
# Start watching only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
The problem I've got is that the watched directory can have many files before the script is actually started.
I thought about something like the example below, but this won't start "yielding" from the inotify generator until all existing files are processed, and it will also miss new events created during this time:
import concurrent.futures
import inotify.adapters
def main():
    """Submit the files already in /data, then follow inotify for new ones.

    The watch is registered first. Every name returned by
    ``os.listdir('/data')`` is then submitted to the pool via ``run``, and
    only after that loop finishes does the function start iterating the
    inotify event generator, submitting ``process(<decoded file name>)``
    for each IN_CLOSE_WRITE event. ``future_callback`` is attached to
    every submitted future. The watch is removed when the event loop exits.
    """
    # Local import: this snippet's import block does not bring in os,
    # so the os.listdir call would otherwise raise NameError.
    import os

    watcher = inotify.adapters.Inotify()
    watcher.add_watch(b'/data')

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # An empty listing simply produces no iterations, so no separate
        # emptiness check is needed.
        for existing in os.listdir('/data'):
            future = executor.submit(run, existing)
            future.add_done_callback(future_callback)

        try:
            for event in watcher.event_gen():
                # The generator may hand back None instead of an event; skip those.
                if event is None:
                    continue

                _header, type_names, _watch_path, filename = event

                # Test the decoded flag names rather than comparing the raw
                # mask to the literal 8: this is self-describing and still
                # matches if other flag bits are set alongside IN_CLOSE_WRITE.
                if 'IN_CLOSE_WRITE' not in type_names:
                    continue

                future = executor.submit(process, filename.decode('utf-8'))
                future.add_done_callback(future_callback)
        finally:
            watcher.remove_watch(b'/data')
# Start watching only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Is there a way to manually send an inotify event, or perhaps add these files to the i.event_gen() generator?
Here's an example that processes the old files inside one of the workers, allowing new events to be captured in parallel while the old, existing files are still being processed. For the record, even using your linear code, I had no trouble with missing events.
Also, the PyInotify module is "defunct and no longer available." according to this inotify module which I used.
#!/usr/bin/env python3
import concurrent.futures
import inotify.adapters
import time
import os
from functools import partial
# Directory that is both listed at start-up and watched through inotify.
DIRECTORY = '.'
def run(filename, suffix='', *, delay=1):
    """Simulate processing one file and return a label describing it.

    Args:
        filename: Name of the file being "processed".
        suffix: Text appended to the returned label; callers use it to
            mark where the work item came from.
        delay: Seconds to sleep as a stand-in for real work. Defaults to
            1, the value that used to be hard-coded.

    Returns:
        The string ``'run: ' + filename + suffix``.
    """
    time.sleep(delay)
    return 'run: ' + filename + suffix
def process(filename):
    """Run the work for a file reported by inotify, tagging the result."""
    inotify_tag = ' (inotify)'
    return run(filename, suffix=inotify_tag)
def future_callback(fut):
    """Done-callback: print the result of the completed future ``fut``."""
    outcome = fut.result()
    print('future_callback: ' + outcome)
def do_directory(executor):
    """Submit every entry currently listed in DIRECTORY to ``executor``.

    Each entry name is passed to ``run`` with the suffix ' (dir list)',
    and ``future_callback`` is attached to the resulting future.
    """
    for entry in os.listdir(DIRECTORY):
        pending = executor.submit(run, entry, suffix=' (dir list)')
        pending.add_done_callback(future_callback)
def main():
    """Process existing files and new inotify events side by side.

    Registers a watch on DIRECTORY, then hands ``do_directory`` to the
    five-worker pool so the files already present are queued from a worker
    thread while this thread goes straight to the inotify event loop. Each
    IN_CLOSE_WRITE event submits ``process(<decoded file name>)`` with
    ``future_callback`` attached and prints a "Submitted" line.
    KeyboardInterrupt ends the loop quietly; the watch is removed on exit.
    """
    watch_path = DIRECTORY.encode()
    watcher = inotify.adapters.Inotify()
    watcher.add_watch(watch_path)

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Queue the pre-existing files from inside a worker so the event
        # loop below is not delayed. Calling do_directory(executor) inline
        # here is the alternative.
        # NOTE(review): submitting to the executor from one of its own
        # workers seems like it should be safe - confirm.
        executor.submit(do_directory, executor)

        try:
            for event in watcher.event_gen():
                # The generator may hand back None instead of an event; skip those.
                if event is None:
                    continue

                _header, type_names, _watch_path, filename = event

                # Test the decoded flag names rather than comparing the raw
                # mask to the literal 8: this is self-describing and still
                # matches if other flag bits are set alongside IN_CLOSE_WRITE.
                if 'IN_CLOSE_WRITE' not in type_names:
                    continue

                # Decode once and reuse for both the task and the log line.
                name = filename.decode('utf-8')
                future = executor.submit(process, name)
                future.add_done_callback(future_callback)
                print('Submitted inotify for', name)
        except KeyboardInterrupt:
            # Ctrl-C is treated as a request to stop, not as an error.
            pass
        finally:
            watcher.remove_watch(watch_path)
# Start watching only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Test:
Start with a directory containing 10 files. Start the program, wait 2 seconds, and then create 5 new files. Look for the "Submitted" messages to see that the events were received and queued while the initial files were still being processed, and that the new files are eventually handled.
~/p/TEST $ touch A1 A2 A3 A4 A5 A6 A7 A8 A9 A10
~/p/TEST $ do_test() {
> rm B*
> ../inotify-test.py &
> sleep 2
> touch B1 B2 B3 B4 B5
> sleep 5
> pkill -f inotify-test.py
> }
~/p/TEST $ do_test
[1] 26663
future_callback: run: A10 (dir list)
future_callback: run: A4 (dir list)
future_callback: run: A5 (dir list)
future_callback: run: A9 (dir list)
future_callback: run: A2 (dir list)
Submitted inotify for B1
Submitted inotify for B2
Submitted inotify for B3
Submitted inotify for B4
Submitted inotify for B5
future_callback: run: A3 (dir list)
future_callback: run: A8 (dir list)
future_callback: run: A1 (dir list)
future_callback: run: A7 (dir list)
future_callback: run: A6 (dir list)
future_callback: run: B1 (inotify)
future_callback: run: B2 (inotify)
future_callback: run: B3 (inotify)
future_callback: run: B4 (inotify)
future_callback: run: B5 (inotify)
~/p/TEST $
[1]+ Terminated ../inotify-test.py
~/p/TEST $