When I run a python script, I can exit the interpreter and atexit
will execute all the functions I have registered.
Now, I am using airflow and would like to trigger atexit
tasks on_kill()
(i.e., when I clear out or kill a dag node in airflow).
For example, in pseudo code, I need to be able to:
class Foo(PythonOperator):
...
def on_kill():
# somehow, trigger atexit tasks without exiting the
# process entirely
atexit
isn't a necessity either--I could do something else. The major point is that something getting executed outside the context of python needs to be killed in a procedural manner, and ideally passing up the kill function by reference to the housing script would be a last resort (python does not make this particular alternative easy).
You could monkey-patch the atexit
module — something like this:
import atexit
from queue import LifoQueue
save_register = atexit.register
atexit_queue = LifoQueue()
def my_register(func, *args, **kwargs):
save_register(func, *args, **kwargs)
atexit_queue.put((func, args, kwargs))
atexit.register = my_register
if __name__ == '__main__':
def func1():
print('func1() called')
def func2(arg):
print(f'func2({arg}) called')
def func3(arg, kwarg1=1, kwarg2='foo'):
print(f'func3({arg}, kwarg1={kwarg1}, kwarg2={kwarg2!r}) called')
atexit.register(func1)
atexit.register(func2, 1)
atexit.register(func3, 2, kwarg1=42, kwarg2='bar')
print('Calling queued atexit functions:\n')
while atexit_queue.qsize():
func, args, kwargs = atexit_queue.get()
atexit.unregister(func) # Prevent it from being called again.
func(*args, **kwargs)
Output:
Calling queued atexit functions:
func3(2, kwarg1=42, kwarg2='bar') called
func2(1) called
func1() called