I am using Celery version 4.0.2.
Unlike in previous versions of Celery, class-based tasks no longer seem to be registered automatically, even when auto-discovery is configured.
However, I cannot even manage to register a class-based task manually.
According to the Celery change log:
http://docs.celeryproject.org/en/latest/changelog.html#version-4-0-1
since version 4.0.1 it should be possible to register the task manually:
    from celery import Celery, Task

    app = Celery()

    class CustomTask(Task):
        def run(self):
            return 'hello'

    app.register_task(CustomTask())
But this does not seem to work. Does anyone know how to achieve this?
I tried a few suggestions which are being discussed (apart from integrating a custom task loader mentioned in https://github.com/celery/celery/issues/3744):
Register Celery Class-based Task
Almost there! You need to call delay() on the task that you registered.
This would work:
    from celery import Celery, Task

    app = Celery()

    class CustomTask(Task):
        def run(self):
            return 'hello'

    task = CustomTask()
    app.register_task(task)
    task.delay()