python, rabbitmq, celery, fastapi

FastAPI, RabbitMQ, Celery: what's wrong with the code?


FastAPI app:

import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult


app = _fastapi.FastAPI()

celery_app = Celery(
    "worker",
    broker_url="amqp://guest:guest@rabbit:5672//",
    result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)


@app.get("/{word}")
async def root(word: str):
    task = celery_app.send_task("celery_worker.test_celery", args=[word])
    return {"message": "Word received", "id": f"{task}"}


@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)
    # Task Not Ready
    if not task.ready():
        return {"status": task.status}
    # Task done: return the value
    task_result= task.get()
    result = task_result.get("result")
    return {"task_id": str(task_id),
            "status": task_result.get("status"),
            "result": result,
    }

Dockerfile:

FROM python:3.10-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
COPY ./requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt --no-cache-dir
COPY . .

docker-compose.yml:

version: '3.8'
services:
  ylab:
    container_name: ylab
    build:
      context: .
    command: "uvicorn main:app --reload --host 0.0.0.0"
    ports:
      - "8000:8000"
    networks:
      - api_network
  rabbit:
    container_name: rabbit
    image: rabbitmq:3.10.7-management
    ports:
      - "15672:15672"
      - "5672:5672"
    networks:
      - api_network
  celery_worker:
    container_name: celery_worker
    build:
      context: .
    command: celery -A main.celery_app worker --loglevel=INFO
    networks:
      - api_network

networks:
  api_network:
    name: api_network

The root() function works well: I can send messages, get a task id back, and see all the messages in the RabbitMQ queue. However, the result() function returns task.ready() == False for every task id.

Can anyone tell me what the error in this code is?

Services info:

RabbitMQ 3.10.7

Celery 5.2.3 (worker startup banner):

celery@415bde516932 v5.2.3 (dawn-chorus)
Linux-5.10.0-18-amd64-x86_64-with-glibc2.31 2023-02-05 12:02:49

app:         worker:0x7f3679306c20
transport:   amqp://guest:**@rabbit:5672//
results:     rpc://
concurrency: 8 (prefork)
task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


Solution

  • According to the documentation for task_track_started:

    If True the task will report its status as ‘started’ when the task is executed by a worker.

    But in your code, you don't seem to have anything consuming the tasks that you're placing on the queue. They will stay in PENDING state forever.
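
    To see what that looks like, here is a quick diagnostic sketch you can run from a Python shell inside the ylab container. The task id is a made-up placeholder; any id whose task is never consumed behaves the same way, and note that Celery also reports PENDING for ids it has never seen at all:

    from main import celery_app

    # Placeholder id for a task that nothing ever consumed:
    res = celery_app.AsyncResult("00000000-0000-0000-0000-000000000000")
    print(res.state)    # 'PENDING' -- and it stays that way forever
    print(res.ready())  # False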

    I started by rewriting your code to use automatic task routing, calling the task with <func>.delay rather than the lower-level send_task method:

    import time
    
    import fastapi as _fastapi
    from celery import Celery
    from celery.result import AsyncResult
    
    app = _fastapi.FastAPI()
    
    celery_app = Celery(
        "worker",
        broker_url="amqp://guest:guest@rabbit:5672//",
        result_backend="rpc://",
    )
    celery_app.conf.update(task_track_started=True)
    
    @celery_app.task
    def test_celery(word):
        time.sleep(10)
        return word.upper()
    
    
    @app.get("/{word}")
    async def root(word: str):
        task = test_celery.delay(word)
        return {"message": "Word received", "id": f"{task}"}
    
    
    @app.get("/api/result/{task_id}")
    async def result(task_id: str):
        task = AsyncResult(task_id)
    
        # Task Not Ready
        if not task.ready():
            return {"status": task.status}
    
        # Task done: return the value
        task_result = task.get()
        return {
            "task_id": str(task_id),
            "result": task_result,
        }
    

    When running the above code, a connection to /foo results in:

    {"message":"Word received","id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609"}
    

    A subsequent call to /api/result/34bfe48d-6ab3-4dec-ad7d-aa567315a609 yields:

    {"status":"STARTED"}
    

    And if we wait for 10 seconds, the same request results in:

    {"task_id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609","result":"FOO"}
    

    We've demonstrated that things work correctly when using automatic task routing. So why isn't your original code working? There are three problems:

    1. You don't have anything watching test-queue.

      You're delivering tasks into test-queue, but your Celery worker is watching the default celery queue. You need to use the -Q argument to have it watch test-queue instead (an alternative that avoids -Q is sketched after this list):

      celery_worker:
        container_name: celery_worker
        build:
          context: .
        command: celery -A main.celery_app worker --loglevel=INFO -Q test-queue
        networks:
          - api_network
      
    2. You don't have any tasks defined.

      If you add the -Q test-queue argument from the previous step and restart the environment, attempts to connect to /foo will result in the following traceback in your Celery worker:

      celery_worker    | [2023-02-05 14:12:40,864: ERROR/MainProcess] Received unregistered task of type 'celery_worker.test_celery'.
      celery_worker    | The message has been ignored and discarded.
      [...]
      celery_worker    | Traceback (most recent call last):
      celery_worker    |   File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
      celery_worker    |     strategy = strategies[type_]
      celery_worker    | KeyError: 'celery_worker.test_celery'
      

      We can fix that by registering the appropriate task with Celery:

      @celery_app.task(name="celery_worker.test_celery")
      def test_celery(word):
          time.sleep(10)
          return word.upper()
      
    3. With the previous two changes, your code will successfully submit the task to Celery and Celery will pass it to the test_celery function. However, calls to /api/result/<id> will fail with:

        File "/app/./main.py", line 39, in result
          result = task_result.get("result")
      AttributeError: 'str' object has no attribute 'get'
      

      You need to modify your result function so that it looks more like:

      @app.get("/api/result/{task_id}")
      async def result(task_id: str):
          task = AsyncResult(task_id)
      
          # Task Not Ready
          if not task.ready():
              return {"status": task.status}
      
          # Task done: return the value
          task_result = task.get()
          return {
              "task_id": str(task_id),
              "result": task_result,
          }
      

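    As an aside on problem 1: if you would rather not pass -Q on the command line, Celery also lets you change which queue is the default. This is just a sketch of that alternative, not part of the fixes above; a worker started without -Q consumes from the default queue, so pointing the default at test-queue has the same effect:

    from celery import Celery

    celery_app = Celery(
        "worker",
        broker_url="amqp://guest:guest@rabbit:5672//",
        result_backend="rpc://",
    )
    # Tasks with no explicit route now go to "test-queue", and a worker started
    # without -Q consumes from this queue instead of the default "celery" queue.
    celery_app.conf.task_default_queue = "test-queue"
    celery_app.conf.update(task_track_started=True)
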
    With these three changes, your original code works as intended. The complete modified code looks like:

    import time
    
    import fastapi
    from celery import Celery
    from celery.result import AsyncResult
    
    
    app = fastapi.FastAPI()
    
    celery_app = Celery(
        "worker",
        broker_url="amqp://guest:guest@rabbit:5672//",
        result_backend="rpc://",
    )
    celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
    celery_app.conf.update(task_track_started=True)
    
    
    @celery_app.task(name="celery_worker.test_celery")
    def test_celery(word):
        time.sleep(10)
        return word.upper()
    
    
    @app.get("/{word}")
    async def root(word: str):
        task = celery_app.send_task("celery_worker.test_celery", args=[word])
        return {"message": "Word received", "id": f"{task}"}
    
    
    @app.get("/api/result/{task_id}")
    async def result(task_id: str):
        task = AsyncResult(task_id)
    
        # Task Not Ready
        if not task.ready():
            return {"status": task.status}
    
        # Task done: return the value
        task_result = task.get()
        return {
            "task_id": str(task_id),
            "result": task_result,
        }
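
    To exercise the finished stack end to end, something like the following client-side sketch should work (assuming the compose services are up and the requests package is installed; it is not part of the original code):

    import time

    import requests

    # Submit a word; the response carries the Celery task id.
    task_id = requests.get("http://localhost:8000/foo").json()["id"]

    # Poll the result endpoint until the task has finished.
    while True:
        payload = requests.get(f"http://localhost:8000/api/result/{task_id}").json()
        if "result" in payload:
            print(payload)  # e.g. {'task_id': '...', 'result': 'FOO'}
            break
        print(payload)      # e.g. {'status': 'STARTED'}
        time.sleep(1)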