python, celery, celery-task, celeryd

Celery KeyError when wrapping an app.task function with an imported decorator; the error occurs only when the decorator is imported


Given the layout:

background \
    tasks  \
        __init__.py
        generic.py
        helpers.py
    __init__.py
    _server.py
    config.py
    router.py
    server.py

And launching _server.py with celery -A background._server worker

The worker raises KeyError: u'generic.adder' when I call the generic.adder function with .delay(..)

The adder function:

File generic.py

from background.server import app
from background.tasks.helpers import standardized_task

@standardized_task(app, name='generic.adder')
def adder(x, y):
    return x + y

..is wrapped with a function that takes the app instance and standardizes the output of the Celery task into a JSON-style object containing the result, any error, and the call arguments (included below). The problem: when this wrapper function lives in the same file as generic.adder, it works flawlessly; when it is imported and used as above, it throws the KeyError.

I suspect that the wrapper is somehow replacing the name=.. argument passed to app.task with a name derived from helpers.py, so the literal name generic.adder is not found when the task is looked up.

It's also important to note that calling adder(..) directly from inside _server.py (the module run from the celery CLI) works; the error is thrown only when the task is called through the distributed interface. In other words, the import itself works independently of Celery.
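One thing that can be checked without Celery is what identity a wrapper function carries. The sketch below shows that an inner function defined without functools.wraps reports its own __name__ rather than that of the function it wraps. Whether this is what triggers the KeyError here is not established by the post, since name= is passed explicitly; plain_wrapper and preserving_wrapper are hypothetical names used only for illustration.

```python
import functools


def plain_wrapper(fn):
    # No functools.wraps: the returned function keeps the name 'wrapped_fn'.
    def wrapped_fn(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapped_fn


def preserving_wrapper(fn):
    # functools.wraps copies __name__, __module__ and __doc__ from fn.
    @functools.wraps(fn)
    def wrapped_fn(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapped_fn


def adder(x, y):
    return x + y


plain = plain_wrapper(adder)
kept = preserving_wrapper(adder)

print(plain.__name__)  # name of the inner function
print(kept.__name__)   # name of the wrapped function
```

Any framework that derives a default name from the function object would see two different names for these two wrappers, so printing the registered task's name on both the caller and the worker side is a cheap first diagnostic.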

File helpers.py

__author__ = 'Blake'

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

def standardized_task(app, *args, **kwargs):
    def wrapped_task(fn):
        def wrapped_fn(*fnargs, **fnkwargs):
            throws = fnkwargs.get('throws', Exception)
            raises = fnkwargs.get('raises', False)

            if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
                raise ValueError('throws value not of type Exception: %s' % type(throws))

            result, error = None, None

            try:
                result = fn(*fnargs, **fnkwargs)

                if type(result) not in JSON_TYPES:
                    result = unicode(result)

            except throws, e:
                error = e

                if raises:
                    raise
            finally:
                return {
                    'result': result,
                    'error': str(error) if error else None,
                    'meta': {
                        'args': fnargs, 'kwargs': fnkwargs
                    }
                }

        return app.task(wrapped_fn, *args, **kwargs)
    return wrapped_task
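Separately from the KeyError, the wrapper above has a control-flow issue: in Python, a return inside a finally block discards any exception that is propagating, so the bare raise under "if raises:" never reaches the caller. A minimal sketch of one way to restructure it follows, assuming the same result/error/meta envelope. run_standardized is a hypothetical helper, and taking throws and raises as explicit parameters (instead of reading them from the task's kwargs, which are also forwarded to fn in the original) is an assumption of this sketch.

```python
def run_standardized(fn, args=(), kwargs=None, throws=Exception, raises=False):
    """Call fn and wrap the outcome in a result/error/meta envelope."""
    kwargs = dict(kwargs or {})
    result, error = None, None

    try:
        result = fn(*args, **kwargs)
    except throws as exc:
        if raises:
            # Propagates to the caller: no finally/return overrides it.
            raise
        error = exc

    # Reached only when nothing is being re-raised.
    return {
        'result': result,
        'error': str(error) if error else None,
        'meta': {'args': args, 'kwargs': kwargs},
    }


def adder(x, y):
    return x + y


def broken(x, y):
    raise ValueError('boom')


ok = run_standardized(adder, (1, 2))
failed = run_standardized(broken, (1, 2))
```

With raises=True, run_standardized(broken, (1, 2), raises=True) lets the ValueError escape instead of returning an envelope.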

File _server.py

from background.server import app
from background.tasks.generic import *

Solution

  • The answer is not to use a decorator, but to extend celery.Task into an abstract class (StandardizedTask in the code below) and pass it as the base: @app.task(name='...', base=MyNewAbstractTask)

    The following SO post explains it better:

    celery task and customize decorator

    import types

    from celery import Task
    
    JSON_TYPES = [
        dict, list, unicode, str, int, long, float, bool, types.NoneType
    ]
    
    class StandardizedTask(Task):
        abstract = True
    
        def __call__(self, *args, **kwargs):
            return self.inner_run(*args, **kwargs)
    
        def inner_run(self, *args, **kwargs):
            throws = kwargs.get('throws', Exception)
            raises = kwargs.get('raises', False)
    
            if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
                raise ValueError('throws value not of type Exception: %s' % type(throws))
    
            result, error = None, None
    
            try:
                result = self.run(*args, **kwargs)
    
                if type(result) not in JSON_TYPES:
                    result = unicode(result)
    
            except throws, e:
                error = e
    
                if raises:
                    # Re-raise here; a return inside finally would discard it
                    raise
    
            return {
                'result': result,
                'error': str(error) if error else None,
                'meta': {
                    'args': args, 'kwargs': kwargs
                }
            }
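    To illustrate the shape of the base-class approach, here is a toy model that does not use Celery at all. BaseTask, EnvelopeTask, task and registry are hypothetical stand-ins, not Celery's API; the sketch only shows the pattern in which the user's function becomes run(), the base class wraps it in __call__, and the name given to the decorator is the key under which the task is stored.

```python
class BaseTask(object):
    """Stand-in for a task base class: calling the task runs self.run()."""
    name = None

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


class EnvelopeTask(BaseTask):
    """Wraps whatever run() returns (or raises) in a result/error dict."""

    def __call__(self, *args, **kwargs):
        result, error = None, None
        try:
            result = self.run(*args, **kwargs)
        except Exception as exc:
            error = exc
        return {'result': result, 'error': str(error) if error else None}


registry = {}  # toy registry: task name -> task instance


def task(name, base):
    """Hypothetical mini decorator: build a subclass of base whose run() is fn."""
    def decorator(fn):
        cls = type(fn.__name__, (base,), {'name': name, 'run': staticmethod(fn)})
        registry[name] = cls()
        return registry[name]
    return decorator


@task(name='generic.adder', base=EnvelopeTask)
def adder(x, y):
    return x + y


direct = adder(1, 2)
by_name = registry['generic.adder'](2, 3)
```

    In this model the wrapping logic lives in the class, so the decorated function is never replaced by a differently named inner function; the real code above follows the same division of labour between run() and the overriding __call__.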