Search code examples
python, flask, uwsgi, python-watchdog

Flask (using watchdog) and uWSGI - no events from file system


I am using watchdog to reload Python modules while my Flask server is running. Everything works when I run the Flask debug server. But when I start the Flask server from uWSGI, no notifications from my Linux file system reach watchdog, and so the modules are not reloaded. MasterService (the ServicesMaster class below) is initialized when the first request is accepted.


Note: I have tried to use waitress as well. There everything works fine, but I would prefer to use uWSGI. Thanks for any advice.


'''
Created on 10 Oct 2014

@author: ttrval
'''

import os
import datetime
import pkgutil
import logging
from threading import BoundedSemaphore
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, EVENT_TYPE_MOVED, EVENT_TYPE_MODIFIED, EVENT_TYPE_CREATED, EVENT_TYPE_DELETED


class Context(object):
    '''Shared parameter holder handed to math services by ServicesMaster.

    The class object itself (not an instance) is passed to the ``activate``
    and ``deactivate`` hooks of every service module. Both attributes start
    as None and are filled in by ``ServicesMaster.__init__``.
    '''
    # logger: logger the services are expected to use
    # serviceManager: the ServicesMaster instance that was created last
    logger = serviceManager = None

class Service(object):
    '''Record describing one python module imported by math_server at run time.

    Keeps the service name, the imported module object and the modification
    date stored for it. ``__slots__`` is used, so instances have no
    ``__dict__`` and accept only these three attributes.
    '''
    __slots__ = ('module', 'modifyDate', 'name')

    def __init__(self, name, module, modifyDate):
        self.name = name
        self.module = module
        self.modifyDate = modifyDate

    def update(self, otherService):
        '''Takes over module and modification date of otherService.

        The name of this service is left untouched.
        '''
        for field in ('module', 'modifyDate'):
            setattr(self, field, getattr(otherService, field))

    def __repr__(self):
        template = "<{typ}|{name}:{module}({date})>"
        return template.format(
            typ=type(self),
            name=self.name,
            module=self.module,
            date=self.modifyDate,
        )

    def __str__(self):
        template = "Service {name}:{module} was last updated {date}"
        return template.format(
            name=self.name,
            module=self.module,
            date=self.modifyDate,
        )


class ServicesFilesEventHandler(FileSystemEventHandler):
    '''Handles changes in file system of services loaded by math_server.

    Events for directories and for files whose path does not end with one of
    the supported extensions are ignored. Every other event is forwarded to
    the matching ``on_*`` method, which asks ``master`` to load or unload the
    service stored at the affected path.
    '''
    # Native and unicode string types, so that a single extension passed as a
    # plain string is wrapped in a tuple instead of being split into characters.
    _STRING_TYPES = (type(""), type(u""))

    def __init__(self, master, logger=logging.getLogger('werkzeug'), supported_types=(".py",)):
        '''
        @param master: object providing ``sync_modify_service``, ``loadService``,
            ``unloadService`` and ``semaphore`` (ServicesMaster passes its class)
        @param logger: logger used for all messages of this handler
        @param supported_types: file extension or iterable of file extensions
            (including the dot) that are treated as services
        '''
        super(ServicesFilesEventHandler, self).__init__()
        self.logger = logger
        if isinstance(supported_types, self._STRING_TYPES):
            supported_types = (supported_types,)
        # str.endswith needs a tuple, lists are not accepted
        self.supported_types = tuple(supported_types)
        self.master = master

    def dispatch(self, event):
        '''Dispatches events to the appropriate methods.

        Directory events, events for files with an unsupported extension and
        events of an unknown type are silently ignored.

        :param event:
            The event object representing the file system event.
        :type event:
            :class:`FileSystemEvent`
        '''
        print("event catched{}".format(str(event)))
        if event.is_directory:
            return
        event_type = event.event_type
        # Event types are plain strings: compare by value, not by identity.
        if event_type == EVENT_TYPE_MOVED:
            # for a move only the destination decides whether the event is handled
            path = event.dest_path
        else:
            path = event.src_path
        if not path.endswith(self.supported_types):
            return
        _method_map = {
            EVENT_TYPE_MODIFIED: self.on_modified,
            EVENT_TYPE_MOVED: self.on_moved,
            EVENT_TYPE_CREATED: self.on_created,
            EVENT_TYPE_DELETED: self.on_deleted,
        }
        handler = _method_map.get(event_type)
        # event types missing in the map are ignored instead of raising KeyError
        if handler is not None:
            handler(event)

    def _unload_if_loaded(self, path):
        '''Unloads the service stored at path; a service that is not loaded is only logged.'''
        try:
            self.master.sync_modify_service(path, unload=True)
        except KeyError:
            # ServicesMaster.unloadService raises KeyError for unknown services
            self.logger.info("No loaded service to unload for: {}".format(path))

    def on_moved(self, event):
        """Called when a file or a directory is moved or renamed.

        The service of the old path is unloaded first and the new path is
        loaded afterwards. With the opposite order a rename that keeps the
        service name (f.e. 'foo.py.tmp' -> 'foo.py') would end with the
        freshly loaded service being unloaded again.

        :param event:
            Event representing file/directory movement.
        :type event:
            :class:`DirMovedEvent` or :class:`FileMovedEvent`
        """
        path = event.dest_path
        self.logger.info("File moved: {}".format(path))
        self._unload_if_loaded(event.src_path)
        self.master.sync_modify_service(path)

    def on_created(self, event):
        """Called when a file or directory is created.

        :param event:
            Event representing file/directory creation.
        :type event:
            :class:`DirCreatedEvent` or :class:`FileCreatedEvent`
        """
        path = event.src_path
        # use the injected logger like every other callback does
        self.logger.info("File created: {}".format(path))
        self.master.sync_modify_service(path)

    def on_deleted(self, event):
        """Called when a file or directory is deleted.

        :param event:
            Event representing file/directory deletion.
        :type event:
            :class:`DirDeletedEvent` or :class:`FileDeletedEvent`
        """
        path = event.src_path
        self.logger.info("File deleted: {}".format(path))
        self._unload_if_loaded(path)

    def on_modified(self, event):
        """Called when a file or directory is modified.

        Unloads and loads the service again while holding the semaphore of
        the master, so both steps happen as one synchronized operation.

        :param event:
            Event representing file/directory modification.
        :type event:
            :class:`DirModifiedEvent` or :class:`FileModifiedEvent`
        """
        path = event.src_path
        self.logger.info("File modified: {}".format(path))
        self.master.semaphore.acquire()
        try:
            try:
                self.master.unloadService(path)
            except KeyError:
                # service was not loaded before (f.e. its first load failed);
                # still try to load the modified file
                self.logger.info("No loaded service to unload for: {}".format(path))
            self.master.loadService(path)
        finally:
            self.master.semaphore.release()


class Singleton(type):
    '''Metaclass that creates at most one instance per class.

    The first call of a class builds the instance with the given arguments
    and remembers it; every later call returns the remembered instance and
    ignores its arguments.
    '''
    # class -> its only instance, shared by all classes using this metaclass
    _instances = {}

    def __call__(cls, *args, **kwargs):  # @NoSelf
        registry = cls._instances
        try:
            return registry[cls]
        except KeyError:
            pass
        instance = super(Singleton, cls).__call__(*args, **kwargs)
        registry[cls] = instance
        return instance


class ServicesMaster(object):
    '''Singleton class, provides access to Services. It also handles services loading and unloading.
    @uses :class: ServicesFilesEventHandler
    @uses :package: watchdog

    The Singleton metaclass is currently disabled (see the commented line
    below). All state is kept in class attributes instead, so it is shared by
    every instance and by the class methods.
    '''
    # __metaclass__ = Singleton
    services = None                 # dict: service name -> Service, None until first load
    dirname = None                  # absolute path of the observed services directory
    observer = None                 # running watchdog Observer or None
    logger = None                   # logger set by __init__
    semaphore = BoundedSemaphore(1)  # serializes loading and unloading of services

    def __init__(self, logger=logging.getLogger('werkzeug'), dirname="./services"):
        '''Stores logger and directory on the class, (re)loads all services
        and starts observing the directory.

        @param logger: logger used by the master and published via Context
        @param dirname: directory with service modules, stored as absolute path
        '''
        Context.logger = logger
        Context.serviceManager = self
        self.__class__.dirname = os.path.abspath(dirname)
        self.__class__.logger = logger
        self._closeObserver()
        self.loadServices()
        self._initObserver()

    def __del__(self):
        # Only the observer needs cleanup. services/observer are class
        # attributes, "del self.services" would raise AttributeError here.
        self._closeObserver()

    @classmethod
    def _initObserver(cls):
        '''Creates observer of module folder (not recursive)

        The observer is started with start() and later joined in
        _closeObserver(). NOTE: when served by uWSGI, threads have to be
        enabled in the uWSGI configuration, otherwise no file system events
        are delivered to the handler.
        '''
        event_handler = ServicesFilesEventHandler(cls, cls.logger)
        print("event_handler init {}".format(str(event_handler)))
        if cls.observer is None:
            observer = Observer()
            observer.schedule(event_handler, cls.dirname, recursive=False)
            observer.start()
            # publish the observer only after it was started successfully
            cls.observer = observer

    @classmethod
    def _closeObserver(cls):
        '''Deactivates observer of module folder (not recursive)'''
        if cls.observer is not None:
            cls.observer.stop()
            cls.observer.join()
            cls.observer = None

    @classmethod
    def sync_modify_service(cls, path, unload=False):
        '''
        synchronized modification of service
        if unload = True: unloads service
        else:loads service

        @param path: path to the file of the service
        @raise KeyError: from unloadService when the service is not loaded
        @raise Exception: from loadService when path is outside of the services directory
        '''
        with cls.semaphore:
            if unload:
                cls.unloadService(path)
            else:
                cls.loadService(path)

    @classmethod
    def loadServices(cls):
        '''
        Loads and activates every '.py' module (packages are skipped) found
        in the services directory. The semaphore is held while loading and
        is released even when a module fails to import or to activate.
        '''
        if cls.services is None:
            cls.services = {}

        importer = pkgutil.ImpImporter(path=cls.dirname)
        with cls.semaphore:
            for name, ispkg in importer.iter_modules():
                if ispkg:
                    continue
                loader = importer.find_module(name)
                # etc[0] is the file suffix, only plain sources are loaded
                if '.py' == loader.etc[0]:
                    new_service = Service(
                        name=name,
                        module=loader.load_module(loader.fullname),
                        modifyDate=cls.modification_date(loader.filename),
                    )
                    cls.services[name] = new_service
                    new_service.module.activate(Context)
        cls.logger.info("Loaded Services: {}".format(cls.services.keys()))
        print("check after services loaded")

    @staticmethod
    def _serviceName(fullpath):
        '''Returns the service name: file name up to its first dot.'''
        return os.path.basename(fullpath).split('.')[0]

    @classmethod
    def loadService(cls, path):
        '''
        Loads (or reloads) the single service stored in file 'path' and
        activates it. Consider use of method 'sync_modify_service', this
        method does not take the semaphore itself.

        @raise Exception: when the file is not located in the services directory
        '''
        fullpath = os.path.abspath(path)
        directory = os.path.dirname(fullpath)
        if directory != cls.dirname:
            raise Exception("Directory '{}' of new service is not module directory('{}')".
                            format(directory, cls.dirname))
        new_service = Service(
            name=cls._serviceName(fullpath),
            module=cls._loadModule(fullpath),
            modifyDate=cls.modification_date(fullpath),
        )
        if new_service.name in cls.services:  # older version of new service is loaded already
            # deactivate old module instance
            cls.services[new_service.name].module.deactivate(Context)
            # take over new module instance
            cls.services[new_service.name].update(new_service)
        else:
            cls.services[new_service.name] = new_service
        # activate new service
        cls.services[new_service.name].module.activate(Context)
        cls.logger.info("Loaded Service: {}\nLoaded Services: {}"
                        .format(new_service.name, cls.services.keys()))

    @classmethod
    def unloadService(cls, path):
        '''
        Deactivates and forgets the service stored in file 'path' and removes
        its compiled '.pyc' file. Files outside of the services directory are
        ignored. Consider use of method 'sync_modify_service', this method
        does not take the semaphore itself.

        @raise KeyError: when no service of that name is loaded
        '''
        fullpath = os.path.abspath(path)
        directory = os.path.dirname(fullpath)
        # check if file is(was) in directory of services
        if directory != cls.dirname:
            return
        # file is(was) in observed directory of services
        name = cls._serviceName(fullpath)
        if name not in cls.services:
            raise KeyError("Service {} not found in loadedServices".format(name))
        # first deactivate old module
        cls.services[name].module.deactivate(Context)
        # remove old module
        del cls.services[name]
        # remove old module compile; built from directory and name so that
        # dots in directory names do not cut the path
        compiled = os.path.join(directory, name + ".pyc")
        try:
            os.remove(compiled)
        except OSError:
            # file does not exist already
            cls.logger.info("Found that file {} was removed already.".format(compiled))
        cls.logger.info("Unloaded Service: {}\nLoaded Services: {}"
                        .format(name, cls.services.keys()))
        return

    @classmethod
    def _loadModule(cls, path):
        '''
        Loads the single python module from file path
        @param path: path to module f.e:
        @type path: String F.E.:'./services/game_math.py'
        @raise ImportError: when no module of that name is found in the folder
        '''
        fullpath = os.path.abspath(path)
        name = cls._serviceName(fullpath)    # file name without extension
        folder = os.path.dirname(fullpath)   # folder of the absolute path
        importer = pkgutil.ImpImporter(path=folder)
        loader = importer.find_module(name)
        if loader is None:
            raise ImportError("Module '{}' not found in '{}'".format(name, folder))
        return loader.load_module(loader.fullname)

    @staticmethod
    def runService(name, args):
        '''Returns result from math service for given arguments
        @raise exception: Exception( "Service '{}' not found on MathServer".format(name) )
        '''
        # services is None until the first load, treat that as "nothing loaded"
        services = ServicesMaster.services or {}
        if name in services:
            return services[name].module.run(args)
        raise Exception("Service '{}' not found on MathServer".format(name))

    @staticmethod
    def modification_date(filename):
        '''returns modification date of file in datetime'''
        t = os.path.getmtime(filename)
        return datetime.datetime.fromtimestamp(t)

Solution

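  • The solution is to enable threads in the uWSGI configuration and set the number of threads to 2 or more. The watchdog observer delivers its events from a separate thread, which does not run under uWSGI unless thread support is enabled.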