python django celery celerybeat

Limit a specific task in Celery


I have a Django function my_func() that parallelizes its execution in threads. The function is launched by a scheduled job. On the server, the threads are executed by Celery.

The server's Celery configuration file, conf, is set to use 4 nodes with 8 threads each:

CELERYD_NODES=4
CELERYD_OPTS="--concurrency=8"

I want to restrict my_func to at most 1 node and 4 threads.

For that, I added these lines to the file:

[task:ad.tasks.my_func]
--concurrency=1
--max-threads=4

But it doesn't work: the function still uses all the available workers and threads (32 parallel processes).

How should task-specific restrictions be configured?


Solution

  • Celery does not support per-task concurrency settings in its configuration file. Concurrency is applied at the worker level, not at the task level: every task a worker executes shares that worker's pool, which is why the [task:ad.tasks.my_func] section has no effect.

    For the case you described, you have a couple of options:

    1. Separate worker: Create a dedicated worker that consumes only a queue for my_func tasks, and set that worker's concurrency to 4. You would start it with a command like:

    celery -A your_project_name worker --concurrency=4 -Q my_func_queue -n worker1.%h
    

    Then route the task to that queue in its definition. Workers started without -Q consume only the default queue, so your existing workers will not pick up my_func:

    from your_project_name.celery import app  # assumes the usual proj/celery.py layout

    @app.task(queue='my_func_queue')
    def my_func():
        ...  # your function code here
    

    2. Task rate limiting: If you want to limit how often the task is started, you can use the rate_limit option:

    @app.task(rate_limit='4/m')
    def my_func():
        ...  # your function code here
    

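    This limits the task to 4 executions per minute. Note that the limit is enforced per worker instance, not globally, and that it controls how often the task starts, not how many instances run at the same time, so on its own it does not cap concurrency at 4.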

    3. Task semaphores: Another way to limit the concurrency of a specific task is a semaphore inside the task itself, using Python's built-in threading module. Be aware that a threading.Semaphore only exists within a single process: it limits executions among the threads of one worker process (for example with --pool=threads), but with the default prefork pool every child process has its own copy, and separate workers share nothing:

    import threading
    
    semaphore = threading.Semaphore(4)
    
    @app.task
    def my_func():
        with semaphore:
            ...  # your function code here
    

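    Within a single process, this allows at most 4 simultaneous executions of my_func. It does not limit executions across processes, workers, or machines; for a global limit you would need a shared lock, for example one stored in Redis or in the database, which is why the dedicated worker in option 1 is usually the simpler choice.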

    Remember, Celery is built on distributed message passing and is designed to execute tasks asynchronously across many workers. If you often find yourself needing to restrict tasks to certain threads or workers, it may be worth considering whether a different design would suit your needs better.
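As an alternative to passing queue= in the decorator (option 1), the route can be declared centrally in Django settings. This is a minimal sketch that assumes the common setup where the Celery app loads Django settings with app.config_from_object('django.conf:settings', namespace='CELERY'), so Celery's task_routes setting is spelled CELERY_TASK_ROUTES; ad.tasks.my_func and my_func_queue are the names used above.

```python
# settings.py (Django), assuming the Celery app is configured with
# app.config_from_object('django.conf:settings', namespace='CELERY'),
# so Celery's task_routes setting is read from CELERY_TASK_ROUTES.
CELERY_TASK_ROUTES = {
    # Send every ad.tasks.my_func message to the dedicated queue; only the
    # worker started with -Q my_func_queue (and --concurrency=4) consumes it.
    "ad.tasks.my_func": {"queue": "my_func_queue"},
}
```

Keeping the route in settings means the task code stays unchanged, and the dedicated worker's concurrency alone decides how many my_func runs can execute at once.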
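To see why rate_limit (option 2) does not cap concurrency, here is a back-of-the-envelope calculation. It is an illustration under simplifying assumptions: one worker instance starts my_func at the maximum allowed rate with evenly spaced starts, and every run takes a fixed time. The helper overlapping_runs is hypothetical.

```python
import math


def overlapping_runs(rate_per_minute: float, run_seconds: float, concurrency: int) -> int:
    """Estimate how many runs of a rate-limited task overlap on one worker.

    Hypothetical helper: assumes starts are evenly spaced at the maximum
    allowed rate and that every run takes exactly run_seconds.
    """
    interval = 60.0 / rate_per_minute  # seconds between two consecutive starts
    # Number of starts that happen while one run is still in progress.
    started_while_running = math.ceil(run_seconds / interval)
    # The worker's pool size remains the hard upper bound.
    return min(concurrency, started_while_running)


# '4/m' means one start every 15 s. A 10-minute run would have up to 40 runs
# in flight, so the pool size (8 per node in the configuration above) is what
# actually caps it; a 5-second run never overlaps with the next start.
print(overlapping_runs(4, 600, 8))
print(overlapping_runs(4, 5, 8))
```

In other words, rate_limit only helps when runs are short compared with the spacing between starts; for long runs the pool size is the only limit.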