Given the layout:
background/
    tasks/
        __init__.py
        generic.py
        helpers.py
    __init__.py
    _server.py
    config.py
    router.py
    server.py
When launching _server.py
with celery -A background._server worker,
the worker raises KeyError: u'generic.adder'
when I try to call the generic.adder
function with .delay(..).
The adder function:
File generic.py
from background.server import app
from background.tasks.helpers import standardized_task
# Registered on the Celery app under the explicit task name 'generic.adder'.
@standardized_task(app, name='generic.adder')
def adder(x, y):
    """Return the sum of ``x`` and ``y``."""
    total = x + y
    return total
..is wrapped with a function that takes the app
instance and standardizes the input/output of the Celery task into a JSON object that returns the result along with the function's arguments (included below). However, when this wrapper function is defined in the same file as generic.adder, it works flawlessly; when it is imported and used as above, it throws the KeyError.
I'm led to believe that the wrapper is somehow modifying the name=..
attribute passed to app.task
with the function name from helpers.py
which is causing the literal name of generic.adder
to not be found when accessed from the task.
It's also important to note that if you call adder(..)
from inside _server.py
(the module run from the celery CLI) it works flawlessly; the error is thrown only when the task is called through the distributed interface, meaning the import works independently of Celery.
File helpers.py
__author__ = 'Blake'
import types
# Result types the task wrapper returns as-is; a result of any other type
# is converted to text with unicode() before being reported.
JSON_TYPES = [
    dict,
    list,
    unicode,
    str,
    int,
    long,
    float,
    bool,
    types.NoneType,
]
def standardized_task(app, *args, **kwargs):
    """Build a decorator that registers a function on ``app`` as a task
    returning a standardized result dictionary.

    Every argument after ``app`` is forwarded unchanged to ``app.task``
    (for example ``name='generic.adder'``).

    The registered task understands two optional control keyword arguments.
    They are consumed by the wrapper and are NOT passed on to the wrapped
    function:

    * ``throws`` -- exception class, or tuple of exception classes, to
      capture. Defaults to ``Exception``.
    * ``raises`` -- when true, a captured exception is re-raised instead of
      being reported in the result. Defaults to ``False``.

    The task returns::

        {'result': <value or None>,
         'error': <str(exception) or None>,
         'meta': {'args': <positional args>, 'kwargs': <keyword args>}}

    ``meta['kwargs']`` holds the keyword arguments exactly as the caller
    passed them, control arguments included. A result whose type is not
    listed in ``JSON_TYPES`` is converted with ``unicode()``.

    Exceptions that do not match ``throws`` propagate to the caller.

    Raises (when the task is called):
        ValueError: ``throws`` is neither an exception class nor a tuple of
            exception classes.
    """
    # Local import: the module header does not import functools.
    import functools

    def wrapped_task(fn):
        # wraps() keeps fn's __name__/__module__/__doc__ on the wrapper, so
        # the registered function does not present itself as
        # ``helpers.wrapped_fn``.
        @functools.wraps(fn)
        def wrapped_fn(*fnargs, **fnkwargs):
            # Work on a copy so the caller's kwargs can still be reported
            # verbatim in ``meta`` while fn only sees its own arguments.
            call_kwargs = dict(fnkwargs)
            throws = call_kwargs.pop('throws', Exception)
            raises = call_kwargs.pop('raises', False)

            # ``except`` accepts a class or a tuple of classes; validate both
            # shapes without instantiating anything.
            candidates = throws if isinstance(throws, tuple) else (throws,)
            if not all(isinstance(candidate, type) and
                       issubclass(candidate, BaseException)
                       for candidate in candidates):
                raise ValueError('throws value not of type Exception: %s' % type(throws))

            result, error = None, None
            try:
                result = fn(*fnargs, **call_kwargs)
                if type(result) not in JSON_TYPES:
                    result = unicode(result)
            except throws as e:
                if raises:
                    raise
                error = e

            # Deliberately outside any ``finally``: returning from ``finally``
            # would silently discard the re-raise above and any exception
            # that ``throws`` does not cover.
            return {
                'result': result,
                'error': str(error) if error is not None else None,
                'meta': {
                    'args': fnargs, 'kwargs': fnkwargs
                }
            }
        return app.task(wrapped_fn, *args, **kwargs)
    return wrapped_task
File _server.py
from background.server import app
from background.tasks.generic import *
The answer is not to use a decorator, but to extend celery.Task into an abstract class and use @app.task(name='...', base=MyNewAbstractTask).
The following SO post explains it better:
celery task and customize decorator
import types
# Result types the task base class returns as-is; a result of any other
# type is converted to text with unicode() before being reported.
JSON_TYPES = [
    dict,
    list,
    unicode,
    str,
    int,
    long,
    float,
    bool,
    types.NoneType,
]
class StandardizedTask(Task):
    """Task base class that reports every run as a standardized dictionary.

    Intended to be used as ``@app.task(name='...', base=StandardizedTask)``.
    Calling the task runs ``self.run`` and returns::

        {'result': <value or None>,
         'error': <str(exception) or None>,
         'meta': {'args': <positional args>, 'kwargs': <keyword args>}}
    """

    # Declares this class as an abstract base rather than a concrete task
    # (see the Celery Task documentation for the exact semantics).
    abstract = True

    def __call__(self, *args, **kwargs):
        """Execute the task body through ``inner_run``."""
        return self.inner_run(*args, **kwargs)

    def inner_run(self, *args, **kwargs):
        """Run ``self.run`` and wrap its outcome in the standard dictionary.

        Two optional control keyword arguments are consumed here and are NOT
        passed on to ``self.run``:

        * ``throws`` -- exception class, or tuple of exception classes, to
          capture. Defaults to ``Exception``.
        * ``raises`` -- when true, a captured exception is re-raised instead
          of being reported in the result. Defaults to ``False``.

        ``meta['kwargs']`` holds the keyword arguments exactly as received,
        control arguments included. A result whose type is not listed in
        ``JSON_TYPES`` is converted with ``unicode()``. Exceptions that do
        not match ``throws`` propagate to the caller.

        Raises:
            ValueError: ``throws`` is neither an exception class nor a tuple
                of exception classes.
        """
        # Work on a copy so the received kwargs can still be reported
        # verbatim in ``meta`` while run() only sees its own arguments.
        run_kwargs = dict(kwargs)
        throws = run_kwargs.pop('throws', Exception)
        raises = run_kwargs.pop('raises', False)

        # ``except`` accepts a class or a tuple of classes; validate both
        # shapes without instantiating anything.
        candidates = throws if isinstance(throws, tuple) else (throws,)
        if not all(isinstance(candidate, type) and
                   issubclass(candidate, BaseException)
                   for candidate in candidates):
            raise ValueError('throws value not of type Exception: %s' % type(throws))

        result, error = None, None
        try:
            result = self.run(*args, **run_kwargs)
            if type(result) not in JSON_TYPES:
                result = unicode(result)
        except throws as e:
            if raises:
                raise
            error = e

        # Deliberately outside any ``finally``: returning from ``finally``
        # would silently discard the re-raise above and any exception that
        # ``throws`` does not cover.
        return {
            'result': result,
            'error': str(error) if error is not None else None,
            'meta': {
                'args': args, 'kwargs': kwargs
            }
        }