I'm learning Python (2.7) and Django (1.5) these days by developing my own Reddit clone (on Ubuntu 14.04 LTS). I'm trying to incorporate Celery (3.1) with Redis into the mix, using it to periodically run a ranking algorithm as a task (on my local setup). But unfortunately, I can't get this simple task to execute even once! Can you help me spot what I'm doing incorrectly?
Here's my directory structure:
-unconnectedreddit (manage.py is here)
-links (tasks.py, models.py, views.py, admin.py)
-unconnectedreddit (celery.py, __init__.py, settings.py, urls.py)
-static
-templates
celery.py:
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'unconnectedreddit.settings')
app = Celery('unconnectedreddit', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0',include=['unconnectedreddit.links.tasks'])
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()
Additions to settings.py are as follows. Note that I did run migrate after adding 'djcelery' to installed apps:
INSTALLED_APPS = ('djcelery',)
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://localhost:6379/0'
CELERY_IMPORTS = ('links.tasks', )
CELERY_ALWAYS_EAGER = False
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IGNORE_RESULT=True
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'rank_all-every-30-seconds': {
'task': 'tasks.rank_all',
'schedule': timedelta(seconds=30),
},
}
CELERY_TIMEZONE = 'UTC'
__init__.py:
from __future__ import absolute_import
from .celery import app as celery_app1
tasks.py:
import os
from unconnectedreddit import celery_app1
import time
from links.models import Link
@celery_app1.task
def rank_all():
for link in Link.with_votes.all():
link.set_rank() #ranks interesting 'links' submitted by users
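For context, set_rank() isn't shown above; a minimal sketch of what such a method might compute, assuming a Reddit-style "hot" formula (the vote count and submission time arguments are hypothetical stand-ins for my model's fields):

```python
from datetime import datetime, timedelta
from math import log10

def hot_score(votes, submitted_on, epoch=datetime(1970, 1, 1)):
    """Reddit-style 'hot' rank: log-scaled vote count plus an age
    bonus, so newer links outrank older ones with equal votes."""
    order = log10(max(abs(votes), 1))
    sign = 1 if votes > 0 else -1 if votes < 0 else 0
    seconds = (submitted_on - epoch).total_seconds()
    return round(sign * order + seconds / 45000.0, 7)
```

Whatever the exact formula, the point is that the score depends on time, which is why the task has to be re-run periodically: ranks decay even when nobody votes.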
I'm running this command on the Terminal to start a worker: celery -A unconnectedreddit worker -l info
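One thing worth double-checking (an assumption on my part, since the beat process isn't shown in the output below): CELERYBEAT_SCHEDULE is only acted on by a beat process. The worker alone never dispatches periodic tasks, so alongside it you need either a separate beat process or, for local development only, a worker started with -B:

```shell
# separate beat process (recommended)
celery -A unconnectedreddit beat -l info

# or, for local development, embed beat in the worker
celery -A unconnectedreddit worker -B -l info
```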
The output I get is as follows:
-------------- celery@has-VirtualBox v3.1.18 (Cipater)
---- **** -----
--- * *** * -- Linux-3.16.0-30-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: unconnectedreddit:0x7f938b838910
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. links.tasks.rank_all
[2015-06-04 12:01:17,083: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-06-04 12:01:17,098: INFO/MainProcess] mingle: searching for neighbors
[2015-06-04 12:01:18,107: INFO/MainProcess] mingle: all alone
/home/has/.virtualenvs/unconnectedreddit/local/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2015-06-04 12:01:18,136: WARNING/MainProcess] /home/has/.virtualenvs/unconnectedreddit/local/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2015-06-04 12:01:18,137: WARNING/MainProcess] celery@has-VirtualBox ready.
That's it. I had set out to run this task every 30 seconds (see my CELERYBEAT_SCHEDULE), but my code doesn't result in it executing even once: the rankings on my Reddit clone don't change at all. Can any expert point out what I'm missing in this setup?
The problem was a wrong task name. It got fixed by changing the task decorator to @celery_app1.task(name='tasks.rank_all')
and tweaking my beat schedule to use that matching name:
CELERYBEAT_SCHEDULE = {
'tasks.rank_all': {
'task': 'tasks.rank_all',
'schedule': timedelta(seconds=30),
},
}
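The mismatch is easy to reproduce outside Django. By default Celery names a task `<module>.<function>` (which is why the worker banner lists `links.tasks.rank_all`), while my beat schedule asked for `tasks.rank_all`, so beat was sending messages for a task the worker never registered. A minimal sketch of that default-naming rule (a simplification of Celery's behavior, not the real implementation):

```python
def default_task_name(module, func_name):
    """Approximation of Celery's default task naming: the registry
    key is the defining module's path plus the function name."""
    return '%s.%s' % (module, func_name)

# What the worker actually registers for links/tasks.py:
registered = {default_task_name('links.tasks', 'rank_all')}

# What the original beat schedule asked for:
scheduled = 'tasks.rank_all'

# The lookup fails, so the task never runs...
print(scheduled in registered)   # False
# ...until the decorator pins the name explicitly:
# @celery_app1.task(name='tasks.rank_all')
registered.add('tasks.rank_all')
print(scheduled in registered)   # True
```

The other valid fix would have been the reverse: keep the plain @celery_app1.task decorator and set 'task': 'links.tasks.rank_all' in the beat schedule; either way, the two names must match exactly.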