django celery celery-task celerybeat

Celery task status showing PENDING


I am working with Celery and my task's status is always reported as PENDING. It may be a problem with my implementation; please check my code.

I am trying to save task info such as id, name, and status in my MongoDB database. To do this, my task calls a function that saves the data to MongoDB.

Is my task showing as pending because the function call happens before the task's return statement?

settings.py

CELERY_BROKER_URL = 'mongodb://localhost:27017/jobs'
CELERY_RESULT_BACKEND = "mongodb"
CELERY_IGNORE_RESULT = False
CELERY_TRACK_STARTED = True
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "127.0.0.1",
    "port": 27017,
    "database": "jobs",
    "taskmeta_collection": "my_taskmeta_collection",
}
CELERY_BEAT_SCHEDULE = {
    'add-every-minute-contrab': {
        'task': 'username_length_periodically',
        'schedule': crontab(minute='*/1'),
        #'args' : (2,3),
    },
}

CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE

celery.py

from __future__ import absolute_import, unicode_literals
import os, logging
from celery import Celery
from celery.schedules import crontab


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'RestUserAPI.settings')

app = Celery('UserAPI')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

tasks.py

from __future__ import absolute_import, unicode_literals
from celery import task, current_task, result
from django.conf import settings
import datetime
from .models import TaskMetaData


@task(name='username_length_periodically', bind=True)
def get_username_length_periodically(self):
    last_run = datetime.datetime.now()
    dict = {'name':self.name,
            'id':self.request.id,
            'status':self.AsyncResult(self.request.id).state,
            'last_run': last_run,
            }
    store_metadata(dict)
    return dict


def store_metadata(dict):
    metadata = TaskMetaData()
    metadata.task_id = dict['id']
    metadata.task_name = dict['name']
    metadata.task_status = dict['status']
    metadata.task_last_run = dict['last_run']

    metadata.save()

Solution

  • I think this is just a plain old logic error. If you take a look at your call to check the status of the task using AsyncResult:

    'status':self.AsyncResult(self.request.id).state,
    

    You'll notice that you are checking the status of the task while the task is still running. A task's final state is only written to the result backend after the task returns, so a check made from inside the task always sees PENDING (or STARTED, if task_track_started is enabled). Because you record that value once and never go back to update it, the status stored in your database never changes!

    To keep the stored status up to date, you can kick off a separate monitoring task that periodically checks the status of the original task and records it to the database once the task has finished or errored out. For example:

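    from celery import states
    from celery.result import AsyncResult

    @app.task(name='monitor')
    def monitor(task_id):
        result = AsyncResult(task_id)
        if result.state in states.READY_STATES:
            # Task is done (SUCCESS, FAILURE or REVOKED):
            # update metadata table for the task_id
            ...
        else:
            # Not finished yet: check again in 60 seconds.
            monitor.apply_async(kwargs={'task_id': task_id}, countdown=60)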
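
To make the timing issue concrete, here is a minimal sketch that runs without Celery or a broker. It is only a simulation under stated assumptions: FakeBackend, run_task and states_seen are hypothetical names invented for illustration, not Celery APIs, and the state strings are copied from celery.states. It mimics the order in which a worker writes states, which is why a task that reads its own state can never see SUCCESS.

```python
# State names copied from celery.states so the sketch runs without Celery.
PENDING, STARTED, SUCCESS, FAILURE = "PENDING", "STARTED", "SUCCESS", "FAILURE"


class FakeBackend:
    """Hypothetical stand-in for a result backend (task_id -> state)."""

    def __init__(self):
        self._states = {}

    def get_state(self, task_id):
        # Celery reports any task id it has no record of as PENDING.
        return self._states.get(task_id, PENDING)

    def set_state(self, task_id, state):
        self._states[task_id] = state


def run_task(backend, task_id, body, track_started=False):
    """Mimic a worker: the final state is stored only after body() returns."""
    if track_started:
        backend.set_state(task_id, STARTED)
    try:
        result = body()
    except Exception:
        backend.set_state(task_id, FAILURE)
        raise
    backend.set_state(task_id, SUCCESS)
    return result


def states_seen(track_started):
    """Return (state read inside the task, state read after it finished)."""
    backend = FakeBackend()
    # Same pattern as the question: the task body reads its own state.
    inside = run_task(backend, "abc", lambda: backend.get_state("abc"),
                      track_started=track_started)
    return inside, backend.get_state("abc")


print(states_seen(track_started=False))  # ('PENDING', 'SUCCESS')
print(states_seen(track_started=True))   # ('STARTED', 'SUCCESS')
```

In both cases the value captured inside the task body is stale by the time the task finishes, so something outside the task (such as the monitor task above) has to read the state afterwards.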