The docs say that custom task classes are instantiated only once, and that this makes them a good place to cache data needed by all task invocations, such as database connections. But it seems this happens both on the worker and in the caller. A task MCVE:
# tasks.py
from celery import Celery, Task
from time import sleep
celery = Celery(
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0"
)

class PatternTask(Task):
    def __init__(self):
        print("Initialising task")
        sleep(10)
        self._pattern = "Hello, %s!"
        print("Initialised task")

    @property
    def pattern(self):
        return self._pattern

@celery.task(base=PatternTask)
def hello(who):
    sleep(2)
    return hello.pattern % who
And the calling code:
# main.py
from tasks import hello
print(hello.delay("world").get())
print(hello.delay("you").get())
This will delay both the worker and the calling code by 10 seconds:
$ python main.py
Initialising task
# <10 seconds>
Initialised task
# <2 seconds>
Hello, world!
# <2 seconds>
Hello, you!
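To see why main.py pays the cost too, here is a toy model that does not use Celery. The task decorator below is hypothetical and only roughly mimics @celery.task(base=...): it builds a subclass of the base class and creates a single instance of it. Real Celery may defer that instantiation until the task is first used or the app is finalized, but it still happens in every process that uses the task, including the caller.

```python
init_calls = []  # one entry per run of the expensive initialiser


class PatternTask:
    """Toy stand-in for a Celery Task subclass with expensive setup."""

    def __init__(self):
        # The question's version sleeps for 10 seconds here.
        init_calls.append(type(self).__name__)
        self._pattern = "Hello, %s!"


def task(base):
    """Hypothetical decorator standing in for @celery.task(base=...)."""
    def decorator(fun):
        # Build a subclass of `base` whose run() is the decorated function,
        # then instantiate it once. __init__ runs right here, in whichever
        # process executes this module, worker or caller alike.
        cls = type(fun.__name__, (base,), {"run": staticmethod(fun)})
        return cls()
    return decorator


@task(base=PatternTask)
def hello(who):
    return hello._pattern % who


# Nothing has executed the task yet, but __init__ has already run once.
print(init_calls)          # ['hello']
print(hello.run("world"))  # Hello, world!
```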
I understand this is necessary to support calling hello("now") directly, where the worker is not involved. However, is there a way to promise I will never do so, and thereby avoid the expensive sleep operation and the huge waste of time, memory and CPU resources† spent allocating _pattern in the calling code? If not, what is the recommended solution for this scenario?
†) The actual use case is loading gigabytes of data needed for the worker's operation, which the calling code has no use for.
The solution was to use Celery signals:
# tasks.py
from celery import Celery, Task, signals
from time import sleep
celery = Celery(
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0"
)

class PatternTask(Task):
    def __init__(self):
        super().__init__()
        # Cheap: only register a callback. worker_init is sent by the worker
        # process, so in the calling code the expensive setup never runs.
        signals.worker_init.connect(self.on_worker_init)

    def on_worker_init(self, *args, **kwargs):
        # Expensive setup, run once when the worker starts.
        print("Initialising task")
        sleep(10)
        self._pattern = "Hello, %s!"
        print("Initialised task")

    @property
    def pattern(self):
        return self._pattern

@celery.task(bind=True, base=PatternTask)
def hello(self, who):
    # With bind=True, self is the task instance (a PatternTask).
    print(f"In hello {who}")
    sleep(2)
    print(f"Done hello {who}")
    return self.pattern % who
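The worker will apparently not start accepting jobs until all celery.signals.worker_init handlers have finished. Because worker_init is only sent inside the worker, the calling code merely registers the handler and never pays the cost of loading the data. (I also used bind=True to make it more maintainable; this is not relevant to the solution.)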
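The same mechanism can be modelled without Celery or Redis. In the sketch below, Signal is a hypothetical minimal stand-in for celery.signals.worker_init (the real signal class has more features, such as weak references and sender filtering). The None check in pattern is an addition not present in the code above: it turns a confusing AttributeError into a clear error if the task is ever run in a process where worker_init was never sent.

```python
class Signal:
    """Hypothetical minimal stand-in for a Celery signal such as worker_init."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)

    def send(self, sender=None, **kwargs):
        for receiver in self._receivers:
            receiver(sender=sender, **kwargs)


worker_init = Signal()  # Celery sends the real worker_init only in the worker


class PatternTask:
    def __init__(self):
        # Cheap: register a callback, load nothing yet.
        self._pattern = None
        worker_init.connect(self.on_worker_init)

    def on_worker_init(self, *args, **kwargs):
        # The expensive setup (gigabytes of data, in the real use case) goes here.
        self._pattern = "Hello, %s!"

    @property
    def pattern(self):
        # Added guard: fail with a clear message if the task is used in a
        # process where worker_init never fired, e.g. a direct hello("now").
        if self._pattern is None:
            raise RuntimeError("pattern used before worker_init was sent")
        return self._pattern

    def run(self, who):
        return self.pattern % who


hello = PatternTask()  # happens in both the caller and the worker

try:
    hello.run("now")  # caller side: no worker_init, so this fails loudly
except RuntimeError as exc:
    print(exc)  # pattern used before worker_init was sent

worker_init.send(sender=None)  # worker side: fired before accepting jobs
print(hello.run("world"))  # Hello, world!
```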