Is there a way to access the callable Python object from a Celery task, either directly or through a lookup method (like Django's `get_model()`)? I've got a dot-notation reference to where the callable is declared, but I need the actual object so I can read its attributes before sending / calling the task.
I've scoured the public and private methods, but none of them is obviously what I'm looking for:
$ dir(celery_task_instance)
> ['AsyncResult', 'MaxRetriesExceededError', 'OperationalError', 'Strategy',
'__bound__', '__call__', '__class__', '__delattr__', '__dict__', '__dir__',
'__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__',
'__hash__', '__header__', '__init__', '__le__', '__lt__', '__module__',
'__name__', '__ne__', '__new__', '__qualname__', '__reduce__',
'__reduce_ex__', '__repr__', '__self__', '__setattr__', '__sizeof__',
'__str__', '__subclasshook__', '__trace__', '__v2_compat__', '__weakref__',
'__wrapped__', '_app', '_backend', '_decorated', '_default_request',
'_exec_options', '_get_app', '_get_exec_options', '_get_request',
'abstract', 'acks_late', 'add_around', 'add_to_chord', 'add_trail',
'after_return', 'annotate', 'app', 'apply', 'apply_async', 'autoregister',
'backend', 'bind', 'chunks', 'default_retry_delay', 'delay', 'expires',
'from_config', 'ignore_result', 'map', 'max_retries', 'name', 'on_bound',
'on_failure', 'on_retry', 'on_success', 'pop_request', 'push_request',
'rate_limit', 'reject_on_worker_lost', 'replace', 'request',
'request_stack', 'resultrepr_maxsize', 'retry', 'run', 's', 'send_event',
'send_events', 'serializer', 'shadow_name', 'si', 'signature', 'signature_from_request',
'soft_time_limit', 'starmap', 'start_strategy', 'store_errors_even_if_ignored', 'subtask',
'subtask_from_request', 'throws', 'time_limit', 'track_started', 'trail', 'update_state']
I'm using `django-celery-beat` to create scheduled tasks for Celery within a Django project.
I'd like to validate the schema of the `args` and `kwargs` fields at the time of creating the scheduled task, to anticipate and prevent future runtime issues. For example, API parameters supplied via the `kwargs` field.
In my Django `forms.ModelForm.clean()` method I've been able to access the list of registered Celery tasks. I thought that I could store the validation schema as an attribute of the target `@shared_task` and then access that during task creation. The schema is defined as a `marshmallow.Schema`, in case it's relevant, but this could equally be hardcoded.
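To make the idea concrete, here is a rough sketch of storing a schema on a callable and checking a JSON-encoded kwargs payload against it. It uses a plain function and a plain dict as stand-ins: `send_report`, the `kwargs_schema` attribute name, and `validate_kwargs` are all hypothetical, and the dict of required/optional keys stands in for the `marshmallow.Schema`. Whether an attribute set this way is visible on the object Celery registers is not something this sketch demonstrates.

```python
import json


def send_report(recipient, subject="Report"):
    """Hypothetical task body; stands in for a function decorated with @shared_task."""
    return f"{subject} -> {recipient}"


# Hypothetical attribute name holding a hardcoded stand-in for a real schema.
send_report.kwargs_schema = {"required": {"recipient"}, "optional": {"subject"}}


def validate_kwargs(func, raw_kwargs):
    """Return a list of error messages for a JSON-encoded kwargs string."""
    schema = getattr(func, "kwargs_schema", None)
    if schema is None:
        return []  # no schema attached, so there is nothing to check

    try:
        kwargs = json.loads(raw_kwargs or "{}")
    except json.JSONDecodeError as exc:
        return [f"kwargs is not valid JSON: {exc}"]
    if not isinstance(kwargs, dict):
        return ["kwargs must be a JSON object"]

    supplied = set(kwargs)
    missing = schema["required"] - supplied
    unknown = supplied - schema["required"] - schema["optional"]

    errors = []
    if missing:
        errors.append("missing required kwargs: " + ", ".join(sorted(missing)))
    if unknown:
        errors.append("unknown kwargs: " + ", ".join(sorted(unknown)))
    return errors
```

In a form's `clean()` method, a non-empty list returned from a helper like this could be turned into a validation error before the scheduled task is saved.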
I was looking in the wrong place: `importlib` makes this fairly trivial. I had assumed it was something I needed to get from Celery.
import importlib
# Split "package.module.callable" into the module path and the attribute name.
module_name, callable_name = dot_notation_path.rsplit(".", 1)
# Import the module, then look the callable up on it by name.
module_object = importlib.import_module(module_name)
callable_object = getattr(module_object, callable_name)
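The same lookup can be wrapped in a small helper with some error handling. This is only a sketch: the name `resolve_dotted_path` and the choice of exceptions are my own, not anything provided by Celery or `importlib`.

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Import the module part of 'pkg.module.attr' and return the named attribute."""
    module_name, sep, attr_name = dotted_path.rpartition(".")
    if not sep or not module_name or not attr_name:
        raise ValueError(f"{dotted_path!r} is not a dotted path to an attribute")

    # Raises ImportError (ModuleNotFoundError) if the module cannot be imported.
    module = importlib.import_module(module_name)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_name!r} has no attribute {attr_name!r}"
        ) from exc
```

For example, `resolve_dotted_path("json.dumps")` returns the `dumps` function from the standard library. In a Django project, `django.utils.module_loading.import_string` does much the same job, so a hand-rolled helper may not be needed.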