Search code examples
python raspberry-pi pyinotify

How to get notified when a partition is mounted or unmounted in RPi OS and Python?


I tried the following code, but I am not getting any notification when mounting and unmounting partitions. Am I missing something?

I use the pyinotify library to watch for changes to the file /proc/mounts, but it seems that my handler is never called.

import pyinotify, time

# Event handler class: its process_IN_MODIFY method receives the IN_MODIFY events
class EventHandler(pyinotify.ProcessEvent):
    """Print a line for every modification event handed to this handler."""

    def process_IN_MODIFY(self, event):
        """Report the path carried by a modification event.

        Args:
            event: The event object; only its ``pathname`` attribute is read.
        """
        changed_path = event.pathname
        print(f"Change detected in {changed_path}!")

# Create an inotify instance
wm = pyinotify.WatchManager()

# Create an event handler instance
handler = EventHandler()

# Create a threaded notifier that hands events to the handler instance
notifier = pyinotify.ThreadedNotifier(wm, handler)

# Watch for changes to /proc/mounts
# NOTE(review): inotify(7) states that pseudo-filesystems such as /proc are not
# monitorable with inotify, which would explain why the handler never runs.
wm.add_watch("/proc/mounts", pyinotify.IN_MODIFY)

# Start the notifier thread
print("Watching for mount changes...")
notifier.start()

try:
    # Keep the main thread alive until the user interrupts it with Ctrl+C
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C is the expected way to leave the loop; anything else propagates
    pass
finally:
    # Always stop the notifier thread, even when an unexpected error occurred
    notifier.stop()

print('Done!')

