I use Flower to monitor my Celery tasks.
I'm trying to change the way tasks are displayed (under the Tasks tab) so that the list looks a bit more "organized". For example, displaying <list (6 items)> instead of [1, 2, 3, ...
Unfortunately, overriding the format_task method has limitations: task.args and task.kwargs are string representations, often truncated, instead of list/dict objects.
To get the original args and kwargs objects back, I'm using eval(task.args) so that I can iterate over their items afterwards. Evaluating arbitrary strings looks unsafe to me. Would you recommend a better way of doing this?
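One commonly suggested safer alternative to eval() for strings that contain only Python literals is ast.literal_eval from the standard library. A minimal sketch follows; the helper name parse_repr is hypothetical, and it assumes the string is a complete literal, which a truncated representation is not:

```python
import ast


def parse_repr(text):
    """Parse a string holding a Python literal; return None if it is not one."""
    try:
        # literal_eval only accepts literals (numbers, strings, lists, dicts, ...)
        # and refuses function calls or attribute access.
        return ast.literal_eval(text)
    except (ValueError, SyntaxError):
        # Truncated or non-literal input ends up here.
        return None


print(parse_repr("[1, 2, 3]"))
print(parse_repr("[1, 2, 3, ..."))
print(parse_repr("__import__('os').getcwd()"))
```

This avoids executing arbitrary code, but it still cannot recover arguments that were truncated before reaching Flower.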
I found a solution to get the task's arguments in format_task as objects (lists, dicts, etc.) instead of truncated string representations.
The argsrepr and kwargsrepr parameters of the apply_async() method let you specify custom representations for the task's arguments.
I created a custom Task class, overriding the delay method like so:
import json

from celery import Task


class FlowerTask(Task):
    def delay(self, *args, **kwargs):
        argsrepr, kwargsrepr = [], {}

        for arg in args:
            if isinstance(arg, bytes):
                argsrepr.append("<binary content>")
            elif isinstance(arg, list):
                argsrepr.append("<list ({} items)>".format(len(arg)))
            elif isinstance(arg, dict):
                argsrepr.append("<dict ({} keys)>".format(len(arg)))
            else:
                # Format your other args the way you prefer;
                # repr() is only a placeholder fallback
                argsrepr.append(repr(arg))

        for key, value in kwargs.items():
            # Format your kwargs the same way as above
            # if ...:
            #     kwargsrepr[key] = ...
            kwargsrepr[key] = repr(value)  # placeholder fallback

        # Create the task signature
        new_task = super().s(*args, **kwargs)

        # Use our representations as JSON
        return new_task.apply_async(
            argsrepr=json.dumps(argsrepr),
            kwargsrepr=json.dumps(kwargsrepr),
        )
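The per-argument formatting can also be factored into a plain function, which makes it easy to try out without a running worker. This is only a sketch: the names summarize and build_reprs are hypothetical, and the repr() fallback for other types is an assumption rather than part of the original solution.

```python
import json


def summarize(value):
    """Return a short placeholder string for one task argument."""
    if isinstance(value, bytes):
        return "<binary content>"
    if isinstance(value, list):
        return "<list ({} items)>".format(len(value))
    if isinstance(value, dict):
        return "<dict ({} keys)>".format(len(value))
    if callable(value):
        return "<function>"
    # Assumed fallback: the regular repr of the value
    return repr(value)


def build_reprs(args, kwargs):
    """Build JSON strings suitable for argsrepr and kwargsrepr."""
    argsrepr = json.dumps([summarize(arg) for arg in args])
    kwargsrepr = json.dumps({key: summarize(value) for key, value in kwargs.items()})
    return argsrepr, kwargsrepr


argsrepr, kwargsrepr = build_reprs(([1, 2, 3], b"\x00\x01"), {"items": {"a": 1}})
print(argsrepr)
print(kwargsrepr)
```

Inside delay(), the two returned strings could then be passed as argsrepr and kwargsrepr to apply_async().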
I then use this class as the base for my task via the base argument:
from celery import shared_task


@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
    return "OK !"
This way, the representations of the task's arguments are stored as JSON, which can be loaded afterwards in Flower's format_task() and used as objects instead of strings:
import json


def format_task(task):
    argsrepr = json.loads(task.args)
    kwargsrepr = json.loads(task.kwargs)

    if not argsrepr:
        task.args = "( )"
    else:
        task.args = ', '.join(argsrepr)

    if not kwargsrepr:
        task.kwargs = "( )"
    else:
        task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())

    # [...]
    # Flower expects the (modified) task object back
    return task
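Since only delay() is overridden, tasks sent another way (for example by calling apply_async() directly) still carry their default string representation, and json.loads() would raise on them. A hedged sketch of a fallback follows; the helper name load_repr is hypothetical and not part of Flower or Celery:

```python
import json


def load_repr(raw, default):
    """Decode a JSON representation, or return `default` if `raw` is not JSON."""
    try:
        value = json.loads(raw)
    except (TypeError, ValueError):
        # Not one of our JSON representations
        return default
    # Only accept the container type we expect (list for args, dict for kwargs)
    return value if isinstance(value, type(default)) else default


print(load_repr('["<list (3 items)>", "<binary content>"]', []))
print(load_repr("([1, 2, 3, ...", []))
```

In format_task, argsrepr = load_repr(task.args, []) would then yield an empty list for non-JSON tasks instead of raising. Whether to show the original string in that case is a design choice.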
With this in place, the arguments are displayed like this:
Args:
<list (3 items)>, <binary content>
Kwargs:
callback = <function>, items = <dict (5 items)>