I am writing a Python script that uses the PyObjC bindings for AppKit. The script registers an observer with the shared NSWorkspace
's notificationCenter
and then calls AppKit.CFRunLoopRun() in order for the notifications to be processed:
from __future__ import print_function
from AppKit import *
import signal
shared_workspace = NSWorkspace.sharedWorkspace()
def on_did_activate_application(notification):
print('on_did_activate_application(notification = %s)' % notification)
notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = notification_center.addObserverForName_object_queue_usingBlock_(
NSWorkspaceDidActivateApplicationNotification,
None,
None,
on_did_activate_application)
def handle_interrupt(signum, frame):
notification_center.removeObserver_(did_activate_application_observer)
CFRunLoopStop(CFRunLoopGetCurrent())
signal.signal(signal.SIGINT, handle_interrupt)
CFRunLoopRun()
The problem I am experiencing (reproducible with the above MCVE) is that, when I press Ctrl+C on the terminal window running the script, handle_interrupt() is not executed immediately, but rather it executes the next time a NSWorkspaceDidActivateApplicationNotification
notification is handled.
How do I respond to Ctrl+C / SIGINT just as soon as it occurs?
This can be accomplished by setting a "signal wakeup fd" to the write end of a pipe, and then creating a CFFileDescriptor
that monitors the read end of the pipe for activity.
As mentioned at What’s New in Python 2.6, in Python 2.6, a new API was added to the signal
module called set_wakeup_fd(). Whenever a signal is received, a NUL byte ('\0'
) is written to the fd.
If the wakeup fd is set to the write end of a pipe, then a CFFileDescriptor
can be created to monitor for activity (availability of data) on the read end of the pipe, and callbacks on such activity can be configured to run on the CFRunLoop
.
from __future__ import print_function
from AppKit import * # For development only. This takes a long time to complete as there are many symbols.
import fcntl
import os
import signal
shared_workspace = NSWorkspace.sharedWorkspace()
def on_did_activate_application(notification):
print('on_did_activate_application(notification = %s)' % notification)
notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = notification_center.addObserverForName_object_queue_usingBlock_(
NSWorkspaceDidActivateApplicationNotification,
None,
None,
on_did_activate_application)
def handle_signal(signum, frame):
print('handle_signal(signum = %s, frame = <scrubbed>)' % signum)
if signum == signal.SIGCONT:
signal.signal(signal.SIGTSTP, handle_signal)
elif signum == signal.SIGINT:
notification_center.removeObserver_(did_activate_application_observer)
CFRunLoopStop(CFRunLoopGetCurrent())
else:
# https://stackoverflow.com/questions/13377773/proper-way-to-handle-signals-other-than-sigint-in-python
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum)
r, w = os.pipe()
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def callout(f, call_back_types, info):
# Note: The signal handler will run first.
print('callout()')
# Clear the pipe of NUL bytes.
n = 0
while True:
try:
n += len(os.read(r, 100))
except OSError:
break
print('read %d byte(s)' % n)
# Per https://developer.apple.com/documentation/corefoundation/cffiledescriptor?language=objc
# "Each call back is one-shot, and must be re-enabled if you want to get another one."
# Thus we need to re-enable call backs.
CFFileDescriptorEnableCallBacks(f, kCFFileDescriptorReadCallBack)
file_descriptor = CFFileDescriptorCreate(None, r, True, callout, None)
CFFileDescriptorEnableCallBacks(file_descriptor, kCFFileDescriptorReadCallBack)
run_loop_source = CFFileDescriptorCreateRunLoopSource(None, file_descriptor, 0)
CFRunLoopAddSource(CFRunLoopGetCurrent(), run_loop_source, kCFRunLoopDefaultMode)
signal.set_wakeup_fd(w)
signal.signal(signal.SIGCONT, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTSTP, handle_signal)
# For testing, configure a SIGALRM to be received every two seconds.
signal.signal(signal.SIGALRM, lambda _1, _2: print('SIGALRM'))
signal.setitimer(signal.ITIMER_REAL, 2, 2)
print('about to call CFRunLoopRun()')
CFRunLoopRun()
Acknowledgments
A big thank you to 'A GUEST' (whoever you are) for posting this paste on Pastebin; to Da_Blitz for writing the Fighting set_wakeup_fd article; and to the askers/answerers of “Proper” way to handle signals other than SIGINT in Python?, What's the difference between SIGSTOP and SIGTSTP?, and python Non-block read file on Stack Overflow.