Tags: django, redis, django-celery

Django Celery: check whether a task is complete, and restart it only if it is (do not restart it while it is still running)


I am trying to run a periodic check, once a minute, on a scheduled Celery task. If the task is still running, the check should let it continue without interrupting it; if the task is no longer running, the check should start it again. At the moment I cannot get the task to start only when it is not already running. I have tried two methods, but neither detects the running task, so the task is started even when it should not be and several copies end up running simultaneously. I am using celery 4.2.0 and django 1.11.6. Any tips on how I can solve this problem? Thanks

in views.py

Task to run

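import time

from celery import task  # assumed import; the original post does not show it

@task(name='send-one-task')
def send_one_task():
    # Long-running job: prints a counter once per second for 99 seconds.
    for i in range(1, 100):
        time.sleep(1)
        print("test1 " + str(i))

    return None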

I have tried two methods to check whether the task has finished and stopped running; if it has not, it should not be rerun.

method 1

@task(name='send-two-task')
def send_two_task():

    # method 1
    from celery import current_task
    if current_task.request.task != "send-one-task":
        send_one_task()
    else:
        pass
    
    return None

method 2

@task(name='send-two-task')
def send_two_task():

    from celery.task.control import inspect
    insp = inspect()
    testactive = insp.active()
    checkrunning = list(testactive.values())

    try:
        #the script is still running - so ok.
        print (checkrunning[0][1].get("name"))
        print ("task still running - ok")
        pass

    except:
        #the task has failed so need to restart
        print ("task not running - restart task")
        send_one_task()
        pass

    return None

To schedule the check so that it runs every 60 seconds and starts the task if it is not already running:

In celery.py

'send_second_task': {
    'task': 'send-two-task',
    'schedule': 60.0,
},

Solution

  • A Celery task can check whether another Celery task is still running, and start it if it is not, with the following script.

    @task(name='send-two-task')
    def send_two_task():
    
        from celery.task.control import inspect
        insp = inspect()
        testactive = insp.active()
        checkrunning = list(testactive.values())
    
        try:
            # Indexing [0][1] raises IndexError when the first worker reports
            # fewer than two active tasks, i.e. send_one_task is not running.
            checkrunning[0][1].get("name")
            print("task still running - ok")

        except IndexError:
            # The task is not running, so restart it.
            print("task not running - restart task")
            send_one_task()
    
        return None
    

    The script basically counts how many active tasks the first worker reports. If there are two (checkrunning[0][0] as well as checkrunning[0][1]), it leaves things alone; if there is only one (only checkrunning[0][0]), it restarts the first task (send_one_task).

    The confusing part is that, although send_one_task should be represented by checkrunning[0][1].get('name'), both checkrunning[0][0].get('name') and checkrunning[0][1].get('name') display the name of send_two_task when send_one_task and send_two_task are both running. I think this is because send_one_task() is called as a plain function inside send_two_task, so it runs as part of that Celery task rather than as a task of its own; the earlier send_two_task invocation that is still executing it and the current send_two_task invocation therefore both appear in the list under the same name.

    Let me know if anyone has a better answer or if I can make edits to improve mine.
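A possible variation on the script above, offered only as a sketch: instead of counting entries for the first worker, look through every worker's active list for the task name itself. The helper names is_task_active and check_and_restart are hypothetical and not part of the original post. The sketch also assumes that send-one-task is queued through the broker (here with send_task) rather than called as a plain function, so that it shows up in active() under its own name.

```python
def is_task_active(active, task_name):
    """Return True if any worker reports a running task named task_name.

    `active` is the mapping returned by inspect().active():
    {worker_name: [{"id": ..., "name": ..., ...}, ...]},
    or None when no worker replied.
    """
    if not active:
        return False
    return any(
        entry.get("name") == task_name
        for entries in active.values()
        for entry in entries
    )


def check_and_restart():
    # Hypothetical wiring. The import is local so that the helper above
    # can be exercised without Celery or a broker being available.
    from celery import current_app

    active = current_app.control.inspect().active()
    if not is_task_active(active, "send-one-task"):
        # Queue the task by name so that it runs as its own Celery task.
        current_app.send_task("send-one-task")
```

The body of check_and_restart could be used inside send_two_task. Note that active() only lists tasks that are currently executing, so a copy of the task that has been queued but not yet started would not be detected by this check.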