I have a server running a RabbitMQ broker and two Celery consumers (main1.py and main2.py), both connected to the same broker.
In the first consumer (main1.py), I implemented a Celery Beat schedule that repeatedly sends a different task to a specific queue:
app = Celery('tasks', broker=..., backend=...)

app.conf.task_routes = (
    [
        ('tasks.beat', {'queue': 'print-queue'}),
    ],
)

app.conf.beat_schedule = {
    'beat-every-10-seconds': {
        'task': 'tasks.beat',
        'schedule': 10.0,
    },
}

@app.task(name='tasks.beat', bind=True)
def beat(self):
    for i in range(10):
        app.send_task("tasks.print", args=[i], queue="print-queue")
    return None
In the second consumer (main2.py), I implemented the task mentioned above:
app = Celery('tasks', broker=..., backend=...)

app.conf.task_routes = (
    [
        ('tasks.print', {'queue': 'print-queue'}),
    ],
)

@app.task(name='tasks.print', bind=True)
def print(self, name):
    return name
When I start the two Celery workers:
consumer1: celery worker -A main1 -Q print-queue --beat
consumer2: celery worker -A main2 -Q print-queue
I get these errors:
[ERROR/MainProcess] Received unregistered task of type 'tasks.print'
on the first consumer
[ERROR/MainProcess] Received unregistered task of type 'tasks.beat'
on the second consumer
Is it possible to split tasks across different Celery applications that are both connected to the same broker?
Thanks in advance!
Here's what is happening. You have two workers, A and B, one of which also happens to be running celery beat (say that one is B).
1. celery beat submits tasks.beat to the queue. All this does is enqueue a message in RabbitMQ with some metadata, including the name of the task (see the sketch after this list).
2. One of the two workers reads the message. Both A and B are listening to the same queue, so either may read it.
   a. If A reads the message, it will try to find the task called tasks.beat. This blows up, because A doesn't define that task.
   b. If B reads the message, it will find the task called tasks.beat (since it does define that task) and will run the code. tasks.beat will then enqueue a new message in RabbitMQ containing the metadata for tasks.print.
3. The same problem now repeats for tasks.print: both workers are listening on print-queue, but only A defines tasks.print, and either worker may get the message.

In practice, Celery may be doing some checks that raise the error earlier, but I'm fairly certain this is the underlying problem.
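To make step 1 concrete: the only thing that travels over the broker is the task's registered name plus its arguments, never the function body, so the consuming worker must have that name in its own task registry. A minimal sketch (the broker URL is a placeholder):

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

# This publishes a message whose metadata names the task as the string
# 'tasks.print'. The worker that consumes it must have a task registered
# under exactly that name, otherwise it logs
# "Received unregistered task of type 'tasks.print'".
app.send_task('tasks.print', args=[1], queue='print-queue')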
In short, all workers (including beat) on a queue should be running the same code.
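If you do want to keep the two applications separate, one option (a sketch, not the only possible layout) is to give each task its own queue, so that a worker only ever consumes tasks it has registered. The queue name beat-queue below is illustrative:

# main1.py -- route the beat task to a dedicated queue
app.conf.task_routes = (
    [
        ('tasks.beat', {'queue': 'beat-queue'}),
    ],
)

consumer1: celery worker -A main1 -Q beat-queue --beat
consumer2: celery worker -A main2 -Q print-queue

With this layout, consumer1 only ever receives tasks.beat (which it defines) and consumer2 only ever receives tasks.print (which it defines), so neither worker sees an unregistered task.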