I tried the following code, but I do not get any notification when partitions are mounted or unmounted. Am I missing something?
I use the pyinotify library to watch for changes to the file /proc/mounts, but it seems that my handler is never called:
import pyinotify, time
# Handler class whose methods are invoked for file system events
class EventHandler(pyinotify.ProcessEvent):
    """Event handler that prints a notice for every IN_MODIFY event it receives."""

    def process_IN_MODIFY(self, event):
        """Print the path carried by *event* (its ``pathname`` attribute)."""
        changed_path = event.pathname
        print(f"Change detected in {changed_path}!")
# Create the watch manager that owns the inotify watches
wm = pyinotify.WatchManager()
# Create an event handler instance
handler = EventHandler()
# Create a threaded notifier that dispatches events to the handler
notifier = pyinotify.ThreadedNotifier(wm, handler)
# Watch for changes to /proc/mounts
# NOTE(review): /proc is a pseudo-filesystem, and inotify(7) states that such
# filesystems are not monitorable with inotify -- presumably the reason the
# handler is never called. proc(5) documents poll()/select() on an open
# /proc/mounts descriptor as the supported way to detect mount changes.
wm.add_watch("/proc/mounts", pyinotify.IN_MODIFY)
# Start the notifier loop
print("Watching for mount changes...")
notifier.start()
try:
    # Keep the main thread alive while the notifier thread does the work.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to leave the loop; anything else propagates.
    pass
finally:
    # Stop the notifier thread no matter how the loop ended.
    notifier.stop()
print('Done!')
This is how I solved the problem. With this code I get a single notification every time a block device (USB HDD) is connected or disconnected, and every time a partition is mounted or unmounted. I am not 100% sure that the callback is not triggered in other cases too, but if it is, the D-Bus signal filter can be adjusted...
import time, threading
class FastTimer(threading.Thread):
    """Restartable countdown timer that runs on its own daemon thread.

    The thread starts itself on construction and then idles until it is
    triggered (``Gooo`` or ``Mark``). Once triggered it waits ``interval``
    seconds and calls ``callback(*args, **kwargs)``. Setting the ``done``
    event during the wait (``Reset``, or ``Mark`` on an already triggered
    timer) restarts the countdown, which makes ``Mark`` usable as a
    debouncer for bursts of events.

    Parameters:
        interval: countdown length in seconds.
        callback: callable invoked when a countdown expires.
        args: positional arguments for ``callback`` (default: none).
        kwargs: keyword arguments for ``callback`` (default: none).
        shots: number of consecutive expirations per trigger before the
            timer goes idle again; a value <= 0 keeps it firing every
            ``interval`` until ``Abort`` or ``Terminate`` is called.
    """

    def __init__(self, interval, callback, args=None, kwargs=None, shots=1):
        super().__init__()
        self.daemon = True
        self.interval = interval
        self.callback = callback
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.last_shot = shots
        self.trigger = threading.Event()     # set while a countdown is armed
        self.done = threading.Event()        # set to interrupt the current wait
        self.terminated = threading.Event()  # set to make run() exit
        self.start()

    def Terminate(self):
        """Stop the timer thread and wait for it to finish."""
        if self.is_alive():
            self.terminated.set()
            # Wake the worker whichever of the two events it is blocked on.
            self.done.set()
            self.trigger.set()
            # Joining the current thread raises RuntimeError, so skip the
            # join when Terminate() is called from inside the callback.
            if threading.current_thread() is not self:
                self.join()

    def Gooo(self):
        """Arm the timer: start a countdown if none is running."""
        if self.is_alive():
            self.trigger.set()

    def Abort(self):
        """Cancel the pending countdown without invoking the callback."""
        if self.is_alive():
            # Disarm first, then wake the worker: with the opposite order
            # the worker could wake, still see the trigger armed and start
            # a fresh countdown that fires despite the abort.
            self.trigger.clear()
            self.done.set()

    def Reset(self):
        """Restart the countdown that is currently in progress."""
        if self.is_alive():
            self.done.set()

    def Mark(self):
        """Arm the timer, or restart the countdown if it is already armed."""
        if self.is_alive():
            if self.trigger.is_set():
                self.done.set()
            else:
                self.trigger.set()

    def run(self):
        """Worker loop: wait for a trigger, count down, invoke the callback."""
        # Bound up front so a trigger that arrives before the first idle
        # check cannot leave the counter undefined.
        shot = 0
        while not self.terminated.is_set():
            if not self.trigger.is_set():
                # Idle: forget earlier shots and any stale interruption.
                shot = 0
                self.done.clear()
            self.trigger.wait()
            # True when the wait was interrupted rather than timed out.
            was_reset = self.done.wait(self.interval)
            self.done.clear()
            if self.terminated.is_set():
                break
            if not was_reset:
                self.callback(*self.args, **self.kwargs)
                if self.last_shot > 0:
                    shot += 1
                    if shot == self.last_shot:
                        # Shot budget used up: go back to idle.
                        self.trigger.clear()
        print('Timer terminated.')
