I have created a Celery task as follows:
import os
import time

from celery import Celery
from dotenv import load_dotenv

load_dotenv()

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")


@celery.task(name="create_task")
def message_sender(sender_func, numbers: list, message: str):
    sender_func(numbers, message)
    return "Sent Successful"
I call the task as follows:
modem_conn = Modem()
task = message_sender.apply_async(
    kwargs={
        "sender_func": modem_conn.sms,
        "numbers": ["00000000"],
        "message": "sms sent",
    }
)
But I get the following error:
kombu.exceptions.EncodeError: Object of type method is not JSON serializable
But if I call the task without delay or apply_async, then it works. What could be the problem here, and how can I achieve this?
All I want to do is pass a function or instance while calling the celery task.
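The Celery task runs in a separate worker process, not in your app, and the two communicate only via the broker. You don't actually "call" the task function: apply_async sends a message with serialized data that tells the worker which task to run and with which arguments. With the JSON serializer, every argument must be JSON-serializable, so you can't send a bound method such as modem_conn.sms. Calling the task directly works because nothing is serialized in that case. This is similar to multiprocessing, where only serialized data can be passed between processes.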
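The failure can be reproduced without Celery or a broker. The following is a minimal sketch, assuming the JSON serializer is in use (as the error message suggests); the Modem class here is a hypothetical stand-in for the one in the question, and is_json_serializable is a helper introduced only for illustration.

```python
import json


class Modem:
    """Hypothetical stand-in for the Modem class in the question."""

    def sms(self, numbers, message):
        return f"sent {message!r} to {numbers}"


def is_json_serializable(value) -> bool:
    """Return True if json.dumps accepts the value."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


modem_conn = Modem()

# A bound method cannot be JSON-encoded; kombu surfaces this as EncodeError.
print(is_json_serializable({"sender_func": modem_conn.sms}))

# Plain strings and lists encode without trouble.
print(is_json_serializable({"sender_func": "sms", "numbers": ["00000000"]}))
```

The first check fails and the second succeeds, which is why passing the name of the function as a string is a workable alternative.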
My approach would be to make the function known to the worker, send a string with the function's name instead of the function itself, and have the worker look it up and call it.
sender:
task = message_sender.apply_async(
    kwargs={
        "sender_func": "sms",
        "numbers": ["00000000"],
        "message": "sms sent",
    }
)
worker:
@celery.task(name="create_task")
def message_sender(sender_func, numbers: list, message: str):
    modem_conn = Modem()
    if sender_func == "sms":
        modem_conn.sms(numbers, message)
    return "Sent Successful"
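If more senders are added later, the if check in the worker could grow into a lookup by name. The sketch below shows one possible way to do that outside of Celery, so it runs on its own; Modem is again a hypothetical stand-in, and ALLOWED_SENDERS and dispatch are names invented for this example, not part of Celery. The body of dispatch is what would go inside the task function.

```python
class Modem:
    """Hypothetical stand-in for the Modem class in the question."""

    def sms(self, numbers, message):
        return f"sms to {numbers}: {message}"


# Names the worker is willing to call; anything else is rejected,
# so a message cannot trigger arbitrary methods on Modem.
ALLOWED_SENDERS = {"sms"}


def dispatch(sender_func: str, numbers: list, message: str) -> str:
    """Resolve a sender by name on a fresh Modem and call it."""
    if sender_func not in ALLOWED_SENDERS:
        raise ValueError(f"Unknown sender: {sender_func!r}")
    modem_conn = Modem()
    return getattr(modem_conn, sender_func)(numbers, message)


print(dispatch("sms", ["00000000"], "sms sent"))
```

Raising on an unknown name also avoids reporting success when nothing was actually sent.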