python, redis, celery, celery-task

Query task state - Celery & redis


I have what I think is a relatively simple problem, but I keep hitting a brick wall with it. I have a Flask app and a webpage that lets you run a number of scripts on the server side using Celery, with Redis as the broker.

All I want is to give a task a name/id when I start it (each task will be shown as a button on the client side), i.e.:

@app.route('/start_upgrade/<task_name>')
def start_upgrade(task_name):
    example_task.delay(1, 2, task_name=task_name)
    return 'started'  # a Flask view must return a response

Then, after the task has kicked off, I want to check in a separate request whether it is running/waiting/finished, preferably like this:

@app.route('/check_upgrade_status/<task_name>')
def get_task_status(task_name):
    # pseudocode: get_task_by_name is the kind of call I am looking for
    task = celery.get_task_by_name(task_name)
    task_state = task.state
    return task_state

But I can't find anything like that in the docs. I am very new to Celery, so assume I know nothing. To be explicit: I need to be able to query the task state from Python, so no CLI commands please.

Any alternative methods of achieving my goal of querying the queue are also welcome.


Solution

  • I ended up figuring out a solution to my question from arthur's post.

    Using Redis alongside Celery, I created these functions:

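    import redis
    from celery.result import AsyncResult
    
    # decode_responses=True makes get() return str instead of bytes
    redis_cache = redis.StrictRedis(host='localhost', port=6379, db=0,
                                    decode_responses=True)
    
    def check_task_status(task_name):
        # Look up the Celery task id stored under the user-facing name
        task_id = redis_cache.get(task_name)
        if task_id is None:
            return None  # no task was started under this name
        return AsyncResult(task_id).status
    
    def start_task(task, task_name, *args, **kwargs):
        # Queue the task and remember its id under the given name
        response = task.delay(*args, **kwargs)
        redis_cache.set(task_name, response.id)
        return response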
    

    This lets me assign specific names to tasks. Note that I haven't actually tested this yet, but it makes sense.

    Example usage:

    start_task(example_task, "example_name", 1, 2)
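
For anyone who wants to try the name-to-id idea without a Redis server or Celery worker running, here is a minimal sketch of the same pattern with the storage and the state lookup passed in as parameters. `NamedTaskRegistry`, `DictStore`, `STATE_LABELS` and the `fake_states` demo data are names made up for this illustration, not part of Celery or redis-py. In a real app the store would presumably be the Redis client from above and the state lookup would be something like `lambda task_id: AsyncResult(task_id).status`, which needs a result backend configured in Celery. The label mapping is just one way to collapse Celery's state names into the running/waiting/finished labels from the question.

```python
from typing import Callable, Optional

# Celery's built-in state names mapped to the three labels from the question.
# Caveats: PENDING is also what Celery reports for ids it has never seen, and
# STARTED is only reported when the task_track_started setting is enabled.
STATE_LABELS = {
    "PENDING": "waiting",
    "RECEIVED": "waiting",
    "RETRY": "waiting",
    "STARTED": "running",
    "SUCCESS": "finished",
    "FAILURE": "finished",
    "REVOKED": "finished",
}


class NamedTaskRegistry:
    """Maps user-facing task names to task ids (hypothetical helper)."""

    def __init__(self, store, get_state: Callable[[str], str]):
        # store: anything with get(key) / set(key, value), for example a redis
        # client created with decode_responses=True, or DictStore below.
        # get_state: returns a state string for a task id.
        self._store = store
        self._get_state = get_state

    def register(self, task_name: str, task_id: str) -> None:
        # Remember which task id belongs to this name
        self._store.set(task_name, task_id)

    def status(self, task_name: str) -> Optional[str]:
        task_id = self._store.get(task_name)
        if task_id is None:
            return None  # nothing was started under this name
        state = self._get_state(task_id)
        # Fall back to the lowercased raw state for anything unmapped
        return STATE_LABELS.get(state, state.lower())


class DictStore:
    """In-memory stand-in for redis, so the sketch runs without a server."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


# Demo with fake states instead of a real Celery result backend.
fake_states = {"abc-123": "STARTED"}
registry = NamedTaskRegistry(
    DictStore(), lambda task_id: fake_states.get(task_id, "PENDING")
)
registry.register("example_name", "abc-123")
print(registry.status("example_name"))  # running
print(registry.status("unknown_name"))  # None
```

Returning `None` for an unknown name keeps "never started" distinct from "waiting", which a plain `AsyncResult` lookup cannot tell apart because an unknown id also reports `PENDING`.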