Solution

  • This is how I solved the problem. With this code I get a single notification every time a block device (USB HDD) is connected or disconnected, and when a partition is mounted or unmounted. I am not 100% sure that the callback is not triggered in other cases too, but if so, the D-Bus signal filter can be adjusted.

    import time, threading
    
    class FastTimer(threading.Thread):
      """Restartable timer thread that calls a callback after a quiet interval.

      The thread is a daemon and is started by the constructor. It stays idle
      until it is armed with Gooo() or Mark(). Once armed, it waits `interval`
      seconds; if nobody resets it during that wait, the callback is invoked.
      After `shots` invocations the timer disarms itself and goes idle again.
      """

      def __init__(self, interval, callback, args=None, kwargs=None, shots=1):
        """Create and start the timer thread (idle until armed).

        Args:
          interval: Timeout in seconds for each wait before the callback fires.
          callback: Callable invoked as callback(*args, **kwargs).
          args: Positional arguments for the callback (defaults to none).
          kwargs: Keyword arguments for the callback (defaults to none).
          shots: Number of callback invocations before the timer disarms
            itself. With a value <= 0 the timer is never disarmed by the
            worker and keeps firing every `interval` until aborted.
        """
        super().__init__()
        self.daemon = True
        self.interval = interval
        self.callback = callback
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.last_shot = shots
        self.trigger = threading.Event()     # set while the timer is armed
        self.done = threading.Event()        # set to interrupt the current wait
        self.terminated = threading.Event()  # set to make the worker exit
        self.start()

      def Terminate(self):
        """Stop the worker thread and wait until it has exited."""
        if self.is_alive():
          self.terminated.set()
          # Wake the worker whichever event it is currently blocked on.
          self.done.set()
          self.trigger.set()
          self.join()

      def Gooo(self):
        """Arm the timer."""
        if self.is_alive():
          self.trigger.set()

      def Abort(self):
        """Cancel the pending shot and disarm the timer."""
        if self.is_alive():
          # Disarm BEFORE waking the worker: if `done` were set first, the
          # worker could wake up, still see the timer armed, start a new
          # interval and fire the callback even though it was aborted.
          self.trigger.clear()
          self.done.set()

      def Reset(self):
        """Interrupt the current wait so that the interval starts over."""
        if self.is_alive():
          self.done.set()

      def Mark(self):
        """Arm the timer if it is idle, otherwise restart the interval."""
        if self.is_alive():
          if self.trigger.is_set():
            self.done.set()
          else:
            self.trigger.set()

      def run(self):
        """Worker loop: wait for arming, then fire unless reset in time."""
        # Initialised before the loop: if the timer is armed before this
        # thread reaches its first idle check, the idle branch below is
        # skipped and the counter would otherwise be unbound when it is
        # incremented after the callback.
        shot = 0
        while not self.terminated.is_set():
          if not self.trigger.is_set():
            # Idle: begin a fresh shot sequence and drop any stale reset.
            shot = 0
            self.done.clear()
          self.trigger.wait()
          # True when `done` was set during the wait, False on timeout.
          was_reset = self.done.wait(self.interval)
          self.done.clear()
          if self.terminated.is_set():
            break
          if not was_reset:
            self.callback(*self.args, **self.kwargs)
            if self.last_shot > 0:
              shot += 1
              if shot == self.last_shot:
                # All requested shots delivered: go back to idle.
                self.trigger.clear()
        print('Timer terminated.')
    
    
    class BlockDevMonitor(threading.Thread):
      """Thread that watches systemd unit signals for block-device changes.

      It runs a GLib main loop subscribed to the UnitNew and UnitRemoved
      signals of org.freedesktop.systemd1.Manager on the system bus. Signals
      whose first argument looks like a block-device unit mark a one-second
      FastTimer, so a burst of signals results in a single callback call.
      """

      def __init__(self, callback, args=None, kwargs=None):
        """Start the monitor thread and wait until its main loop is running.

        Args:
          callback: Callable invoked (via the timer) after matching signals.
          args: Positional arguments for the callback.
          kwargs: Keyword arguments for the callback.
        """
        super().__init__()
        self.loop = None
        # Set from inside the running main loop (or when run() exits early),
        # replacing a fixed start-up sleep with an explicit hand-shake.
        self._loop_running = threading.Event()
        self.timer = FastTimer(1, callback, args, kwargs)
        self.start()
        # Bounded so the constructor can never block indefinitely.
        self._loop_running.wait(5)

      def Terminate(self):
        """Quit the main loop, join the thread and stop the timer."""
        if self.is_alive():
          if self.loop is not None:
            self.loop.quit()
          self.join()
        self.timer.Terminate()

      def run(self):
        """Subscribe to the systemd signals and run the GLib main loop."""
        try:
          import dbus
          from dbus.mainloop.glib import DBusGMainLoop
          from gi.repository import GLib

          def sigUnitChg(*args):
            # The first signal argument is tested against the unit-name
            # patterns that the author found to identify block devices.
            if args and (args[0].startswith('blockdev@') or '-block-' in args[0]):
              self.timer.Mark()

          DBusGMainLoop(set_as_default=True)
          self.loop = GLib.MainLoop()
          bus = dbus.SystemBus()
          for signal_name in ('UnitNew', 'UnitRemoved'):
            bus.add_signal_receiver(sigUnitChg, dbus_interface='org.freedesktop.systemd1.Manager', signal_name=signal_name, path='/org/freedesktop/systemd1')

          # Runs once the loop is actually iterating, so a later quit() from
          # Terminate() cannot be lost; Event.set() returns None, which
          # removes the idle source after its first call.
          GLib.idle_add(self._loop_running.set)

          print('DevMonitor started.')
          self.loop.run()
          print('DevMonitor stopped.')
        finally:
          # Release the constructor even if start-up failed.
          self._loop_running.set()
    
    
    #----------------------------------------------
    
    def OnUpdate():
      """Print a fixed notice; takes no arguments and returns nothing."""
      notice = 'Device Manager updated !'
      print(notice)
    
    DevMon = BlockDevMonitor(OnUpdate)

    try:
      # Print a running seconds counter until interrupted with Ctrl+C
      i = 0
      while True:
        time.sleep(1)
        i += 1
        print(i)
    except KeyboardInterrupt:
      # Ctrl+C is the expected way to leave the loop; anything else propagates
      pass
    finally:
      # The monitor thread is not a daemon, so it must always be terminated
      # or the interpreter would not exit.
      DevMon.Terminate()