Handling an exception and continuing a loop


I am writing an ETL framework in Python 3.7 that uses functions as "tasks" bearing a special decorator. Most of these tasks run a loop. If something raises an exception inside the loop, I want the function to handle it by recording data about the failure and then continue with the loop.

This is a simplified example of what I have so far:

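import logging
from datetime import datetime
from functools import wraps

logger = logging.getLogger(__name__)


class TaskExecutionError(RuntimeError):
    def __init__(self, msg="", context=None):
        super().__init__(msg)
        self.msg = msg
        # Use None as the default so instances never share one mutable dict
        self.context = context if context is not None else {}

    def __str__(self):
        return self.msg or "Error executing task."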


def task(fn):
    @wraps(fn)
    def _wrapper(*args, **kwargs):
        start_ts = datetime.utcnow()
        try:
            return fn(*args, **kwargs)

        except TaskExecutionError as e:
            logger.exception(f"Task execution error will be logged: {e}.")
            fail_data = {
                "task_name": fn.__name__,
                "args": list(args),
                "kwargs": kwargs,
                "context": e.context,
                "fail_message": str(e),
                "fail_time": str(datetime.utcnow()),
                # etc.
            }
            # Write failure data in an object store

        finally:
            end_ts = datetime.utcnow()
            logger.info(f"*** Wallclock: {end_ts - start_ts}.")

    _wrapper.is_task = True
    return _wrapper


@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
    """
    Test handling failures.
    """
    for i in range(10):
        if i % 3:
            raise TaskExecutionError(context={"i": i})
        else:
            print("All's well")

This works insofar as the message is logged and the failure data is saved. However, execution of the task stops as soon as the first exception is raised, because the exception leaves the loop before the decorator can catch it.
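A minimal sketch of the difference, using hypothetical function names that are not part of the framework: a handler placed around the whole loop ends the loop on the first error, while a handler placed inside the loop body lets the remaining iterations run.

```python
def catch_outside(items):
    """Handler wraps the whole loop, as the decorator does in the example above."""
    visited = []
    try:
        for i in items:
            visited.append(i)
            if i % 3:
                raise RuntimeError(f"failed on {i}")
    except RuntimeError:
        # The loop has already been abandoned at this point
        pass
    return visited


def catch_inside(items):
    """Handler sits inside the loop body, so each failure is isolated."""
    visited, failed = [], []
    for i in items:
        visited.append(i)
        try:
            if i % 3:
                raise RuntimeError(f"failed on {i}")
        except RuntimeError:
            # Record the failure and carry on with the next item
            failed.append(i)
    return visited, failed
```

With `range(10)` as input, the first function stops after visiting 0 and 1, while the second visits all ten items and records the six failing ones.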

How shall I tackle this so that execution continues?

It seems I can't rely on the very convenient exception mechanism alone and will probably have to devise a custom handle_failure() function. But I am unsure of the best way to pass the decorator's context to handle_failure() when I call it from within the decorated function.

Since I am going to use this mechanism in several @task decorated functions, I'd like to have a lightweight call if possible, without a lot of arguments.

Thanks for any suggestions you may have.


Solution

  • I resolved this using inspect, which I prefer not to use too often, but it seemed necessary here:

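    import inspect
    import logging
    from datetime import datetime
    from functools import wraps
    from traceback import format_stack

    logger = logging.getLogger(__name__)


    def task(fn):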
        @wraps(fn)
        def _wrapper(*args, **kwargs):
            start_ts = datetime.utcnow()
            try:
                return fn(*args, **kwargs)
    
            finally:
                end_ts = datetime.utcnow()
                logger.info(f"*** Wallclock: {end_ts - start_ts}.")
    
        _wrapper.is_task = True
        return _wrapper
    
    
    def handle_task_failure(exc, local_ctx=None):
        # Frame of the decorated function that called this handler
        caller_frame = inspect.currentframe().f_back
        # One level further up is the decorator's _wrapper frame
        wrapper_frame = caller_frame.f_back
        caller_ctx = wrapper_frame.f_locals
        print(f"Context: {caller_ctx}")
        logger.exception(f"Task execution error will be logged: {exc}.")
        fail_data = {
            "start_time": caller_ctx.get("start_ts"),
            "runid": caller_ctx.get("runid"),
            "task_name": caller_ctx.get("fn").__name__,
            "args": list(caller_ctx.get("args")),
            "kwargs": caller_ctx.get("kwargs"),
            # Fall back to a fresh dict instead of a shared mutable default
            "context": local_ctx if local_ctx is not None else {},
            "fail_message": str(exc),
            "fail_time": str(datetime.utcnow()),
            "traceback": format_stack(caller_frame),
        }
        # Save failure data in object store
    
    @task
    def test_fail_log(a, b, c, kwa=1, kwb=2):
        """
        Test handling failures.
        """
        for i in range(10):
            try:
                if i % 3:
                    raise RuntimeError("All's borked.")
                else:
                    print("All's well.")
            except Exception as exc:
                handle_task_failure(exc, {"i": i})
    
    

    On the other hand, I don't need a custom exception class, and the call to handle the failure, which is repeated in several functions, is very lightweight.
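For anyone who would rather not walk stack frames, here is a sketch of a possible alternative, not the approach I used above: the decorator publishes its context through a `contextvars.ContextVar` (available since Python 3.7), and the handler reads it back. The names `_task_ctx`, `FAILURES`, `record_failure` and `demo` are hypothetical, and the in-memory list only stands in for the object store.

```python
import contextvars
from datetime import datetime, timezone
from functools import wraps

# Hypothetical names: _task_ctx carries the decorator's context,
# FAILURES is an in-memory stand-in for the object store.
_task_ctx = contextvars.ContextVar("task_ctx")
FAILURES = []


def task(fn):
    @wraps(fn)
    def _wrapper(*args, **kwargs):
        # Publish the context for the duration of the call
        token = _task_ctx.set({
            "task_name": fn.__name__,
            "args": list(args),
            "kwargs": kwargs,
            "start_time": datetime.now(timezone.utc),
        })
        try:
            return fn(*args, **kwargs)
        finally:
            # Restore whatever context was active before this task
            _task_ctx.reset(token)

    _wrapper.is_task = True
    return _wrapper


def record_failure(exc, local_ctx=None):
    # Raises LookupError if called outside a @task-decorated function
    ctx = _task_ctx.get()
    FAILURES.append({
        **ctx,
        "context": local_ctx if local_ctx is not None else {},
        "fail_message": str(exc),
        "fail_time": str(datetime.now(timezone.utc)),
    })


@task
def demo(a, b, kw=1):
    for i in range(10):
        try:
            if i % 3:
                raise RuntimeError("All's borked.")
        except Exception as exc:
            record_failure(exc, {"i": i})
    return "done"
```

Calling `demo(1, 2, kw=3)` runs all ten iterations and leaves six entries in `FAILURES`. The call inside the loop stays as light as `handle_task_failure(exc, {"i": i})`, at the cost of one module-level variable.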