Tags: python, django, redis, celery, celery-task

Celery schedule does not work and raises KeyError with Django


Env: Django 1.6.5, Python 2.7, Celery 3.1.11

The target server for this project does not have djcelery installed, so I am not using djcelery. I am following the docs "First steps with Django" and "Using Celery in your Application". When I run celery -A djproj worker -B -l debug, I get a KeyError, and the target task does not appear in the [tasks] list. Does anyone know how to solve this? Thanks.

Error

[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . djproj.celery.debug_task
  . djproj.celery.defaulttask1
  . djproj.celery.hello

... ...

[2014-06-19 16:08:23,007: INFO/MainProcess] Received task: djproj.celery.hello[794e54c7-62c8-4ad8-bcbb-64ff809366d1]
[2014-06-19 16:08:23,008: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x1036d17d0> (args:('djproj.celery.hello', '794e54c7-62c8-4ad8-bcbb-64ff809366d1', [], {}, {'utc': True, u'is_eager': False, 'chord': None, u'group': None, 'args': [], 'retries': 0, u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u'celery'}, 'expires': None, u'hostname': 'celery@nluckys-Mac.local', 'task': 'djproj.celery.hello', 'callbacks': None, u'correlation_id': u'794e54c7-62c8-4ad8-bcbb-64ff809366d1', 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, u'reply_to': u'20fe5513-efc2-3e69-9541-8e82758f94f9', 'id': '794e54c7-62c8-4ad8-bcbb-64ff809366d1', u'headers': {}}) kwargs:{})
[2014-06-19 16:08:23,010: WARNING/Worker-2] helloincelery
[2014-06-19 16:08:23,010: ERROR/MainProcess] Received unregistered task of type 'apps.app1.tuan.tuantask1'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': 'apps.app1.tuan.tuantask1', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'f2f92c1d-ac6d-4131-a029-123fbfc4ab48'} (220b)
Traceback (most recent call last):
  File "/Library/Python/2.7/site-packages/celery/worker/consumer.py", line 455, in on_task_received
    strategies[name](message, body,
KeyError: 'apps.app1.tuan.tuantask1'

The schedule does not seem to be working. My full code is here. Some of the code is pasted below.

My project folders:

djproj
├── apps
│   ├── __init__.py
│   └──  app1
│       ├── __init__.py
│       ├── admin.py
│       ├── models.py
│       ├── tests.py
│       ├── tuan.py
│       └── views.py
├── djproj
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── files
├── manage.py
└── run.sh

djproj/djproj/__init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

djproj/djproj/celery.py

from __future__ import absolute_import

import os
import datetime
from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djproj.settings')

app = Celery('djproj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

@app.task
def defaulttask1():
    currt = datetime.datetime.now()
    with open("files/defaulttask1.txt", "w") as fo:
        print >> fo, currt.isoformat()+"default_task_1"
        return currt 

@app.task
def hello():
    print "helloincelery" 

djproj/djproj/settings.py

"""
Django settings for djproj project.

For more information on this file, see
https://docs.djangoproject.com/en/1.6/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.6/ref/settings/
"""
from __future__ import absolute_import
from celery.schedules import crontab

BROKER_URL = 'redis://localhost:6379/0'

from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    #'defaulttask1': {
    #    'task': 'djproj.celery.defaulttask1',
    #    'schedule': timedelta(seconds=3)
    #}, 
    'hello': {
        'task': 'djproj.celery.hello',
        'schedule': timedelta(seconds=4)
    }, 
    'tuantask1': {
        'task': 'apps.app1.tuan.tuantask1',
        'schedule': timedelta(seconds=6)
    },  
    #'tuantask2': {
    #    'task': 'app1.tuan.tuantask2',
    #    'schedule': crontab(minute=55, hour=17)
    #}
}

TIME_ZONE = 'Asia/Shanghai'

DATETIME_FORMAT = 'Y-m-d H:i:s'

TIME_FORMAT = 'Y-m-d H:i:s'

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.6/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '+df53@zaea16*pa%)kyta=ciam#$1c1&tjx-5!59f+nlo)n#!4'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

TEMPLATE_DEBUG = True

ALLOWED_HOSTS = []


# Application definition

INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'apps.app1',
)

MIDDLEWARE_CLASSES = (
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
)

ROOT_URLCONF = 'djproj.urls'

WSGI_APPLICATION = 'djproj.wsgi.application'


# Database
# https://docs.djangoproject.com/en/1.6/ref/settings/#databases

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
    }
}

# Internationalization
# https://docs.djangoproject.com/en/1.6/topics/i18n/

LANGUAGE_CODE = 'zh_CN'

#TIME_ZONE = 'UTC'

USE_I18N = True

USE_L10N = True

USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.6/howto/static-files/

STATIC_URL = '/static/'

djproj/apps/app1/tuan.py

from __future__ import absolute_import

#from djproj.celery import app
from celery import shared_task
import datetime

#@shared_task
#def add(x, y):
#    return x + y

#@shared_task
#def mul(x, y):
#    return x * y

#@shared_task
#def xsum(numbers):
#    return sum(numbers)

@shared_task
def tuantask1():
    currt = datetime.datetime.now()
    with open("files/tuantask1.txt", "w") as fo:
        print >> fo, currt.isoformat()+"tuantask_1"
    return currt.isoformat()

@shared_task
def tuantask2():
    print "stest===="

djproj/run.sh

celery -A djproj worker -B -l debug

Solution

  • Well, I figured out what happened. I needed to rename tuan.py to tasks.py, because autodiscover_tasks in celery.py only finds tasks in a file named tasks.py inside each app. If you do not want to rename the file, importing the tasks manually through CELERY_IMPORTS in settings.py, using the absolute module path (e.g. 'apps.app1.tuan'), also works. See the docs:

    a common practice for reusable apps is to define all tasks in a separate
    tasks.py module, and Celery does have a way to autodiscover these modules

    This way you do not have to manually add the individual modules to the
    CELERY_IMPORTS setting.
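
    To make the two options above concrete, here is a minimal sketch. The helper candidate_task_modules is hypothetical and only approximates which module names autodiscovery looks for (its related_name parameter mirrors the argument of the same name on autodiscover_tasks, which defaults to 'tasks'); it is not Celery's actual implementation. The CELERY_IMPORTS line at the end is the setting the answer refers to, using the module path from the question.

```python
# Illustrative sketch only: approximates which module names autodiscovery
# tries to import. This is NOT Celery's real implementation.

# Shortened copy of INSTALLED_APPS from the question's settings.py.
INSTALLED_APPS = (
    'django.contrib.admin',
    'apps.app1',
)


def candidate_task_modules(packages, related_name='tasks'):
    """Hypothetical helper: dotted module names autodiscovery would look for."""
    return ['{0}.{1}'.format(pkg, related_name) for pkg in packages]


discovered = candidate_task_modules(INSTALLED_APPS)
# 'apps.app1.tasks' is a candidate, but 'apps.app1.tuan' is never tried,
# so tuantask1 is not registered and the worker raises the KeyError.

# Option 2 from the answer: keep tuan.py and list it explicitly in settings.py.
CELERY_IMPORTS = ('apps.app1.tuan',)
```

    If the file is renamed to tasks.py instead, the automatically generated task name follows the module path, so the 'task' entry in CELERYBEAT_SCHEDULE would presumably need to change from 'apps.app1.tuan.tuantask1' to 'apps.app1.tasks.tuantask1'.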