
Python: inotify, concurrent.futures - how to add existing files


I've got this simple script that processes files using the inotify module and multi-threading:

import concurrent.futures

import inotify.adapters

def main():
    i = inotify.adapters.Inotify()

    i.add_watch(b'/data')

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        try:
            for event in i.event_gen():
                if event is not None:
                    (header, type_names, watch_path, filename) = event
                    # inotify event: IN_CLOSE_WRITE
                    if header.mask == 8:
                        future = executor.submit(process, filename.decode('utf-8'))
                        future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')

if __name__ == '__main__':
    main()
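The bare `8` in `header.mask == 8` is the IN_CLOSE_WRITE bit from the Linux `<sys/inotify.h>` header. Below is a minimal sketch of a more readable check. The constants are copied locally rather than imported from the inotify package, and the helper name `is_close_write` is hypothetical:

```python
# Event bits copied from the Linux header <sys/inotify.h>. They are defined
# locally so this sketch does not rely on any particular inotify package version.
IN_CLOSE_WRITE = 0x00000008    # a file opened for writing was closed
IN_CLOSE_NOWRITE = 0x00000010  # a file opened read-only was closed
IN_CREATE = 0x00000100         # a file or directory was created


def is_close_write(mask):
    """Return True if the IN_CLOSE_WRITE bit is set in an inotify event mask."""
    # A bitwise test still matches if other flag bits are set alongside
    # IN_CLOSE_WRITE, which an exact `mask == 8` comparison would not.
    return bool(mask & IN_CLOSE_WRITE)


if __name__ == '__main__':
    print(is_close_write(IN_CLOSE_WRITE))    # True
    print(is_close_write(IN_CLOSE_NOWRITE))  # False
    print(is_close_write(IN_CREATE))         # False
```

Inside the event loop the test would then read `if is_close_write(header.mask):`.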

The problem is that the watched directory may already contain many files before the script is started.

I thought about something like the example below, but this won't start "yielding" from the inotify generator until all existing files are processed, and it will also miss new events created during that time:

import concurrent.futures
import os

import inotify.adapters

def main():
    i = inotify.adapters.Inotify()

    i.add_watch(b'/data')

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        files = os.listdir('/data')
        if files:
            for filename in files:
                future = executor.submit(run, filename)
                future.add_done_callback(future_callback)
        try:
            for event in i.event_gen():
                if event is not None:
                    (header, type_names, watch_path, filename) = event
                    # inotify event: IN_CLOSE_WRITE
                    if header.mask == 8:
                        future = executor.submit(process, filename.decode('utf-8'))
                        future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')

if __name__ == '__main__':
    main()
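Whether the listing loop delays the event loop depends on `executor.submit()`. It only queues the call and returns a `Future` immediately, so the loop over `os.listdir()` finishes as soon as every file is queued, not when every file is processed. The stdlib-only sketch below shows this. `slow_task` and `submit_all` are hypothetical stand-ins for the real `run`/`process` functions:

```python
import concurrent.futures
import threading

# Tasks block on this event so we can observe them while they are unfinished.
release = threading.Event()


def slow_task(name):
    # Stand-in for real per-file work: waits until the main thread releases it.
    release.wait()
    return name


def submit_all(executor, names):
    # executor.submit() queues each call and returns a Future right away;
    # it does not wait for the task to start or finish.
    return [executor.submit(slow_task, n) for n in names]


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = submit_all(executor, ['A%d' % k for k in range(1, 11)])
        # All ten submissions have returned, yet no task has completed,
        # so an event loop placed here would already be running.
        print(sum(f.done() for f in futures))  # 0
        release.set()
    print(sorted(f.result() for f in futures))
```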

Is there a way to manually send an inotify event, or perhaps to add these files to the i.event_gen() generator?
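One way to "add the files to the generator" is to chain synthetic events for the existing files in front of the live stream. The sketch below assumes that event tuples look like `(header, type_names, watch_path, filename)`, with `type_names` a list of names such as `'IN_CLOSE_WRITE'`, as in the question. The synthetic events carry `header=None`, so the consumer must test `type_names` instead of `header.mask`. The function names and the `None` header convention are hypothetical. `os.scandir()` returns bytes names when given a bytes path, so with `b'/data'` the synthetic filenames are bytes like the real ones:

```python
import itertools
import os
import tempfile


def existing_file_events(directory):
    """Yield synthetic (header, type_names, watch_path, filename) tuples for
    regular files already in `directory`. header is None because no real
    inotify event exists for them (hypothetical convention)."""
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_file():  # skip subdirectories and other non-files
                yield (None, ['IN_CLOSE_WRITE'], directory, entry.name)


def events_with_existing(directory, live_events):
    # Existing files first, then the live stream. Add the watch *before*
    # calling this, so files created meanwhile are queued by the kernel.
    return itertools.chain(existing_file_events(directory), live_events)


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as d:
        for name in ('A1', 'A2'):
            open(os.path.join(d, name), 'w').close()
        os.mkdir(os.path.join(d, 'subdir'))  # not a regular file, so skipped
        # Fake live stream; event_gen() can also yield None on timeouts.
        fake_live = iter([None, (None, ['IN_CLOSE_WRITE'], d, 'B1')])
        for event in events_with_existing(d, fake_live):
            if event is not None:
                _, type_names, _, filename = event
                if 'IN_CLOSE_WRITE' in type_names:
                    print(filename)
```

With the real watcher, `live_events` would be `i.event_gen()`.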


Solution

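  • Here's an example that processes the existing files inside one of the workers, so new events can be captured in parallel while the old files are being processed. For the record, even with your linear code I had no trouble with missing events. The watch is added before the directory is listed, so the kernel queues any new events until event_gen() reads them (up to the limit in /proc/sys/fs/inotify/max_queued_events).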

    Also, according to the inotify module I used, the PyInotify module is "defunct and no longer available."

    #!/usr/bin/env python3
    
    import concurrent.futures
    import inotify.adapters
    import time
    import os
    from functools import partial
    
    
    DIRECTORY = '.'
    
    
    def run(filename, suffix=''):
        time.sleep(1)
        return 'run: ' + filename + suffix
    
    
    def process(filename):
        return run(filename, suffix=' (inotify)')
    
    
    def future_callback(fut):
        print('future_callback: ' + fut.result())
    
    
    def do_directory(executor):
        fn = partial(run, suffix=' (dir list)')
        for filename in os.listdir(DIRECTORY):
            future = executor.submit(fn, filename)
            future.add_done_callback(future_callback)
    
    
    def main():
        i = inotify.adapters.Inotify()
    
        i.add_watch(DIRECTORY.encode())
    
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
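            # Process the directory in a worker thread (or locally, via the
            # commented-out call below). Submitting to the executor from inside
            # one of its own workers is fine: submit() is thread-safe, and this
            # worker never blocks waiting on the futures it creates.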
            executor.submit(do_directory, executor)
            # do_directory(executor)
            try:
                for event in i.event_gen():
                    if event is not None:
                        (header, type_names, watch_path, filename) = event
                        # inotify event: IN_CLOSE_WRITE
                        if header.mask == 8:
                            future = executor.submit(process, filename.decode('utf-8'))
                            future.add_done_callback(future_callback)
                            print('Submitted inotify for', filename.decode())
            except KeyboardInterrupt:
                pass
            finally:
                i.remove_watch(DIRECTORY.encode())
    
    
    if __name__ == '__main__':
        main()
    

    Test:

    Start with a directory containing 10 files. Start the program, wait 2 seconds, and then create 5 new files. The "Submitted inotify for" messages show that the events were received and queued while the initial files were still being processed. The later callbacks show that the new files are eventually handled.

    ~/p/TEST $ touch A1 A2 A3 A4 A5 A6 A7 A8 A9 A10
    ~/p/TEST $ do_test() {
    > rm B*
    > ../inotify-test.py &
    > sleep 2
    > touch B1 B2 B3 B4 B5
    > sleep 5
    > pkill -f inotify-test.py
    > }
    ~/p/TEST $ do_test
    [1] 26663
    future_callback: run: A10 (dir list)
    future_callback: run: A4 (dir list)
    future_callback: run: A5 (dir list)
    future_callback: run: A9 (dir list)
    future_callback: run: A2 (dir list)
    Submitted inotify for B1
    Submitted inotify for B2
    Submitted inotify for B3
    Submitted inotify for B4
    Submitted inotify for B5
    future_callback: run: A3 (dir list)
    future_callback: run: A8 (dir list)
    future_callback: run: A1 (dir list)
    future_callback: run: A7 (dir list)
    future_callback: run: A6 (dir list)
    future_callback: run: B1 (inotify)
    future_callback: run: B2 (inotify)
    future_callback: run: B3 (inotify)
    future_callback: run: B4 (inotify)
    future_callback: run: B5 (inotify)
    ~/p/TEST $ 
    [1]+  Terminated              ../inotify-test.py
    ~/p/TEST $
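One thing neither version handles is a file being seen twice. A file created between add_watch() and os.listdir(), or one still being written when the directory is listed, appears in the listing and later produces its own IN_CLOSE_WRITE event. Below is a minimal sketch of a thread-safe guard, assuming that processing a file twice is undesirable. `SubmitOnce` is a hypothetical name, and `str.upper` stands in for the real `process`/`run` function:

```python
import concurrent.futures
import threading


class SubmitOnce:
    """Hypothetical helper: submit each filename to the executor at most once,
    whether it comes from the directory listing or from an inotify event.
    Safe to call from several threads at the same time."""

    def __init__(self, executor, fn):
        self._executor = executor
        self._fn = fn
        self._seen = set()
        self._lock = threading.Lock()

    def submit(self, filename):
        # Check-and-add under one lock so two threads cannot both claim a name.
        with self._lock:
            if filename in self._seen:
                return None  # already queued by the other source
            self._seen.add(filename)
        return self._executor.submit(self._fn, filename)


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        once = SubmitOnce(executor, str.upper)
        listed = [once.submit(n) for n in ['a1', 'a2']]   # directory listing
        evented = [once.submit(n) for n in ['a2', 'b1']]  # inotify events
        print([f.result() if f is not None else None for f in listed + evented])
        # ['A1', 'A2', None, 'B1']
```

The trade-off is that the `_seen` set grows forever and a file legitimately rewritten later is skipped. Removing a name once its processing finishes fixes that, but then reopens a small window in which a late event for an already-processed file triggers a second run.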