I am writing an ETL framework in Python 3.7 that uses functions as "tasks" bearing a special decorator. Most of these tasks run a loop. If something raises an exception in the loop, I want the function to handle the exception by recording data about the failure and then continue with the loop.
This is a simplified example of what I have so far:
```python
import logging
from datetime import datetime
from functools import wraps

logger = logging.getLogger(__name__)


class TaskExecutionError(RuntimeError):
    def __init__(self, msg="", context=None):
        super().__init__(msg)
        self.msg = msg
        # Avoid a shared mutable default argument.
        self.context = context or {}

    def __str__(self):
        return self.msg or "Error executing task."


def task(fn):
    @wraps(fn)
    def _wrapper(*args, **kwargs):
        start_ts = datetime.utcnow()
        try:
            return fn(*args, **kwargs)
        except TaskExecutionError as e:
            logger.exception(f"Task execution error will be logged: {e}.")
            fail_data = {
                "task_name": fn.__name__,
                "args": list(args),
                "kwargs": kwargs,
                "context": e.context,
                "fail_message": str(e),
                "fail_time": str(datetime.utcnow()),
                # etc.
            }
            # Write failure data in an object store
        finally:
            end_ts = datetime.utcnow()
            logger.info(f"*** Wallclock: {end_ts - start_ts}.")

    _wrapper.is_task = True
    return _wrapper


@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
    """
    Test handling failures.
    """
    for i in range(10):
        if i % 3:
            raise TaskExecutionError(context={"i": i})
        else:
            print("All's well")
```
This works as far as the message being printed and saved goes; however, execution of course stops as soon as the first exception is raised.
How can I handle this so that the loop continues?
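What lets the loop continue is catching the exception inside the loop body rather than in the decorator, because once an exception propagates out of the loop, the remaining iterations are abandoned. A minimal sketch of one lightweight way to do that per iteration is a context manager built with `contextlib.contextmanager`; `record_failure`, `process` and the in-memory `FAILURES` list are hypothetical names standing in for the real task and object store, and this sketch does not yet pass the decorator's context along.

```python
from contextlib import contextmanager

# Hypothetical in-memory stand-in for the object store.
FAILURES = []


@contextmanager
def record_failure(**local_ctx):
    """Swallow and record any exception raised inside the ``with`` body."""
    try:
        yield
    except Exception as exc:
        # Not re-raising here tells the with-statement to suppress the exception.
        FAILURES.append({"context": local_ctx, "fail_message": str(exc)})


def process(n):
    done = []
    for i in range(n):
        # Each iteration gets its own guard, so one failure does not end the loop.
        with record_failure(i=i):
            if i % 3:
                raise RuntimeError("All's borked.")
            done.append(i)
    return done


processed = process(10)
print(processed)      # [0, 3, 6, 9]
print(len(FAILURES))  # 6
```

Only iterations 0, 3, 6 and 9 succeed; the other six are recorded and skipped.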
It seems I can't use the very convenient exception mechanism here, and I probably have to devise a custom handle_failure() function or similar. But I am unsure of the best way to pass the decorator's context to handle_failure() when I call it from within the decorated function.
Since I am going to use this mechanism in several @task-decorated functions, I'd like the call to be lightweight if possible, without a lot of arguments.
Thanks for any suggestions you may have.
I resolved this using `inspect`, which I don't like to use too often, but it seemed necessary here:
```python
import inspect
import logging
from datetime import datetime
from functools import wraps
from traceback import format_stack

logger = logging.getLogger(__name__)


def task(fn):
    @wraps(fn)
    def _wrapper(*args, **kwargs):
        start_ts = datetime.utcnow()
        try:
            return fn(*args, **kwargs)
        finally:
            end_ts = datetime.utcnow()
            logger.info(f"*** Wallclock: {end_ts - start_ts}.")

    _wrapper.is_task = True
    return _wrapper  # without this, the decorated name would be bound to None


def handle_task_failure(exc, local_ctx=None):
    # One frame up is the task body that caught the exception;
    # two frames up is the decorator's _wrapper, whose locals hold the call context.
    caller_frame = inspect.currentframe().f_back
    wrapper_frame = caller_frame.f_back
    caller_ctx = wrapper_frame.f_locals
    print(f"Context: {caller_ctx}")
    logger.exception(f"Task execution error will be logged: {exc}.")
    fail_data = {
        "start_time": caller_ctx.get("start_ts"),
        "runid": caller_ctx.get("runid"),
        "task_name": caller_ctx.get("fn").__name__,
        "args": list(caller_ctx.get("args")),
        "kwargs": caller_ctx.get("kwargs"),
        "context": local_ctx or {},
        "fail_message": str(exc),
        "fail_time": str(datetime.utcnow()),
        "traceback": format_stack(caller_frame),
    }
    # Save failure data in object store


@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
    """
    Test handling failures.
    """
    for i in range(10):
        try:
            if i % 3:
                raise RuntimeError("All's borked.")
            else:
                print("All's well.")
        except Exception as exc:
            handle_task_failure(exc, {"i": i})
```
On the other hand, I no longer need a custom exception class, and the failure-handling call, which is repeated in several functions, is very lightweight.
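One detail worth checking in the `inspect` version: `format_stack(caller_frame)` records the stack leading to the point where `handle_task_failure()` is called, not the line that raised. The exception object carries its own traceback in `exc.__traceback__`, so `traceback.format_exception()` can capture the raising line instead. A minimal sketch; `failing_step` and `describe_failure` are hypothetical names.

```python
import traceback


def failing_step(i):
    raise RuntimeError(f"All's borked at {i}.")


def describe_failure(exc):
    """Return the formatted traceback of an exception that was already caught."""
    # The (type, value, traceback) positional form works on Python 3.7 and later.
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))


try:
    failing_step(4)
except Exception as exc:
    tb_text = describe_failure(exc)

print(tb_text)
```

The printed text ends with `RuntimeError: All's borked at 4.` and includes the `failing_step` frame, which a stack captured at the handler's call site would not show.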