Search code examples
pythonsignalscore-foundationpyobjcrunloop

How to register a SIGINT handler that will run as soon as Ctrl+C is pressed?


I am writing a Python script that uses the PyObjC bindings for AppKit. The script registers an observer with the shared NSWorkspace's notificationCenter and then calls AppKit.CFRunLoopRun() in order for the notifications to be processed:

from __future__ import print_function
from AppKit import *
import signal

shared_workspace = NSWorkspace.sharedWorkspace()

def on_did_activate_application(notification):
    print('on_did_activate_application(notification = %s)' % notification)

notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = notification_center.addObserverForName_object_queue_usingBlock_(
        NSWorkspaceDidActivateApplicationNotification,
        None,
        None,
        on_did_activate_application)

def handle_interrupt(signum, frame):
    notification_center.removeObserver_(did_activate_application_observer)
    CFRunLoopStop(CFRunLoopGetCurrent())

signal.signal(signal.SIGINT, handle_interrupt)

CFRunLoopRun()

The problem I am experiencing (reproducible with the above MCVE) is that, when I press Ctrl+C on the terminal window running the script, handle_interrupt() is not executed immediately, but rather it executes the next time a NSWorkspaceDidActivateApplicationNotification notification is handled.

How do I respond to Ctrl+C / SIGINT just as soon as it occurs?


Solution

  • This can be accomplished by setting a "signal wakeup fd" to the write end of a pipe, and then creating a CFFileDescriptor that monitors the read end of the pipe for activity.

    As mentioned at What’s New in Python 2.6, in Python 2.6, a new API was added to the signal module called set_wakeup_fd(). Whenever a signal is received, a NUL byte ('\0') is written to the fd.

    If the wakeup fd is set to the write end of a pipe, then a CFFileDescriptor can be created to monitor for activity (availability of data) on the read end of the pipe, and callbacks on such activity can be configured to run on the CFRunLoop.

    from __future__ import print_function
    from AppKit import * # For development only. This takes a long time to complete as there are many symbols.
    import fcntl
    import os
    import signal
    
    shared_workspace = NSWorkspace.sharedWorkspace()
    
    def on_did_activate_application(notification):
        print('on_did_activate_application(notification = %s)' % notification)
    
    notification_center = shared_workspace.notificationCenter()
    did_activate_application_observer = notification_center.addObserverForName_object_queue_usingBlock_(
            NSWorkspaceDidActivateApplicationNotification,
            None,
            None,
            on_did_activate_application)
    
    def handle_signal(signum, frame):
        print('handle_signal(signum = %s, frame = <scrubbed>)' % signum)
        if signum == signal.SIGCONT:
            signal.signal(signal.SIGTSTP, handle_signal)
        elif signum == signal.SIGINT:
            notification_center.removeObserver_(did_activate_application_observer)
            CFRunLoopStop(CFRunLoopGetCurrent())
        else:
            # https://stackoverflow.com/questions/13377773/proper-way-to-handle-signals-other-than-sigint-in-python
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
    
    r, w = os.pipe()
    
    flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
    fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    
    def callout(f, call_back_types, info):
        # Note: The signal handler will run first.
    
        print('callout()')
    
        # Clear the pipe of NUL bytes.
        n = 0
        while True:
            try:
                n += len(os.read(r, 100))
            except OSError:
                break
        print('read %d byte(s)' % n)
    
        # Per https://developer.apple.com/documentation/corefoundation/cffiledescriptor?language=objc
        # "Each call back is one-shot, and must be re-enabled if you want to get another one."
        # Thus we need to re-enable call backs.
        CFFileDescriptorEnableCallBacks(f, kCFFileDescriptorReadCallBack)
    
    file_descriptor = CFFileDescriptorCreate(None, r, True, callout, None)
    CFFileDescriptorEnableCallBacks(file_descriptor, kCFFileDescriptorReadCallBack)
    run_loop_source = CFFileDescriptorCreateRunLoopSource(None, file_descriptor, 0)
    CFRunLoopAddSource(CFRunLoopGetCurrent(), run_loop_source, kCFRunLoopDefaultMode)
    
    signal.set_wakeup_fd(w)
    signal.signal(signal.SIGCONT, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTSTP, handle_signal)
    
    # For testing, configure a SIGALRM to be received every two seconds.
    signal.signal(signal.SIGALRM, lambda _1, _2: print('SIGALRM'))
    signal.setitimer(signal.ITIMER_REAL, 2, 2)
    
    print('about to call CFRunLoopRun()')
    CFRunLoopRun()
    
    Acknowledgments

    A big thank you to 'A GUEST' (whoever you are) for posting this paste on Pastebin; to Da_Blitz for writing the Fighting set_wakeup_fd article; and to the askers/answerers of “Proper” way to handle signals other than SIGINT in Python?, What's the difference between SIGSTOP and SIGTSTP?, and python Non-block read file on Stack Overflow.