
Celery: running a single celery beat with multiple scaled celery workers


I have a single celery beat instance started with:

celery -A app:celery beat --loglevel=DEBUG

and three workers started with:

celery -A app:celery worker -E --loglevel=ERROR -n n1
celery -A app:celery worker -E --loglevel=ERROR -n n2
celery -A app:celery worker -E --loglevel=ERROR -n n3

The same Redis DB is used as the message broker for all workers and for beat. For development, all workers run on the same machine; in production they will be deployed as separate Kubernetes pods. The main reason for using multiple workers is to distribute 50-150 tasks across different Kubernetes pods, each running on a 4-8 core machine. We expect that no pod will take more tasks than it has cores as long as some other worker still has fewer tasks than available cores, so that the maximum number of tasks runs concurrently.
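To put rough numbers on that expectation: with the prefork pool, a worker's concurrency defaults to the number of CPU cores, and Celery's `worker_prefetch_multiplier` (default 4) means each worker may fetch up to concurrency × multiplier messages from the broker. The sketch below is illustrative arithmetic only; it assumes concurrency equals the core count and uses the 4 and 8 core pod sizes mentioned above.

```python
# Rough capacity arithmetic for one worker pod (illustrative only).
# Assumptions: prefork pool with concurrency == number of cores (Celery's
# default), and Celery's documented prefetch rule: a worker fetches up to
# concurrency * worker_prefetch_multiplier messages from the broker.

DEFAULT_PREFETCH_MULTIPLIER = 4  # Celery's default worker_prefetch_multiplier


def prefetch_limit(cores: int, multiplier: int = DEFAULT_PREFETCH_MULTIPLIER) -> int:
    """Upper bound on messages a worker keeps fetched but not yet acknowledged."""
    return cores * multiplier


if __name__ == "__main__":
    for cores in (4, 8):  # the 4-8 core pods from the question
        for multiplier in (DEFAULT_PREFETCH_MULTIPLIER, 1):
            print(
                f"{cores} cores, multiplier {multiplier}: "
                f"runs {cores} at once, prefetches up to {prefetch_limit(cores, multiplier)}"
            )
```

With the default multiplier, a single 8-core pod may prefetch up to 32 messages, a large share of a 50-task batch, while it can only execute 8 of them at a time; with a multiplier of 1 the limit drops to 8.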

I'm having trouble testing this locally. Here the local beat triggers three tasks:

[2021-08-23 21:35:32,700: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: task-5872-accrual Task5872Accrual() <crontab: 36 21 * * * (m/h/d/dM/MY)>
<ScheduleEntry: task-5872-accrual2 Task5872Accrual2() <crontab: 37 21 * * * (m/h/d/dM/MY)>
<ScheduleEntry: task-5872-accrual3 Task5872Accrual3() <crontab: 38 21 * * * (m/h/d/dM/MY)>
[2021-08-23 21:35:32,700: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2021-08-23 21:35:32,701: DEBUG/MainProcess] beat: Waking up in 27.29 seconds.
[2021-08-23 21:36:00,017: DEBUG/MainProcess] beat: Synchronizing schedule...
[2021-08-23 21:36:00,026: INFO/MainProcess] Scheduler: Sending due task task-5872-accrual (Task5872Accrual)
[2021-08-23 21:36:00,035: DEBUG/MainProcess] Task5872Accrual sent. id->96e671f8-bd07-4c36-a595-b963659bee5c
[2021-08-23 21:36:00,035: DEBUG/MainProcess] beat: Waking up in 59.95 seconds.
[2021-08-23 21:37:00,041: INFO/MainProcess] Scheduler: Sending due task task-5872-accrual2 (Task5872Accrual2)
[2021-08-23 21:37:00,043: DEBUG/MainProcess] Task5872Accrual2 sent. id->532eac4d-1d10-4117-9d7e-16b3f1ae7aee
[2021-08-23 21:37:00,043: DEBUG/MainProcess] beat: Waking up in 59.95 seconds.
[2021-08-23 21:38:00,027: INFO/MainProcess] Scheduler: Sending due task task-5872-accrual3 (Task5872Accrual3)
[2021-08-23 21:38:00,029: DEBUG/MainProcess] Task5872Accrual3 sent. id->68729b64-807d-4e13-8147-0b372ce536af
[2021-08-23 21:38:00,029: DEBUG/MainProcess] beat: Waking up in 5.00 minutes.

I expected each worker to take a single task so that the load would be balanced between workers, but unfortunately this is how the tasks were distributed:

[Screenshot of the task distribution across workers; image not available]

So I'm not sure: do the different workers synchronize with each other to distribute the load between them evenly? If not, can I achieve that somehow? I tried searching Google, but most results are about concurrency between tasks within a single worker. What should I do if I need to run more tasks concurrently than a single machine in the Kubernetes cluster has cores?


Solution

  • You should do two things in order to achieve what you want:

    • Run workers with the -O fair option, so that each worker only hands a task to a child process that is free to run it. Example: celery -A app:celery worker -E --loglevel=ERROR -n n1 -O fair
    • Make workers prefetch as little as possible by setting worker_prefetch_multiplier=1 in your config (the default is 4).
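A minimal sketch of the configuration side, assuming the settings live in a separate module named `celeryconfig.py` (a hypothetical name) and that Redis runs locally on the default port. The broker URL is an assumption; point it at the Redis DB that beat and the workers already share. The same setting can also be applied directly on the app object, e.g. `celery.conf.update(worker_prefetch_multiplier=1)`.

```python
# celeryconfig.py (hypothetical module name)
# Load it from app.py with:  celery.config_from_object("celeryconfig")

# Assumed local Redis; use the same DB that beat and all workers share.
broker_url = "redis://localhost:6379/0"

# Fetch at most one message per worker process instead of the default 4,
# so a busy pod does not hold tasks that an idle pod could be running.
worker_prefetch_multiplier = 1
```

The `-O fair` flag is a worker command-line option, so it stays on each `celery -A app:celery worker ...` command rather than in this module.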