class BlockDevMonitor(threading.Thread):
    """Thread that listens for systemd unit signals about block devices.

    A GLib main loop runs on this thread and receives the ``UnitNew`` and
    ``UnitRemoved`` signals of ``org.freedesktop.systemd1.Manager`` on the
    system bus. Signals whose first argument starts with ``blockdev@`` or
    contains ``-block-`` mark a one-second ``FastTimer``; ``callback`` is
    invoked by that timer once the signals stop arriving.

    Parameters:
        callback: callable invoked after a burst of matching signals.
        args: positional arguments for ``callback`` (default: none).
        kwargs: keyword arguments for ``callback`` (default: none).
    """

    # Upper bound, in seconds, on waiting for the GLib loop to come up.
    STARTUP_TIMEOUT = 5.0

    def __init__(self, callback, args=None, kwargs=None):
        super().__init__()
        self.loop = None
        # Set once the main loop is really running, or once run() has given
        # up; replaces a fixed sleep that only guessed at the start-up time.
        self._loop_running = threading.Event()
        self.timer = FastTimer(1, callback, args, kwargs)
        self.start()
        self._loop_running.wait(self.STARTUP_TIMEOUT)

    def Terminate(self):
        """Quit the main loop, join this thread and stop the timer."""
        # A quit() issued before loop.run() has started would be lost and
        # join() would then block forever, so wait for the loop first.
        self._loop_running.wait(self.STARTUP_TIMEOUT)
        if self.is_alive():
            if self.loop is not None:
                self.loop.quit()
            self.join()
        # Stop the timer even when the monitor thread has already died.
        self.timer.Terminate()

    def run(self):
        """Subscribe to the systemd signals and run the GLib main loop."""
        try:
            import dbus
            from dbus.mainloop.glib import DBusGMainLoop
            from gi.repository import GLib

            def sigUnitChg(*args):
                # args[0] is presumably the unit name -- verify against the
                # systemd D-Bus API; only block-device units are of interest.
                if args and (args[0].startswith('blockdev@') or '-block-' in args[0]):
                    self.timer.Mark()

            DBusGMainLoop(set_as_default=True)
            self.loop = GLib.MainLoop()
            bus = dbus.SystemBus()
            for signal_name in ('UnitNew', 'UnitRemoved'):
                bus.add_signal_receiver(
                    sigUnitChg,
                    dbus_interface='org.freedesktop.systemd1.Manager',
                    signal_name=signal_name,
                    path='/org/freedesktop/systemd1',
                )
            # The idle callback runs from inside loop.run(), so the event is
            # set only when the loop is actually iterating. Event.set()
            # returns None, which removes the idle source after one call.
            GLib.idle_add(self._loop_running.set)
            print('DevMonitor started.')
            self.loop.run()
            print('DevMonitor stopped.')
        finally:
            # Never leave __init__ or Terminate waiting if start-up failed.
            self._loop_running.set()
#----------------------------------------------
def OnUpdate():
    """Callback for the device monitor: print a fixed notification line."""
    notice = 'Device Manager updated !'
    print(notice)
# Start monitoring; OnUpdate is the callback handed to the monitor.
DevMon = BlockDevMonitor(OnUpdate)
try:
    # Keep the main thread alive and print a counter once per second.
    i = 0
    while True:
        time.sleep(1)
        i += 1
        print(i)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to leave the loop; anything else propagates.
    pass
finally:
    # Shut the monitor down no matter how the loop ended.
    DevMon.Terminate()