fastapi, celery, redis, docker compose: Cannot assign requested address


I use FastAPI, Celery, Redis and docker-compose to host a REST API. Everything works well except when I trigger a test of the API with a line in the Dockerfile. Without that line there is no problem: I can reach the address and test the API in a web browser.

I want to run everything as a microservice on Microsoft Azure, for example. To do this, the tests should be executed automatically when the containers are started up. Previously, I did this without docker-compose, celery and redis, i.e. only with the Dockerfile and fastapi. That worked without any problems.

With the triggered test I get the following error during the build:

  File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 975, in _connect
    sock.connect(socket_address)
OSError: [Errno 99] Cannot assign requested address

My docker compose yaml:

# docker compose file version
# It is a docker intrinsic number
# not your version number.
version: '3.8'

services:

  web:
    build: 
      context: .
      dockerfile: Dockerfile
    ports:
        # Host Port:Container Port
      - 8004:8000
    command: uvicorn app.upload_file:app --host 0.0.0.0
    volumes:
      - .:/api_code
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  
  worker:
    build:
      context: .
      dockerfile: Dockerfile   
    command: celery --app=celery_task_app.worker.celery_app worker --loglevel=info --logfile=logs/celery.log
    volumes:
      - .:/api_code
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  
  redis:
    image: redis:7


  dashboard:
    build: 
      context: .
      dockerfile: Dockerfile
    command: celery --broker=redis://redis:6379/0 flower --port=5555
    ports:
      - 5556:5555
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis
      - worker

My Dockerfile, which runs the tests with RUN python3 -m tests.api_tests:

# Start from the official Python base image.

FROM python:3.9

WORKDIR /api_code

# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

# install dependencies
RUN pip install --upgrade pip

COPY ./requirements.txt /api_code/requirements.txt


# Install the package dependencies in the requirements file.
RUN pip install --no-cache-dir --upgrade -r /api_code/requirements.txt

RUN apt-get -y update && apt-get -y upgrade && apt-get install -y --no-install-recommends ffmpeg


# torch
#RUN pip install torch==1.13.0+cpu torchvision==0.14.0+cpu torchaudio==0.13.0 --extra-index-url https://download.pytorch.org/whl/cpu
RUN pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu


# whisper X
RUN pip install git+https://github.com/m-bain/whisperx.git

COPY ./app /api_code/app

COPY ./tests /api_code/tests

COPY ./logs /api_code/logs

COPY ./celery_task_app /api_code/celery_task_app

RUN python3 -m tests.non_api_tests

RUN python3 -m tests.api_tests

My api_tests.py file:

import unittest

import os

from jiwer import wer as jiwer_wer

from fastapi.testclient import TestClient


from tests.tools_for_tests import calculate_wer

from app.upload_file import app

print('///////////// API Tests Started /////////////')

class TestUpload(unittest.TestCase):
        def test_upload(self):

            print('\n----> Test upload.')

            file_name = "you-call-that-fun.wav"

            # Space at the beginning is no error.
            test_text = ' Fun! You call that fun? That was boring!'

            
            file_to_upload = open(os.path.join('tests', file_name), 'rb')
            files = {'in_file': file_to_upload}
            

            with TestClient(app) as client:
                response = client.post('/audio/upload',
                                        files=files)
                
                upload_response = response.json()
                task_id = upload_response['task_id']
                assert task_id

                task_response = client.get(f'/audio/result/{task_id}')

                content = task_response.json()

                assert content == {'task_id': str(task_id), 'status': 'Processing'}

                assert response.status_code == 202

                while content['status']== 'Processing':
                     
                    task_response = client.get(f'/audio/result/{task_id}')
                    content = response.json()

                assert content['task_id'] == task_id
                assert content['status'] == 'Success'
    
            # remove the test file
            base_dir = "/api_code/app/data/"
            os.unlink(os.path.join(base_dir, file_name))
            file_to_upload.close() 

if __name__ == '__main__':
    unittest.main()  

My upload_file.py file, which contains the code for the API:

# fastapi
from fastapi import FastAPI, UploadFile, HTTPException, status, File
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

import shutil
import os
import time
from filetype import guess

# audio file: MP3 is not supported natively (nor OGG, FLAC, ...).
import sndhdr

from celery_task_app.tasks import predict_whisp_single
from celery.result import AsyncResult

from .tools import check_audio_file


# audio file meta informations
AUDIO_META = ['filetype',
              'sampling_rate',
              'nchannels',
              'nframes',
              'bits_per_sample']


app = FastAPI( title="Upload Audio Files",
    description="API to upload as well as check audio files and use whisperx.",
    version="1.0")

@app.post('/audio/upload',
          status_code=202,
          summary="Upload and process audio file.",
          description="One can upload a WAV file which will be processed by whisperx.",
          responses={202: {'description': 'Accepted: Not Ready!'}})
async def upload(in_file: UploadFile= File(...)):

    start_time = time.time()

    print('\n##### Run upload and process file #####\n')
    
    try:
        # Current path: /app
        out_file_path = os.path.join("app/data/", f"{in_file.filename}")

        with open(out_file_path, "wb") as buffer_path:
            # copy the contents of a file-like object
            # to another file-like object
            # Here: Copies uploaded file to buffer_path
            shutil.copyfileobj(in_file.file, buffer_path)
        
        print("\tThe file has been read in and is now being processed.")

    except Exception:

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"There was an error uploading the file: {os.path.abspath(os.getcwd())}"
                
        )          
    
    finally:
        in_file.file.close()

    task_id = check_audio_file(out_file_path, AUDIO_META)

    task_id = predict_whisp_single.delay(str(out_file_path))
    
    del in_file

    print("\n--- Runtime upload: %s seconds ---" % (time.time() - start_time))
    print("#################################################################")

    return {'task_id': str(task_id), 'status': 'Processing'}    
    

@app.get('/audio/result/{task_id}', status_code=202,\
         responses={202: {'description': 'Accepted: Not Ready!'},\
                    200: {'description': 'Processed and ready.'}})
async def audio_result(task_id):

    """Fetch result for given task_id"""

    task = AsyncResult(task_id)

    if not task.ready():
        print(app.url_path_for('upload'))
        return JSONResponse(status_code=202,\
                            content={'task_id': str(task_id), 'status': 'Processing'})
       
    result = task.get()

    return JSONResponse(status_code=status.HTTP_200_OK,\
                        content={'task_id': task_id, 'status': 'Success', 'prediction': result})


# Show image on frontend.
app.mount("/app", StaticFiles(directory="app"), name='images')
 
@app.get("/", response_class=HTMLResponse, status_code=status.HTTP_200_OK)
def serve():
    return """
    <html>
        <head>
            <title></title>
        </head>
        <body>
        <img src="/app/logo.webp">
        <h1>Welcome to the API</h1>
        </body>
    </html>
    """

Update

I used David Maze's advice and added the following to my yaml file:

  api_test:
    build: 
      context: .
      dockerfile: Dockerfile  
    command: >
      /bin/sh -c 'export BACKEND_URL=http://localhost:8004 && python3 -m tests.api_tests'
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis
      - worker

The test starts, but then the Linux terminal prints "api_test_1 | /audio/upload" in an endless loop.
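A polling loop like the one in the test has no exit if the reported status never changes. A minimal sketch of a bounded version, assuming a hypothetical helper `poll_until_done` (not part of the original test) that takes any callable returning the decoded JSON of `/audio/result/{task_id}`:

```python
import time
from typing import Callable, Dict


def poll_until_done(fetch: Callable[[], Dict],
                    timeout: float = 300.0,
                    interval: float = 5.0,
                    sleep: Callable[[float], None] = time.sleep) -> Dict:
    """Call fetch() until its 'status' is no longer 'Processing'.

    Returns the last response; raises TimeoutError if the deadline passes first.
    """
    deadline = time.monotonic() + timeout
    while True:
        content = fetch()
        if content.get("status") != "Processing":
            return content
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still processing after {timeout} s: {content}")
        sleep(interval)
```

In the test it could be used as `content = poll_until_done(lambda: client.get(f'/audio/result/{task_id}').json())`, so a task that never finishes fails the test instead of hanging it. The timeout and interval defaults are arbitrary choices.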

Update 09.01.2023

I used David Maze's advice and wrote a shell script:

#!/bin/sh

# conda create needs -n/--name; python=3.9 matches the Dockerfile base image.
conda create -y -n api_integr_test python=3.9

conda activate api_integr_test

pip install --no-cache-dir --upgrade -r requirements_test.txt

docker-compose build
docker-compose up -d --remove-orphans

export BACKEND_URL='http://localhost:8004'

python3 -m tests.api_tests

docker-compose down
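`docker-compose up -d` returns once the containers have been started, which can be before uvicorn answers requests, so the tests may start too early. A minimal sketch of a readiness check, assuming a hypothetical helper `wait_for_http` (not part of the original script) that polls the root page, which the API serves with status 200:

```python
import http.client
import time
import urllib.request

# Ignore proxy settings from the environment: the target is a local container.
_OPENER = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def wait_for_http(url: str, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once GET url answers with HTTP 200, False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with _OPENER.open(url, timeout=5) as response:
                if response.status == 200:
                    return True
        except (OSError, http.client.HTTPException):
            # Connection refused or reset, or an HTTP error status: not ready yet.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

It could be called as `wait_for_http(os.environ["BACKEND_URL"] + "/")` at the top of the test module, before the first upload. An HTTP request is used rather than a bare TCP connect because a published port can accept connections before the application behind it is listening.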

My api_tests.py file now looks like this:

import unittest

import os

from time import sleep

from jiwer import wer as jiwer_wer

#from fastapi.testclient import TestClient


from tests.tools_for_tests import calculate_wer

from app.upload_file import app

import requests


print('///////////// API Tests Started /////////////')

BACKEND_URL = str(os.environ["BACKEND_URL"])

class TestUpload(unittest.TestCase):
        def test_upload(self):

            print('\n----> Test upload.')

            file_name = "you-call-that-fun.wav"

            # Space at the beginning is no error.
            test_text = ' Fun! You call that fun? That was boring!'

            file_path = os.path.join('tests', file_name)
            
            file_to_upload = open(file_path, 'rb')
            
            files = {'in_file': file_to_upload}

            url = BACKEND_URL + '/audio/upload'

            print('\n----> Start Post: Upload file and start processing.')
            response = requests.post(url, files=files)

            upload_response = response.json()

            task_id = upload_response['task_id']
            assert task_id

            print('\n----> Start Get: Process status.')
            task_response = requests.get(BACKEND_URL + f'/audio/result/{task_id}')  

            content = task_response.json()

            assert content == {'task_id': str(task_id), 'status': 'Processing'}

            assert response.status_code == 202

            count=1
            print('\n----> Start Get Loop')
            while content['status']== 'Processing':

                print(f'\n\t---{count}---')
                task_response = requests.get(BACKEND_URL + f'/audio/result/{task_id}')
                content = task_response.json()

                print('\t', content)
                print('\tSleep for 5 seconds')
                count+=1
                sleep(5)


            print('\n----> Success')
            assert content['task_id'] == task_id
            assert content['status'] == 'Success'

            print(content)
            
            file_to_upload.close()    

if __name__ == '__main__':
    unittest.main()  

That solved my problem for now.


Solution

  • A Dockerfile RUN step can never connect to another container. There are a couple of technical reasons for this; the most direct is that the image build process doesn't use the Docker network that Compose otherwise creates.

    The integration tests don't seem likely to affect the actual content in the image, and I wouldn't try to run them during the build. The unit tests also seem unlikely to affect what will actually be in the image and should work identically inside and outside Docker, so I'd also omit these from the image.

    # Delete these lines:
    
    # Not used by the running application
    # COPY ./tests /api_code/tests
    
    # Doesn't affect the built image
    # RUN python3 -m tests.non_api_tests
    
    # Doesn't affect the built image
    # Fails because it can't reach other containers
    # RUN python3 -m tests.api_tests
    

    How would you run these integration tests? You have a straightforward setup where you're only making HTTP calls and verifying the results; there's no reason this needs to run from inside Docker. You can use an ordinary HTTP client like requests to drive this.

    So my typical flow here might look like this, running principally on the host:

    # Create a virtual environment and install the application there
    python3 -m venv ./venv
    . ./venv/bin/activate
    pip install .
    
    # Run unit tests
    python3 -m tests.non_api_tests
    
    # Build and launch the Docker setup
    docker-compose build
    docker-compose up -d
    
    # Run integration tests
    export BACKEND_URL=http://localhost:8004
    python3 -m tests.api_tests
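With this flow the integration tests depend only on `BACKEND_URL`. A minimal sketch of a test module that skips itself when that variable is unset, so the same file can be collected on a machine where the Compose stack is not running. The class name `TestRootPage` is hypothetical, and the standard-library `urllib` is used here only to keep the sketch dependency-free; `requests` would work the same way:

```python
import os
import unittest
import urllib.request

# BACKEND_URL is the variable exported in the commands above; None when unset.
BACKEND_URL = os.environ.get("BACKEND_URL")


@unittest.skipUnless(BACKEND_URL, "BACKEND_URL is not set; start the Compose stack first")
class TestRootPage(unittest.TestCase):
    def test_root_returns_200(self):
        # The application serves its welcome page at "/" with status 200.
        with urllib.request.urlopen(f"{BACKEND_URL}/", timeout=10) as response:
            self.assertEqual(response.status, 200)
```

Adding the usual `if __name__ == '__main__': unittest.main()` footer lets it run with `python3 -m`, like the other test modules.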