Tags: python, celery, fastapi, task-queue

How to determine the name of a task in celery?


I have a FastAPI app from which I want to call a Celery task. I cannot import the task because the two live in different codebases, so I have to call it by name.

In tasks.py:

import os
from typing import Any, Dict

from celery import Celery

imagery = Celery(
    "imagery", broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL")
)

...

@imagery.task(bind=True, name="filter")
def filter_task(self, **kwargs) -> Dict[str, Any]:
    print('running task')

The celery worker is running with this command:

celery worker -A worker.imagery -P threads --loglevel=INFO --queues=imagery

Now, in my FastAPI codebase, I want to run the filter task. My understanding is that I have to use Celery's send_task() method.

In app.py I have:

import os

from celery import Celery, states
from celery.execute import send_task
from fastapi import FastAPI
from starlette.responses import JSONResponse, PlainTextResponse

from app import models

app = FastAPI()
tasks = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))


@app.post("/filter", status_code=201)
async def upload_images(data: models.FilterProductsModel):
    """
    TODO: use a celery task(s) to query the database and upload the results to S3
    """
    data = ['ok', 'un test']
    result = tasks.send_task('workers.imagery.filter', args=list(data))
    return PlainTextResponse(f"here is the id: {str(result.ready())}")

After calling the /filter endpoint, I don't see any task being picked up by the worker, so I tried different names in send_task():

  • filter
  • imagery.filter
  • worker.imagery.filter

Why does my task never get picked up by the worker, with nothing showing in the log? Is my task name wrong?

Edit: The worker process runs in Docker. Here is the full path of the file on its disk:

  • tasks.py : /workers/worker.py

So if I follow the import scheme, the name of the task would be workers.worker.filter, but this does not work either: nothing gets printed in the Docker logs. Is a print supposed to appear in the stdout of the celery CLI?


Solution

  • OP here.

    This is the solution I used.

    from celery import signature

    task = signature("filter", kwargs=data.dict(), queue="imagery")
    res = task.delay()
    

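    As mentioned by @DejanLekic, I had to specify the queue. The worker is started with --queues=imagery, so it only consumes from that queue, while a task sent without a queue goes to the default queue (named celery unless configured otherwise).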