
Advanced task formatting in Flower (Celery monitoring)


I use Flower to monitor my Celery tasks.

I'm trying to change the way tasks are displayed (under the Tasks tab) in order for the list to look a bit more "organized". For example, displaying <list (6 items)> instead of [1, 2, 3, ....

Unfortunately, overriding the format_task method has limitations:

  • task.args and task.kwargs are string representations, often truncated, instead of list/dict
  • HTML is escaped for every field except task.name
  • if the function doesn't return a value, an AJAX error is thrown when displaying the tasks

To get the original args and kwargs objects back, I'm using eval(task.args) so that I can iterate through their items afterwards. Evaluating arbitrary strings looks unsafe to me. Would you recommend a better way?
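As a point of comparison for the eval() concern, here is a minimal sketch using the standard library's ast.literal_eval, which only accepts Python literals. The helper name parse_repr is hypothetical. It shows that a complete representation parses, while a truncated one (or an arbitrary expression) is rejected, so this route does not help with truncated strings.

```python
import ast

def parse_repr(text):
    """Parse a repr string into a Python literal, or return None on failure."""
    try:
        return ast.literal_eval(text)
    except (ValueError, SyntaxError):
        # Truncated strings and non-literal expressions end up here
        return None

full = parse_repr("([1, 2, 3], {'a': 1})")           # complete repr
truncated = parse_repr("([1, 2, 3, ...")             # cut-off repr
unsafe = parse_repr("__import__('os').getcwd()")     # arbitrary code

print(full)
print(truncated)
print(unsafe)
```

Only the first call returns a value; the other two return None instead of executing or guessing anything.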


Solution

  • I found a solution to get the task's arguments in format_task as objects (lists/dicts etc.) instead of truncated string representations.

    The argsrepr and kwargsrepr parameters of the apply_async() method let you specify custom representations for the task's arguments.

    I created a custom Task class, overriding the delay method like so:

    import json
    from celery import Task
    
    class FlowerTask(Task):
        def delay(self, *args, **kwargs):
            argsrepr, kwargsrepr = [], {}
    
            for arg in args:
                if isinstance(arg, bytes):
                    argsrepr.append("<binary content>")
                elif isinstance(arg, list):
                    argsrepr.append("<list ({} items)>".format(len(arg)))
                elif isinstance(arg, dict):
                    argsrepr.append("<dict ({} keys)>".format(len(arg)))
                else:
                    # Format your other args the way you prefer;
                    # repr() is only a placeholder
                    argsrepr.append(repr(arg))
    
            for key, value in kwargs.items():
                # Format your kwargs the same way as above. kwargsrepr is a
                # dict, so assign by key; repr() is only a placeholder
                kwargsrepr[key] = repr(value)
    
            # Create the task signature with the real arguments
            new_task = super().s(*args, **kwargs)
    
            # Send it, storing our representations as JSON
            return new_task.apply_async(
                argsrepr=json.dumps(argsrepr),
                kwargsrepr=json.dumps(kwargsrepr)
            )
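The representation-building step can be tried on its own, without Celery or a broker. The following is a minimal sketch under that assumption: summarize and build_reprs are hypothetical helper names, and the repr() fallback for other types is a placeholder rather than part of the original answer.

```python
import json

def summarize(value):
    """Return a short placeholder string for one task argument."""
    if isinstance(value, bytes):
        return "<binary content>"
    if isinstance(value, list):
        return "<list ({} items)>".format(len(value))
    if isinstance(value, dict):
        return "<dict ({} keys)>".format(len(value))
    # Assumption: fall back to plain repr() for every other type
    return repr(value)

def build_reprs(args, kwargs):
    """Build the JSON strings that would be passed as argsrepr / kwargsrepr."""
    argsrepr = json.dumps([summarize(a) for a in args])
    kwargsrepr = json.dumps({k: summarize(v) for k, v in kwargs.items()})
    return argsrepr, kwargsrepr

argsrepr, kwargsrepr = build_reprs(([1, 2, 3], b"\x00\x01"), {"items": {"a": 1}})
print(argsrepr)
print(kwargsrepr)
```

Keeping the summarizing logic in one function means positional and keyword arguments are formatted identically, and the JSON strings are what Flower later receives as task.args and task.kwargs.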
    

    I then use this class as the base for my task via the base argument:

    from celery import shared_task
    
    @shared_task(base=FlowerTask)
    def test_task(*args, **kwargs):
        return "OK !"
    

    This way, the representations of the task's arguments are stored as JSON, which can be loaded afterwards in Flower's format_task() and used as objects instead of strings:

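    import json
    
    def format_task(task):
        # task.args / task.kwargs hold the JSON strings produced by FlowerTask
        argsrepr = json.loads(task.args)
        kwargsrepr = json.loads(task.kwargs)
    
        if not argsrepr:
            task.args = "( )"
        else:
            task.args = ', '.join(argsrepr)
    
        if not kwargsrepr:
            task.kwargs = "( )"
        else:
            task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
        # [...]
    
        # Return the task: without a return value, the Tasks view throws an AJAX error
        return task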
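The formatting step can be checked outside Flower with a stand-in object. This is a minimal sketch, not Flower's own API: SimpleNamespace substitutes for the task object Flower passes in, and the fallback for values that are not JSON (for example, tasks sent without the custom representations) is an added assumption.

```python
import json
from types import SimpleNamespace

def format_task(task):
    """Turn JSON-encoded argsrepr/kwargsrepr back into display strings."""
    try:
        argsrepr = json.loads(task.args)
        kwargsrepr = json.loads(task.kwargs)
    except (TypeError, ValueError):
        # Not one of our JSON representations: leave the task untouched
        return task
    if not isinstance(argsrepr, list) or not isinstance(kwargsrepr, dict):
        return task

    task.args = ", ".join(map(str, argsrepr)) or "( )"
    task.kwargs = ", ".join(f"{k} = {v}" for k, v in kwargsrepr.items()) or "( )"
    return task

# Stand-in for the object Flower passes to format_task (an assumption)
fake = SimpleNamespace(
    args=json.dumps(["<list (3 items)>", "<binary content>"]),
    kwargs=json.dumps({"items": "<dict (5 keys)>"}),
)
formatted = format_task(fake)
print(formatted.args)
print(formatted.kwargs)

# A task whose args are an ordinary repr string passes through unchanged
plain = format_task(SimpleNamespace(args="(1, 2, 3)", kwargs="{'a': 1}"))
print(plain.args)
```

Returning the task on every path matters here, since a format_task that returns nothing breaks the task list.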
    

    This way, the args are displayed as such:

    Args: <list (3 items)>, <binary content>

    Kwargs: callback = <function>, items = <dict (5 keys)>