Search code examples
Tags: file, parallel-processing, python-multithreading, inotify

parallelization of events notification in case of file creation


I am using "Inotify" to log an event when a file or folder is created in a directory (/tmp here). The example below does the job as a serial process: all file creations are handled one after the other, sequentially.

import logging

import inotify.adapters

# Log-line layout shared by every handler configured in this script.
_DEFAULT_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Module-level logger; handlers are attached in _configure_logging().
_LOGGER = logging.getLogger(__name__)

def _configure_logging():
    """Send this module's DEBUG-and-above records to stderr in the shared format."""
    _LOGGER.setLevel(logging.DEBUG)

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(_DEFAULT_LOG_FORMAT))
    _LOGGER.addHandler(handler)

def _main():
    """Watch /tmp and log every inotify event until the generator is interrupted."""
    watcher = inotify.adapters.Inotify()
    watcher.add_watch(b'/tmp')

    try:
        # event_gen() blocks and periodically yields None heartbeats;
        # skip those and log everything else.
        for event in watcher.event_gen():
            if event is None:
                continue
            header, type_names, watch_path, filename = event
            _LOGGER.info("WD=(%d) MASK=(%d) COOKIE=(%d) LEN=(%d) MASK->NAMES=%s "
                         "WATCH-PATH=[%s] FILENAME=[%s]",
                         header.wd, header.mask, header.cookie, header.len, type_names,
                         watch_path.decode('utf-8'), filename.decode('utf-8'))
    finally:
        # Always detach the watch, even on KeyboardInterrupt.
        watcher.remove_watch(b'/tmp')

if __name__ == '__main__':
    # Configure logging before entering the blocking watch loop.
    _configure_logging()
    _main()

I would like to introduce parallelization of the event notifications in case several files are uploaded at once. Should I wrap the handling in threads inside the loop? My second concern is that I am not sure where it would make sense to put the thread function.


Solution

  • The script below handles multiple events in the case of multiple sessions, so in my case this is enough. I added the multiprocessing option instead of threading; I found multiprocessing faster than threading. (Note: the `import threading` line is now unused and could be removed.)

     import logging
     import threading
     import inotify.adapters
     import multiprocessing  
    
     # Log-line layout shared by every handler configured in this script.
     _DEFAULT_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    
     # Module-level logger; handlers are attached in _configure_logging().
     _LOGGER = logging.getLogger(__name__)
    
     def _configure_logging():
         """Send this module's DEBUG-and-above records to stderr in the shared format."""
         _LOGGER.setLevel(logging.DEBUG)
    
         ch = logging.StreamHandler()
    
         formatter = logging.Formatter(_DEFAULT_LOG_FORMAT)
         ch.setFormatter(formatter)
    
         _LOGGER.addHandler(ch)
    
    
    
     def PopUpMessage(event):
         """Log the details of one inotify event; None heartbeats are ignored."""
         if event is None:
             return
         header, type_names, watch_path, filename = event
         _LOGGER.info("WD=(%d) MASK=(%d) COOKIE=(%d) LEN=(%d) MASK->NAMES=%s "
             "WATCH-PATH=[%s] FILENAME=[%s]",
             header.wd, header.mask, header.cookie, header.len, type_names,
             watch_path.decode('utf-8'), filename.decode('utf-8'))
    
    
     def My_main(count=0):
         """Watch /PARA and hand each inotify event to its own worker process.

         Args:
             count: unused legacy parameter; given a default so callers that
                 pass no argument (e.g. Process(target=My_main)) no longer
                 crash the child with a TypeError.
         """
         i = inotify.adapters.Inotify()
         i.add_watch(b'/PARA')
         try:
             # event_gen() already blocks and yields forever, so the extra
             # `while True` wrapper in the original was redundant.
             for event in i.event_gen():
                 if event is None:
                     # Periodic None heartbeats carry no work; don't pay the
                     # cost of forking a process for them.
                     continue
                 worker = multiprocessing.Process(target=PopUpMessage, args=(event,))
                 worker.start()
         finally:
             i.remove_watch(b'/PARA')
    
     if __name__ == '__main__':
         _configure_logging()
         # My_main declares a positional `count` parameter, so it must be
         # supplied via args=; omitting it made the child process die with a
         # TypeError before any event was handled.
         N = multiprocessing.Process(target=My_main, args=(0,))
         N.start()
         # Wait for the watcher process explicitly instead of relying on the
         # interpreter's implicit join at exit.
         N.join()