I have a task that performs a bulk update. Also, this task sends the status of the task to a certain place, for example, the task has started and the task has been successfully completed. I want to wrap this action in a decorator so that I can use it in subsequent tasks. But I ran into a problem that I can't attach a decorator to a celery task.
@celery.task
def change_statuses(*args, **kwargs) -> None:
call class which update status of task to pending
making bulk update
call class which update status of task to success
So I want to make like this decorator as base decorator
def update_state(state):
def decorator(func):
def wrapper(*args, **kwargs):
call class{state} which update status of task to pending
func(*args, **kwargs)
call class{state} which update status of task to success
return wrapper
return decorator
And call this decorator in every celery task, for example:
@celery.task
@update_state(UpdateBuildingsStatusEvent)
def change_status(*args, **kwargs) -> None:
making bulk update
But i have error:
The full contents of the message body was:
'{"task": "tasks.buildings_changes_statuses.change_status", "name": "change_status", "id": "******", "args": [], "kwargs": {"building_ids": [1, 2], "macroservice_id": 1, "user_id": 1, "macroservice_status": "connect", "task_id": "***"}}' (314b)
Traceback (most recent call last):
File "/home/aidar/Work/services.background_tasks/.env/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 581, in on_task_received
strategy = strategies[type_]
KeyError: 'tasks.buildings_changes_statuses.change_status'
I suspect the issue you have is that the celery.task
decorator is no longer getting called on a function called change_status
(which is referenced by name somewhere else). Instead it is getting applied to the return value of your decorator, named wrapper
. Since that has a different name, the celery
code doesn't know that it is standing in for change_status
.
To work around this issue, you can probably use functools.wraps
to update various attributes of your wrapper
function to match the function it is wrapped around. Try this:
from functools import wraps
def update_state(state):
def decorator(func):
@wraps(func) # apply fuctools.wraps here
def wrapper(*args, **kwargs):
...
return wrapper
return decorator