python django daemon amqp celery

How can I set up Celery to call a custom initialization function before running my tasks?


I have a Django project and I'm trying to use Celery to submit tasks for background processing (http://ask.github.com/celery/introduction.html). Celery integrates well with Django and I've been able to submit my custom tasks and get back results.

The only problem is that I can't find a sane way of performing custom initialization in the daemon process. I need to call an expensive function that loads a lot of data into memory before I start processing the tasks, and I can't afford to call that function every time a task runs.

Has anyone had this problem before? Any ideas how to work around it without modifying the Celery source code?

Thanks


Solution

  • You can either write a custom loader, or use the signals.

    Loaders have the on_task_init method, which is called when a task is about to be executed, and the on_worker_init method, which is called by the celery+celerybeat main process.

    Using signals is probably the easiest approach. The available signals are:

    0.8.x:

    • task_prerun(task_id, task, args, kwargs)

      Dispatched when a task is about to be executed by the worker (or locally, if using apply or if CELERY_ALWAYS_EAGER has been set).

    • task_postrun(task_id, task, args, kwargs, retval)

      Dispatched after a task has been executed, under the same conditions as above.

    • task_sent(task_id, task, args, kwargs, eta, taskset)

      Called when a task is applied. It is not a good place for long-running operations.

    Additional signals available in 0.9.x (current master branch on github):

    • worker_init()

      Called when celeryd has started, before the task is initialized. On a system that supports fork, any memory changes made here would be copied to the child worker processes.

    • worker_ready()

      Called when celeryd is able to receive tasks.

    • worker_shutdown()

      Called when celeryd is shutting down.

    Here's an example precalculating something the first time a task is run in the process:

    from celery.task import Task
    from celery.registry import tasks
    from celery.signals import task_prerun
    
    _precalc_table = {}
    
    class PowersOfTwo(Task):
    
        def run(self, x):
            if x in _precalc_table:
                return _precalc_table[x]
            else:
                return x ** 2
    tasks.register(PowersOfTwo)
    
    
    def _precalc_numbers(**kwargs):
        if not _precalc_table:  # still empty, so the table has not been generated yet
            for i in range(1024):
                _precalc_table[i] = i ** 2
    
    
    # The sender argument must be the registered task instance.
    task_prerun.connect(_precalc_numbers, sender=tasks[PowersOfTwo.name])
    

    If you want the function to be run for all tasks, just skip the sender argument.