Search code examples
pythonrediscelerycelery-task

How to pass another function or instance to the celery task?


I have created a celery task as below

import os
import time

from celery import Celery
from dotenv import load_dotenv

load_dotenv()

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")


@celery.task(name="create_task")
def message_sender(sender_func, numbers: list, message: str):
    sender_func(numbers, message)
    return "Sent Successful"

And calling the task as below

modem_conn = Modem()
task = message_sender.apply_async(
        kwargs={
            "sender_func": modem_conn.sms,
            "numbers": ["00000000"],
            "message": "sms sent",
        }
    )

But I am getting bellow error

kombu.exceptions.EncodeError: Object of type method is not JSON serializable

But if I call the task without delay or apply_async, then it workes. What could be the problem here and how can I achive this.

All I want to do is pass a function or instance while calling the celery task.


Solution

  • The celery task is run in another instance than your app, and both instances communicate via the broker. Since you don't "call" the task function, but only send messages with serialized data that tell the worker which function to call, you can't send objects or functions. This is similar to multiprocessing, where only serialized text messages can be sent between the processes.

    My approach would be to make the function known to the worker and then send e.g. a string with the name of the function and call it.

    sender:

    task = message_sender.apply_async(
        kwargs={
            "sender_func": "sms",
            "numbers": ["00000000"],
            "message": "sms sent",
        }
    )
    

    worker:

    @celery.task(name="create_task")
    def message_sender(sender_func, numbers: list, message: str):
        modem_conn = Modem()
        if sender_func == "sms":
            modem_conn.sms(numbers, message)
        return "Sent Successful"
    

    You could also use getattr or locals()