I am using the task_prerun and task_postrun signals to keep track of how many tasks are being executed at any given time on a specific worker.
Every time a task starts I increase an integer stored in a file by one; when the task finishes I decrease it by one.
Because the counter lives in a file, I have to consider race conditions: two tasks can start at the same time on the same worker, the task_prerun signals fire simultaneously, and both handlers access the same file.
How should I approach that? Can I have a threading.Lock object living in the global scope? The lock only has to work on a per-worker basis, so I guess it would be OK to declare it globally, despite that not being very good practice.
I don't want the total number of tasks being processed; I want the number of tasks being processed by that worker.
The reason is to protect instances from being removed when the autoscaling group's minimum size changes in an AWS stack. I don't want AWS to kill machines that are still processing tasks.
Consider the following example:
import os
import time
from celery import Celery
from celery.signals import task_prerun, task_postrun
app = Celery('tasks', broker='pyamqp://guest@localhost/')
# Text file that keeps track of how many tasks are still computing.
counter_file = os.path.join(os.path.dirname(__file__), 'counter.txt')
if not os.path.exists(counter_file):
    with open(counter_file, 'w') as f:
        f.write('0')

@task_prerun.connect
def before(*args, **kwargs):
    """ Open the counter file and increment the value in it. """
    with open(counter_file, 'r+') as f:
        count = int(f.read())
        f.seek(0)
        f.truncate()  # Drop stale digits, e.g. when going from 10 to 9.
        f.write(str(count + 1))

@task_postrun.connect
def after(*args, **kwargs):
    """ Open the counter file and decrement the value in it. """
    with open(counter_file, 'r+') as f:
        count = int(f.read())
        f.seek(0)
        f.truncate()  # Drop stale digits, e.g. when going from 10 to 9.
        f.write(str(count - 1))

@app.task
def add(x, y):
    time.sleep(5)
    return x + y
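One note on the threading.Lock idea: with the default prefork pool each task runs in its own child process (the ForkPoolWorker-N names in worker logs reflect this), and every forked child gets its own copy of a module-level lock, so it would not serialize access between them. A lock on the file itself does work across processes. Below is a minimal sketch of that, assuming a POSIX system where fcntl.flock is available; change_counter is a hypothetical helper name, not something Celery provides.

```python
import fcntl
import os


def change_counter(path, delta):
    """Add delta to the integer stored in path while holding an exclusive lock."""
    # O_CREAT lets the first caller create the file, so no separate
    # "does it exist" check (which would itself be racy) is needed.
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    with os.fdopen(fd, "r+") as f:
        # Blocks until no other process holds a lock on this file.
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            raw = f.read().strip()
            count = (int(raw) if raw else 0) + delta
            f.seek(0)
            f.truncate()  # Remove stale digits before writing the new value.
            f.write(str(count))
            f.flush()  # Make sure the value is written before unlocking.
            return count
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
```

The signal handlers would then reduce to change_counter(counter_file, +1) in task_prerun and change_counter(counter_file, -1) in task_postrun. The counter can still drift if a worker process is killed between the two signals, so treat this as a sketch rather than a complete solution.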
I tried the solution proposed by @DejanLekic, using the Inspect class, and it worked. Here's the final script, which I loaded into Celery on 2 machines:
# tasks.py
import os
import random
import socket
import threading
import time
from celery import Celery
from celery.signals import task_prerun, task_postrun
app = Celery('tasks', broker=os.getenv('BROKER_ADDR', 'pyamqp://guest@localhost//'))

def get_number_of_tasks_being_executed_by_this_worker(wait_before=0.01):
    time.sleep(wait_before)
    # Do not rely on the worker name: we can only be sure of the hostname, so we
    # cannot use the destination= keyword of the inspect call.
    # active() returns None when no worker replies, hence the "or {}".
    active_tasks_by_all_workers = app.control.inspect().active() or {}
    # Filter the tasks of the workers on this machine.
    active_tasks_by_this_worker = [
        val for key, val in active_tasks_by_all_workers.items()
        if socket.gethostname() in key
    ]
    # Get the list of tasks of the first (and only, ideally) match.
    return active_tasks_by_this_worker[0] if active_tasks_by_this_worker else []

def check_if_should_protect_from_autoscaling():
    tasks = get_number_of_tasks_being_executed_by_this_worker()
    if tasks:
        print("%d tasks are still running in this worker. Ensure protection is set." % len(tasks))
        # if is_not_protected_against_auto_scaling_group:
        #     set_aws_autoscaling_protection()
    else:
        print("This worker is not executing any tasks. Unsetting protection.")
        # unset_aws_autoscaling_protection()

@task_postrun.connect
def after(*args, **kwargs):
    # Get the number of tasks with a little delay (0.01 seconds suffice), otherwise at
    # this point the current task that executed this method is shown as active.
    threading.Thread(target=check_if_should_protect_from_autoscaling).start()

@app.task
def add(x, y):
    time.sleep(3 * random.random())
    return x + y
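The set_aws_autoscaling_protection() and unset_aws_autoscaling_protection() calls are left as commented-out placeholders in the script above. One possible way to fill them in is sketched here, assuming boto3 is installed and that the EC2 instance ID and the Auto Scaling group name are already known (for example from configuration). The function names set_scale_in_protection and sync_protection are hypothetical; the only AWS call used is the Auto Scaling client's set_instance_protection. The client is passed in as a parameter so the logic can be exercised without AWS credentials.

```python
def set_scale_in_protection(client, instance_id, group_name, protected):
    """Turn scale-in protection on or off for one instance of an Auto Scaling group."""
    # client is assumed to be a boto3 Auto Scaling client.
    return client.set_instance_protection(
        InstanceIds=[instance_id],
        AutoScalingGroupName=group_name,
        ProtectedFromScaleIn=protected,
    )


def sync_protection(client, instance_id, group_name, active_tasks):
    """Protect the instance while it has active tasks, unprotect it otherwise."""
    protected = bool(active_tasks)
    set_scale_in_protection(client, instance_id, group_name, protected)
    return protected
```

In the worker this might be called as sync_protection(boto3.client("autoscaling"), instance_id, group_name, tasks) from check_if_should_protect_from_autoscaling(), where instance_id and group_name are values you supply. Calling the API after every task is probably more than needed; remembering the last state and only calling on a change would reduce the number of requests.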
I am sending many tasks from this script:
# dispatcher.py
import asyncio
from tasks import add

async def task():
    add.delay(3, 4)

async def main():
    await asyncio.gather(*[task() for i in range(200)])

if __name__ == '__main__':
    asyncio.run(main())
And the output logs seem to confirm the expected behaviour:
[2019-09-23 07:50:28,507: WARNING/ForkPoolWorker-3] 10 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,625: WARNING/ForkPoolWorker-1] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,627: WARNING/ForkPoolWorker-7] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,993: WARNING/ForkPoolWorker-4] 7 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,027: INFO/ForkPoolWorker-2] Task tasks.add[c3af9378-5666-42c3-9a37-5d0720b2065a] succeeded in 1.6377690890221857s: 7
[2019-09-23 07:50:29,204: INFO/ForkPoolWorker-9] Task tasks.add[9ca176ce-1590-4670-9947-4656166d224d] succeeded in 2.7913955969852395s: 7
[2019-09-23 07:50:29,224: INFO/ForkPoolWorker-5] Task tasks.add[38d005bc-ff13-4514-aba0-8601e79e67c8] succeeded in 2.0496858750120737s: 7
[2019-09-23 07:50:29,311: WARNING/ForkPoolWorker-8] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,316: WARNING/ForkPoolWorker-6] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,510: WARNING/ForkPoolWorker-10] 4 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,059: WARNING/ForkPoolWorker-2] 3 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,199: INFO/ForkPoolWorker-3] Task tasks.add[991d984a-4434-47a0-8c98-9508ca980f0b] succeeded in 2.7176807850482874s: 7
[2019-09-23 07:50:30,239: WARNING/ForkPoolWorker-9] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,250: WARNING/ForkPoolWorker-5] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:31,226: WARNING/ForkPoolWorker-3] This worker is not executing any tasks. Unsetting protection.
So all good! :D
We have implemented Celery autoscaling (on AWS) using a few Celery features that come out of the box. For what you ask, we use Celery's control API (https://docs.celeryproject.org/en/latest/reference/celery.app.control.html). The key is the Inspect part of it. The Inspect class can take a destination parameter, which is the Celery node you want to inspect. We do not use it because we want to inspect all nodes in the cluster, but you may need to do it differently. You should familiarize yourself with this class and its .active() method, which gives you a list of active tasks either in a set of workers or in the whole cluster (if destination is not provided).
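To make the per-node part concrete, here is a small sketch of picking one machine's tasks out of the reply from .active(). It assumes the reply is a dict mapping node names such as "celery@my-host" to lists of task descriptions, and that it can be None when no worker answers in time. active_tasks_on_host is a hypothetical helper, not part of Celery, and it takes the reply as an argument so it can be tried without a running cluster.

```python
def active_tasks_on_host(active_reply, hostname):
    """Return the active tasks reported by nodes whose name ends in @hostname."""
    tasks = []
    # Treat a missing reply (None) as "no workers answered".
    for node_name, node_tasks in (active_reply or {}).items():
        # Node names look like "celery@my-host"; compare the part after "@".
        if node_name.rpartition("@")[2] == hostname:
            tasks.extend(node_tasks)
    return tasks
```

With a real app this could be called as active_tasks_on_host(app.control.inspect().active(), socket.gethostname()). If your node names are predictable, passing destination=["celery@" + hostname] to inspect() should avoid querying the other nodes at all; note that the hostname part can differ from socket.gethostname() when workers are started with a custom -n option.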