
Dynamic registration of celery tasks


I would like to know the best way to use Celery with tasks that are registered at runtime. My workflow is as follows:

  • Start celery app
  • Start python app
  • The python app creates a new task that I would like to schedule in celery

Solution

  • I accomplished this with a "plugin" concept, based on the same idea that the click package uses for custom subcommands.

    The app structure (based on python 3):

    .
    ├── dynamic_tasks.py
    ├── run.py
    └── tasks
        └── get_rate.py
    

    The Celery task in dynamic_tasks.py is defined as follows:

    import os
    import celery
    
    app = celery.Celery('dynamic_tasks', broker='amqp://guest@192.168.169.1/', backend='rpc://')
    
    PLUGIN_FOLDER = os.path.join(os.path.dirname(__file__), 'tasks')
    def _absolutepath(filename):
        """ Return the absolute path to the filename"""
        return os.path.join(PLUGIN_FOLDER, filename)
    
    @app.task
    def tasks(funcname, *args, **kwargs):
        try:
            # Map a task name such as 'get-rate' to tasks/get_rate.py
            funcname = funcname.replace('-', '_')
            funcname += '.py'
            func = _absolutepath(funcname)
            ns = {}
            with open(func) as f:
                code = compile(f.read(), func, 'exec')
            # Run the plugin source so that it defines `task` in ns
            exec(code, ns, ns)
            return ns['task'](*args, **kwargs)
        except IOError:
            # Manage IOError (for example, a missing plugin file)
            raise
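
The loading step can be tried on its own, without a broker or worker. The following is a minimal sketch, assuming a hypothetical plugin named add_numbers.py written to a temporary folder; load_plugin is an illustrative helper, not part of the answer's code or of Celery.

```python
import os
import tempfile


def load_plugin(folder, funcname):
    """Compile and run <folder>/<funcname>.py and return its `task` callable."""
    filename = os.path.join(folder, funcname.replace('-', '_') + '.py')
    ns = {}
    with open(filename) as f:
        code = compile(f.read(), filename, 'exec')
    # Executing the compiled source fills ns with the plugin's globals
    exec(code, ns, ns)
    return ns['task']


# Hypothetical plugin source, used only for this demonstration
PLUGIN_SOURCE = '''
def task(a, b=1):
    return a + b
'''

with tempfile.TemporaryDirectory() as folder:
    with open(os.path.join(folder, 'add_numbers.py'), 'w') as f:
        f.write(PLUGIN_SOURCE)
    add = load_plugin(folder, 'add-numbers')
    result = add(2, b=3)

print(result)
```

Because the file is read on every call, a plugin dropped into the folder after the worker has started is picked up without restarting the worker.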
    

    The pluggable task example, tasks/get_rate.py:

    """ This task gets the currency rate between a pair of currencies """
    import urllib.request
    
    URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'
    
    def task(pair='EURSEK', url_tmplt=URL):
        with urllib.request.urlopen(url_tmplt.format(pair)) as res:
            body = res.read()
        return (pair, float(body.strip()))
    

    And, simply, to run the example from run.py:

    from dynamic_tasks import tasks
    
    print(tasks.delay('get_rate', 'EURSEK').get())
    

    EDIT: Since the Celery worker may run on a different machine, it is not possible to rely on the local filesystem. My new approach is to send the source of the function to execute as a string:

    import logging

    logger = logging.getLogger(__name__)

    @app.task
    def dynamic_tasks(funcname, funccode, *args, **kwargs):
        try:
            ns = {}
            # funcname is only used as the filename shown in tracebacks
            code = compile(funccode, funcname, 'exec')
            exec(code, ns, ns)
        except SyntaxError:
            logger.error("Error loading the dynamic function from text %s", funcname)
            raise
        logger.info('execute %r with args %r, %r', funcname, args, kwargs)
        return ns['task'](*args, **kwargs)
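
On the calling side, the source only has to define a function named task, as in the file-based version. Below is a rough sketch of the idea, assuming a hypothetical source string and a local stand-in (run_source) that repeats the steps of the task body so it can be tried without a broker; neither name comes from the answer.

```python
def run_source(funcname, funccode, *args, **kwargs):
    """Local stand-in that follows the same steps as the worker-side task."""
    ns = {}
    code = compile(funccode, funcname, 'exec')
    exec(code, ns, ns)
    return ns['task'](*args, **kwargs)


# Hypothetical task source; on the client it could equally be read from a
# file such as tasks/get_rate.py before being sent.
SOURCE = '''
def task(pair='EURSEK'):
    return (pair, len(pair))
'''

local_result = run_source('pair_length', SOURCE, 'USDJPY')
print(local_result)

# With a broker and a worker running, the remote call would look like:
#   from dynamic_tasks import dynamic_tasks
#   dynamic_tasks.delay('pair_length', SOURCE, 'USDJPY').get()
```

Note that the worker executes whatever source it receives, so this pattern only makes sense when every client that can reach the broker is trusted.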