python observable observer-pattern rx-py

How to monitor the execution of a series of functions? Maybe RxPy?


I have a list of functions that I want to execute, like in a pipeline.

pipeline = [f1, f2, f3, f4]

(Basically, I compose the functions in the list; see https://mathieularose.com/function-composition-in-python/):

import functools

def run(pipeline):
    return functools.reduce(lambda f, g: lambda x: f(g(x)), pipeline, lambda x: x)

I want to observe when a function is called, when it finishes, whether it fails, and so on, and log that to a file or a database (MongoDB, for example). Each of these functions returns a Python object, so I want to use the return value and log its attributes. For example:

If f1 returns a list, I want to log "f1 was completed at 23:00 on 04/22/2018. It returned a list of length 5", and so on.

My problem is not about executing the functions; it is about observing their behaviour. I want the functions to be agnostic to how I code the pipeline.

I was wondering how to implement the observer pattern here. I know that the "observer pattern" sounds too "object-oriented", so my first idea was to use decorators, but while searching for a guide on this I found RxPy.

So, I am looking for a guide on how to solve this problem.


Solution

  • If you like decorators, you can use the following approach:

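    import functools
    import time

    def log_finish_time(f):
        @functools.wraps(f)  # keep f's name when decorators are stacked
        def g(*args, **kwargs):
            ret = f(*args, **kwargs)
            # time.ctime() gives a readable local timestamp
            print('{} completed on {}'.format(f.__name__, time.ctime()))
            return ret
        return g

    def log_return_value(f):
        @functools.wraps(f)
        def g(*args, **kwargs):
            ret = f(*args, **kwargs)
            print('{} returned {}'.format(f.__name__, ret))
            return ret
        return g

    @log_finish_time
    @log_return_value
    def f1(x):
        return x+1

    @log_finish_time
    @log_return_value
    def f2(x):
        return x*x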
    

    This adds the logging without changing your composition code.

    If you want your decorators to take arguments, you have to add another function in the middle (basically, a function that returns a decorator):

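    import functools
    import time

    def log_finish_time(log_prefix):
        # The outer function takes the argument and returns the real decorator.
        def h(f):
            @functools.wraps(f)
            def g(*args, **kwargs):
                ret = f(*args, **kwargs)
                print('{}: {} completed on {}'.format(log_prefix, f.__name__, time.ctime()))
                return ret
            return g
        return h

    @log_finish_time('A')
    def f1(x):
        return x+1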
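
The question also asks about start and failure events, and about keeping the functions agnostic to the pipeline. One possible way to cover both is to apply a single decorator when the pipeline is built, rather than at each function definition. The sketch below assumes the standard `logging` module as the sink. The names `observe`, `describe`, `make_range`, `double_all`, and the logger name "pipeline" are made up for illustration. Note that this `run` applies the steps left to right, whereas the `reduce` in the question applies the last function in the list first.

```python
import functools
import logging

logger = logging.getLogger("pipeline")  # hypothetical logger name


def describe(value):
    """Summarize a return value: its type, plus its length when it has one."""
    name = type(value).__name__
    try:
        return "{} of length {}".format(name, len(value))
    except TypeError:
        return name


def observe(f):
    """Log when f starts, finishes, or fails; f itself stays unchanged."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        logger.info("%s started", f.__name__)
        try:
            ret = f(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback; the error is re-raised.
            logger.exception("%s failed", f.__name__)
            raise
        logger.info("%s completed, returned %s", f.__name__, describe(ret))
        return ret
    return wrapper


def run(pipeline):
    """Compose the steps left to right, wrapping each one with observe."""
    steps = [observe(f) for f in pipeline]
    return functools.reduce(lambda f, g: lambda x: g(f(x)), steps, lambda x: x)


def make_range(n):  # hypothetical step, stands in for f1
    return list(range(n))


def double_all(xs):  # hypothetical step, stands in for f2
    return [2 * x for x in xs]


# The timestamp comes from the log format, not from the steps themselves.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
result = run([make_range, double_all])(5)
print(result)
```

Because the wrapping happens inside `run`, the step functions carry no logging code and can be reused outside the pipeline. To log to a file or a database instead of the console, attach a different `logging` handler; the decorator does not need to change.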