python django celery

Access target Python callable from Celery task instance


Is there a way to access the callable Python object from a Celery task, either directly or through a lookup method (like Django's `get_model()`)? I've got a dot-notation reference to where the callable is declared, but I need the actual object to get access to its attributes before sending / calling the task.

I've scoured the public and private attributes, but none of them is obviously what I'm looking for:

$ dir(celery_task_instance)

> ['AsyncResult', 'MaxRetriesExceededError', 'OperationalError', 'Strategy',
'__bound__', '__call__', '__class__', '__delattr__', '__dict__', '__dir__',
'__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__',
'__hash__', '__header__', '__init__', '__le__', '__lt__', '__module__',
'__name__', '__ne__', '__new__', '__qualname__', '__reduce__',
'__reduce_ex__', '__repr__', '__self__', '__setattr__', '__sizeof__',
'__str__', '__subclasshook__', '__trace__', '__v2_compat__', '__weakref__',
'__wrapped__', '_app', '_backend', '_decorated', '_default_request',
'_exec_options', '_get_app', '_get_exec_options', '_get_request',
'abstract', 'acks_late', 'add_around', 'add_to_chord', 'add_trail',
'after_return', 'annotate', 'app', 'apply', 'apply_async', 'autoregister',
'backend', 'bind', 'chunks', 'default_retry_delay', 'delay', 'expires',
'from_config', 'ignore_result', 'map', 'max_retries', 'name', 'on_bound',
'on_failure', 'on_retry', 'on_success', 'pop_request', 'push_request',
'rate_limit', 'reject_on_worker_lost', 'replace', 'request',
'request_stack', 'resultrepr_maxsize', 'retry', 'run', 's', 'send_event',
'send_events', 'serializer', 'shadow_name', 'si', 'signature', 'signature_from_request',
'soft_time_limit', 'starmap', 'start_strategy', 'store_errors_even_if_ignored', 'subtask',
'subtask_from_request', 'throws', 'time_limit', 'track_started', 'trail', 'update_state']

Background

I'm using django-celery-beat to create scheduled tasks for Celery within a Django project.

I'd like to validate the schema of the args and kwargs fields at the time of creating the scheduled task, to anticipate and prevent future runtime issues. For example, API parameters supplied via the kwargs field.

In my Django forms.ModelForm.clean() method I've been able to access the list of registered Celery tasks. I thought that I could store the validation schema as an attribute of the target @shared_task and then access that during task creation. The schema is defined as a marshmallow.Schema in case it's relevant, but this could equally be hardcoded.
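
To make the idea concrete, here is a minimal sketch of storing a hardcoded schema as an attribute on the callable and checking kwargs against it. Everything in it is an assumption for illustration: `send_report` is a plain function standing in for the @shared_task, and `kwargs_schema` and `validate_kwargs` are hypothetical names, not part of Celery or django-celery-beat.

```python
# Plain function standing in for a @shared_task (hypothetical example).
def send_report(recipient, limit=10):
    return f"report for {recipient} ({limit} rows)"


# Hypothetical attribute: a hardcoded schema mapping kwarg name -> expected type.
send_report.kwargs_schema = {"recipient": str, "limit": int}


def validate_kwargs(func, kwargs):
    """Return a dict of error messages keyed by kwarg name (empty if valid)."""
    schema = getattr(func, "kwargs_schema", None)
    if schema is None:
        return {}  # no schema attached, so there is nothing to check
    errors = {}
    for key, value in kwargs.items():
        if key not in schema:
            errors[key] = "unexpected keyword argument"
        elif not isinstance(value, schema[key]):
            errors[key] = f"expected {schema[key].__name__}"
    return errors
```

With a marshmallow.Schema in place of the dict, the equivalent check would presumably be a call to the schema's `validate()` method, which also returns a dict of errors.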


Solution

  • I was looking in the wrong place: importlib makes this fairly trivial. I had assumed it was something that I needed to get from Celery.

    import importlib

    # Split "package.module.callable" into the module path and the attribute name.
    module_name, callable_name = dot_notation_path.rsplit(".", 1)
    module_object = importlib.import_module(module_name)
    # Look the callable up on the module that was just imported.
    callable_object = getattr(module_object, callable_name)
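
The same lookup can be wrapped in a small helper with some error handling. This is only a sketch: `resolve_dotted_path` is a hypothetical name, and the choice of which exceptions to raise is an assumption, not something Celery or importlib prescribes.

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Import the module part of `dotted_path` and return the named attribute."""
    # "package.module.callable" -> ("package.module", ".", "callable")
    module_name, _, attr_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"{dotted_path!r} is not a dotted path")
    module = importlib.import_module(module_name)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_name!r} has no attribute {attr_name!r}"
        ) from exc


# Usage with a standard-library callable:
dumps = resolve_dotted_path("json.dumps")
```

In a Django project, `django.utils.module_loading.import_string` does the same job, so a custom helper may not be needed at all.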