Tags: python, django, uvicorn, daphne, asgi

Uvicorn async workers are still working synchronously


Question in short

I have migrated my project from Django 2.2 to Django 3.2, and now I want to start using asynchronous views. I have created an async view, set up the ASGI configuration, and run gunicorn with a Uvicorn worker. When I swarm this server with 10 concurrent users, they are served synchronously, one at a time. What do I need to configure so that an async view serves 10 concurrent users concurrently?

Question in detail

This is what I did so far in my local environment:

  • I am working with Django 3.2.10 and Python 3.9.
  • I have installed gunicorn and uvicorn through pip.
  • I have created an asgi.py file with the following contents:
    import os
    from django.core.asgi import get_asgi_application
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyService.settings.local')
    application = get_asgi_application()
  • I have created a view with the following implementation, and connected it in urlpatterns:
    import asyncio
    import json
    from django.http import HttpResponse
    
    async def async_sleep(request):
        await asyncio.sleep(1)
        return HttpResponse(json.dumps({'mode': 'async', 'time': 1}).encode())
  • I run locally a gunicorn server with a Uvicorn worker:
gunicorn MyService.asgi:application -k uvicorn.workers.UvicornWorker
[2022-01-26 14:37:14 +0100] [8732] [INFO] Starting gunicorn 20.1.0
[2022-01-26 14:37:14 +0100] [8732] [INFO] Listening at: http://127.0.0.1:8000 (8732)
[2022-01-26 14:37:14 +0100] [8732] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2022-01-26 14:37:14 +0100] [8733] [INFO] Booting worker with pid: 8733
[2022-01-26 13:37:15 +0000] [8733] [INFO] Started server process [8733]
[2022-01-26 13:37:15 +0000] [8733] [INFO] Waiting for application startup.
[2022-01-26 13:37:15 +0000] [8733] [INFO] ASGI 'lifespan' protocol appears unsupported.
[2022-01-26 13:37:15 +0000] [8733] [INFO] Application startup complete.
  • I hit the API from a local client once. After 1 second, I get a 200 OK, as expected.
  • I set up a locust server to spawn concurrent users. When I let it make requests with 1 concurrent user, every 1 second an API call is completed.
  • When I let it make requests with 10 concurrent users, still only one API call is completed every 1 second. All other requests are waiting.

This last result is not what I expect. I expect the worker to pick up the next request while the previous one is still sleeping asynchronously. Am I missing some configuration?

I also tried it by using Daphne instead of Uvicorn, but with the same result.
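
For reference, the behaviour I expect can be reproduced outside Django with plain asyncio. This is only a sketch of the expectation, not of how gunicorn or Uvicorn dispatch requests; fake_async_view and serve_concurrently are hypothetical names, and the delay is shortened so the script finishes quickly.

```python
import asyncio
import time


async def fake_async_view(delay: float) -> dict:
    """Hypothetical stand-in for the async_sleep view: wait without blocking the loop."""
    await asyncio.sleep(delay)
    return {"mode": "async", "time": delay}


async def serve_concurrently(n_users: int, delay: float) -> float:
    """Run n_users 'requests' on one event loop and return the total wall time."""
    started = time.monotonic()
    await asyncio.gather(*(fake_async_view(delay) for _ in range(n_users)))
    return time.monotonic() - started


if __name__ == "__main__":
    elapsed = asyncio.run(serve_concurrently(n_users=10, delay=0.1))
    # With cooperative scheduling the total stays close to one delay,
    # not ten delays back to back.
    print(f"10 requests finished in {elapsed:.2f}s")
```

If the requests were handled one after another, the total would be roughly n_users times the delay, which is what I observe through the server.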

Locust

This is how I have set up my locust.

  • Start a new virtualenv
  • pip install locust
  • Create a locustfile.py with the following content:
from locust import HttpUser, task
class SleepUser(HttpUser):
    @task
    def async_sleep(self):
        self.client.get('/api/async_sleep/')

Middleware

These are my middleware settings

MIDDLEWARE = [
    'django_prometheus.middleware.PrometheusBeforeMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.gzip.GZipMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'shared.common.middleware.ApiLoggerMiddleware',
    'django_prometheus.middleware.PrometheusAfterMiddleware',
]

ApiLoggerMiddleware from shared is our own code, so I will investigate this one first. This is its implementation:

import logging
import os
from typing import List

from django.http import HttpRequest, HttpResponse
from django.utils import timezone

from shared.common.authentication_service import BaseAuthenticationService


class ApiLoggerMiddleware:
    TOO_BIG_FOR_LOG_BYTES = 2 * 1024

    def __init__(self, get_response):
        # The get_response callable is provided by Django, it is a function
        # that takes a request and returns a response. Plainly put, once we're
        # done with the incoming request, we need to pass it along to get the
        # response which we need to ultimately return.
        self._get_response = get_response
        self.logger = logging.getLogger('api')
        self.pid = os.getpid()
        self.request_time = None
        self.response_time = None

    def __call__(self, request: HttpRequest) -> HttpResponse:
        common_data = self.on_request(request)
        response = self._get_response(request)
        self.on_response(response, common_data)
        return response

    def truncate_body(self, request: HttpRequest) -> str:
        return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"

    def on_request(self, request: HttpRequest) -> List[str]:
        self.request_time = timezone.now()

        remote_address = self.get_remote_address(request)
        user_agent = request.headers.get('User-Agent') or ''
        customer_uuid = self.get_customer_from_request_auth(request)
        method = request.method
        uri = request.get_raw_uri()

        common = [
            remote_address,
            user_agent,
            customer_uuid,
            method,
            uri
        ]

        in_line = [
                      "IN",
                      str(self.pid),
                      str(self.request_time),
                  ] + common + [
                      self.truncate_body(request)
                  ]

        self.logger.info(', '.join(in_line))
        return common

    def on_response(self, response: HttpResponse, common: List[str]) -> None:
        self.response_time = timezone.now()

        out_line = [
                       "OUT",
                       str(self.pid),
                       str(self.response_time)
                   ] + common + [
                       str(self.response_time - self.request_time),
                       str(response.status_code),
                   ]
        self.logger.info(", ".join(out_line))

    @classmethod
    def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
        token = request.headers.get('Authorization')
        if not token:
            return 'no token'
        try:
            payload = BaseAuthenticationService.validate_access_token(token)
            return payload.get('amsOrganizationId', '')
        except Exception:
            return 'unknown'

    @classmethod
    def get_remote_address(cls, request: HttpRequest) -> str:
        if 'X-Forwarded-For' in request.headers:
            # in case the request comes in through a proxy, the remote address
            # will be just the last proxy that passed it along, that's why we
            # have to get the remote from X-Forwarded-For
            # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
            addresses = request.headers['X-Forwarded-For'].split(',')
            client = addresses[0]
            return client
        else:
            return request.META.get('REMOTE_ADDR', '')


Solution

  • Your ApiLoggerMiddleware is a synchronous middleware.

    From https://docs.djangoproject.com/en/4.0/topics/async/#async-views, emphasis mine:

    You will only get the benefits of a fully-asynchronous request stack if you have no synchronous middleware loaded into your site. If there is a piece of synchronous middleware, then Django must use a thread per request to safely emulate a synchronous environment for it.

    Middleware can be built to support both sync and async contexts. Some of Django’s middleware is built like this, but not all. To see what middleware Django has to adapt, you can turn on debug logging for the django.request logger and look for log messages about “Synchronous middleware … adapted”.

    (The log message currently says "Asynchronous middleware ... adapted", bug reported at #33495.)
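
As a rough complement to the log messages: Django's middleware documentation describes two class-level flags, sync_capable and async_capable, and a middleware that declares neither is treated as synchronous only. The sketch below lists the classes that lack the async flag. The helper names and the two stand-in classes are hypothetical and only illustrate the check; the original ApiLoggerMiddleware corresponds to the plain case.

```python
from typing import Iterable, List


def is_async_capable(middleware: object) -> bool:
    """Treat a missing async_capable flag as False, i.e. as sync-only."""
    return bool(getattr(middleware, "async_capable", False))


def sync_only_names(middlewares: Iterable[type]) -> List[str]:
    """Return the names of the middleware classes that Django would have to adapt."""
    return [m.__name__ for m in middlewares if not is_async_capable(m)]


class PlainMiddleware:
    """Hypothetical stand-in: declares no flags, like the original ApiLoggerMiddleware."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        return self.get_response(request)


class DualModeMiddleware:
    """Hypothetical stand-in: declares both flags (request handling omitted)."""

    sync_capable = True
    async_capable = True


if __name__ == "__main__":
    print(sync_only_names([PlainMiddleware, DualModeMiddleware]))  # ['PlainMiddleware']
```

In a real project the classes could be obtained by passing each entry of MIDDLEWARE to django.utils.module_loading.import_string.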

    Turn on debug logging for the django.request logger by adding this to your LOGGING setting:

    'django.request': {
        'handlers': ['console'],
        'level': 'DEBUG',
    },
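
For context, the fragment above belongs under the "loggers" key of LOGGING. A minimal complete setting might look like the following sketch; the handler layout is an assumption, so merge the django.request entry into your existing configuration rather than replacing it.

```python
import logging
import logging.config

# Assumed minimal layout; only the 'django.request' logger entry comes from the answer.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
    },
    "loggers": {
        "django.request": {
            "handlers": ["console"],
            "level": "DEBUG",
        },
    },
}

if __name__ == "__main__":
    # Django applies LOGGING itself at startup; dictConfig is called here
    # only to check that the dictionary is a valid logging configuration.
    logging.config.dictConfig(LOGGING)
    logging.getLogger("django.request").debug("debug logging is active")
```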
    

    Solution

    To make ApiLoggerMiddleware asynchronous:

    1. Inherit from django.utils.deprecation.MiddlewareMixin.
      • Call super().__init__(get_response) in __init__.
      • Remove __call__; MiddlewareMixin provides a __call__ that works in both sync and async mode.
    2. Refactor on_request to process_request.
      • Return None instead of common.
      • Attach common to the request instead: request.common = common.
        Remember to update the references to common so they read request.common.
      • Attach request_time to the request instead of self to make it (and the middleware) thread-safe.
        Remember to update the references to self.request_time so they read request.request_time.
    3. Refactor on_response(self, response, common) to process_response(self, request, response).
      • Return response.
      • Don't attach response_time to self; keep it as a local variable, since it is not used in other functions.
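
Why step 2 moves request_time off self: one middleware instance serves every request in the process, so per-request data stored on it can be overwritten while a request is still in flight. The following sketch replays such an interleaving with made-up timestamps in milliseconds; the two timer classes are hypothetical reductions of the middleware, not the real thing.

```python
from types import SimpleNamespace


class SharedStateTimer:
    """Keeps the start time on the instance, like the original middleware."""

    def __init__(self) -> None:
        self.request_time = None

    def on_request(self, request, now_ms: int) -> None:
        self.request_time = now_ms

    def on_response(self, request, now_ms: int) -> int:
        return now_ms - self.request_time


class PerRequestTimer:
    """Keeps the start time on the request object, like the refactored middleware."""

    def on_request(self, request, now_ms: int) -> None:
        request.request_time = now_ms

    def on_response(self, request, now_ms: int) -> int:
        return now_ms - request.request_time


def interleave(timer) -> int:
    """Request A starts at 0 ms, B starts at 100 ms, A finishes at 200 ms."""
    request_a, request_b = SimpleNamespace(), SimpleNamespace()
    timer.on_request(request_a, 0)
    timer.on_request(request_b, 100)
    return timer.on_response(request_a, 200)  # measured duration of request A


if __name__ == "__main__":
    print(interleave(SharedStateTimer()))  # 100: B overwrote A's start time
    print(interleave(PerRequestTimer()))   # 200: A's actual duration
```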

    The resulting ApiLoggerMiddleware:

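    import logging
    import os

    from django.http import HttpRequest, HttpResponse
    from django.utils import timezone
    from django.utils.deprecation import MiddlewareMixin  # +

    from shared.common.authentication_service import BaseAuthenticationService


    class ApiLoggerMiddleware(MiddlewareMixin):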
        TOO_BIG_FOR_LOG_BYTES = 2 * 1024
    
        def __init__(self, get_response):
            # The get_response callable is provided by Django, it is a function
            # that takes a request and returns a response. Plainly put, once we're
            # done with the incoming request, we need to pass it along to get the
            # response which we need to ultimately return.
            super().__init__(get_response)  # +
            self._get_response = get_response
            self.logger = logging.getLogger('api')
            self.pid = os.getpid()
            # self.request_time = None   # -
            # self.response_time = None  # -
    
        # def __call__(self, request: HttpRequest) -> HttpResponse:  # -
        #     common_data = self.on_request(request)                 # -
        #     response = self._get_response(request)                 # -
        #     self.on_response(response, common_data)                # -
        #     return response                                        # -
    
        def truncate_body(self, request: HttpRequest) -> str:
            return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"
    
        # def on_request(self, request: HttpRequest) -> List[str]:  # -
        def process_request(self, request: HttpRequest) -> None:    # +
            # self.request_time = timezone.now()   # -
            request.request_time = timezone.now()  # +
    
            remote_address = self.get_remote_address(request)
            user_agent = request.headers.get('User-Agent') or ''
            customer_uuid = self.get_customer_from_request_auth(request)
            method = request.method
            uri = request.get_raw_uri()
    
            common = [
                remote_address,
                user_agent,
                customer_uuid,
                method,
                uri
            ]
    
            in_line = [
                "IN",
                str(self.pid),
                # str(self.request_time),   # -
                str(request.request_time),  # +
            ] + common + [
                self.truncate_body(request)
            ]
    
            self.logger.info(', '.join(in_line))
            # return common          # -
            request.common = common  # +
            return None              # +
    
        # def on_response(self, response: HttpResponse, common: List[str]) -> None:                # -
        def process_response(self, request: HttpRequest, response: HttpResponse) -> HttpResponse:  # +
            # self.response_time = timezone.now()  # -
            response_time = timezone.now()         # +
    
            out_line = [
                "OUT",
                str(self.pid),
                # str(self.response_time)  # -
                str(response_time)         # +
                # ] + common + [                    # -
            ] + getattr(request, 'common', []) + [  # +
                # str(self.response_time - self.request_time),                         # -
                # Fall back to response_time (zero duration): datetime - int raises TypeError.
                str(response_time - getattr(request, 'request_time', response_time)),  # +
                str(response.status_code),
            ]
            self.logger.info(", ".join(out_line))
            return response  # +
    
        @classmethod
        def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
            token = request.headers.get('Authorization')
            if not token:
                return 'no token'
            try:
                payload = BaseAuthenticationService.validate_access_token(token)
                return payload.get('amsOrganizationId', '')
            except Exception:
                return 'unknown'
    
        @classmethod
        def get_remote_address(cls, request: HttpRequest) -> str:
            if 'X-Forwarded-For' in request.headers:
                # in case the request comes in through a proxy, the remote address
                # will be just the last proxy that passed it along, that's why we
                # have to get the remote from X-Forwarded-For
                # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
                addresses = request.headers['X-Forwarded-For'].split(',')
                client = addresses[0]
                return client
            else:
                return request.META.get('REMOTE_ADDR', '')
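
An alternative to MiddlewareMixin, described in the Django middleware documentation, is a function-based middleware that declares itself both sync- and async-capable and returns a coroutine function when get_response is one. The following is a reduced sketch, not a port of ApiLoggerMiddleware: the name api_logger_middleware and the log format are made up, and the capability flags are set by hand so the snippet runs without Django installed. In a project, the django.utils.decorators.sync_and_async_middleware decorator can set them instead.

```python
import asyncio
import logging
import time
from types import SimpleNamespace

logger = logging.getLogger("api")


def api_logger_middleware(get_response):
    """Hypothetical function-based middleware that works in both modes."""
    if asyncio.iscoroutinefunction(get_response):
        async def middleware(request):
            started = time.monotonic()  # local variable, so nothing is shared
            response = await get_response(request)
            logger.info("OUT, %s, %.3fs", response.status_code, time.monotonic() - started)
            return response
    else:
        def middleware(request):
            started = time.monotonic()
            response = get_response(request)
            logger.info("OUT, %s, %.3fs", response.status_code, time.monotonic() - started)
            return response
    return middleware


# Flags documented by Django; set directly here to avoid importing Django.
api_logger_middleware.sync_capable = True
api_logger_middleware.async_capable = True


if __name__ == "__main__":
    async def fake_get_response(request):
        # Stand-in for the rest of the async request stack.
        await asyncio.sleep(0.01)
        return SimpleNamespace(status_code=200)

    handler = api_logger_middleware(fake_get_response)
    print(asyncio.run(handler(SimpleNamespace())).status_code)
```

With this form the middleware body runs on the event loop itself, so any blocking call inside the async branch (for example a token validation that performs network or database I/O) would need an async equivalent or a wrapper such as asgiref's sync_to_async.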