Search code examples
Tags: python, python-asyncio, grpc, interceptor, grpc-python

Logging interceptor using grpc.aio python


I can't find an example implementation of a logging interceptor for Python that measures the time it takes to process a request.

I want to measure how long it takes to process a request on the server side. I tried to write my own logging interceptor without success.

It seems to me that `continuation` doesn't wait for the request to be completed.

Here is my .proto file

syntax = "proto3";

package auth;

// Service exposing a single RPC method.
service SampleService {
  // Hello takes one HelloRequest and returns one HelloResponse
  // (neither side is declared as a stream).
  rpc Hello(HelloRequest) returns (HelloResponse);
}

// Request message accepted by SampleService.Hello.
message HelloRequest{
  // String payload, field number 1.
  string Name = 1;
}

// Response message returned by SampleService.Hello.
message HelloResponse{
  // String payload, field number 1.
  string Greeting = 1;
}

grpc.py

import asyncio
import inspect
import time
from typing import Callable, Awaitable

import grpc

from pkg.py.gen import service_pb2
from pkg.py.gen import service_pb2_grpc
from ..logger import get_logger
from ..services import Service


class SampleService(service_pb2_grpc.SampleServiceServicer):
    """Asynchronous servicer implementing the ``Hello`` RPC."""

    async def Hello(self, request: service_pb2.HelloRequest, _) -> service_pb2.HelloResponse:
        """Build a ``HelloResponse`` from the name carried by ``request``.

        The second positional argument (the servicer context) is unused.
        """
        # One-second pause before the greeting is produced.
        await asyncio.sleep(1)
        greeting = Service.hello(request.Name)
        return service_pb2.HelloResponse(Greeting=greeting)


class LoggingServerInterceptor(grpc.aio.ServerInterceptor):
    """Server interceptor that logs how long each unary-unary RPC takes.

    ``continuation`` only resolves the method handler for the call; it does
    not execute the request. To measure the request itself, the handler's
    behaviour is wrapped in a coroutine that times the real call.
    """

    async def intercept_service(
            self,
            continuation: Callable[
                [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
            ],
            handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Return the handler for this call, wrapped with duration logging.

        Args:
            continuation: Awaitable lookup that yields the next handler in
                the interceptor chain.
            handler_call_details: Describes the call; its ``method``
                attribute is used as the name in the log line.

        Returns:
            The original handler unchanged when there is none, when it is
            streaming, or when its behaviour is not a coroutine function;
            otherwise a new unary-unary handler that logs the elapsed time.
        """
        handler = await continuation(handler_call_details)

        # Pass through anything this interceptor cannot wrap safely.
        if (
                handler is None
                or handler.request_streaming
                or handler.response_streaming
        ):
            return handler

        behavior = handler.unary_unary
        if not inspect.iscoroutinefunction(behavior):
            # Wrapping a plain function in a coroutine would change how the
            # server dispatches it, so leave it alone.
            return handler

        method_name = handler_call_details.method

        async def timed_behavior(request, context):
            start_time = time.monotonic()
            try:
                return await behavior(request, context)
            finally:
                # Logged on success and on failure alike.
                request_time = time.monotonic() - start_time
                get_logger().info(
                    f"Request {method_name} - duration {request_time}s"
                )

        return grpc.unary_unary_rpc_method_handler(
            timed_behavior,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

app.py

from abc import ABC, abstractmethod
from concurrent import futures

import grpc.aio

from pkg.py.gen import service_pb2_grpc
from ..config import Config
from ..logger import get_logger
from ..transport import SampleService, LoggingServerInterceptor


class AbstractApp(ABC):
    """Interface for application objects: both coroutines must be overridden."""

    @abstractmethod
    async def run(self) -> None:
        """Run the application; concrete subclasses supply the behaviour."""

    @abstractmethod
    async def stop(self) -> None:
        """Stop the application; concrete subclasses supply the behaviour."""


class App(AbstractApp):
    """gRPC application: builds the aio server, registers the service, runs it."""

    # Seconds that in-flight RPCs are given to finish when stop() is called.
    SHUTDOWN_GRACE_SECONDS = 5.0

    def __init__(self):
        """Create the server, attach the servicer and bind the listen address."""
        self.server = grpc.aio.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=[LoggingServerInterceptor()]
        )
        service_pb2_grpc.add_SampleServiceServicer_to_server(
            SampleService(), self.server
        )
        address = f"{Config().HOST}:{Config().PORT}"
        logger = get_logger()
        logger.info(f"Listening on {address}")
        self.server.add_insecure_port(address)

    async def run(self) -> None:
        """Start the server and wait until it terminates."""
        await self.server.start()
        await self.server.wait_for_termination()

    async def stop(self) -> None:
        """Log the shutdown and actually stop the server.

        Without the ``server.stop`` call the server keeps running and
        ``run`` never returns from ``wait_for_termination``.
        """
        logger = get_logger()
        logger.info("Shutting down in progress")
        await self.server.stop(self.SHUTDOWN_GRACE_SECONDS)

main.py

import asyncio
import sys
from pathlib import Path

# Put the directory two levels above this file's folder on sys.path so the
# imports that follow can be resolved when the file is run as a script.
root = str(Path(__file__).resolve().parents[2])
sys.path.append(root)

from internal.config import Config
from internal.logger import get_logger, setup_logger
from internal.app.app import App


def main() -> None:
    """Run the application on a fresh event loop and shut it down cleanly.

    The serving task is kept and driven with ``run_until_complete`` so that
    an exception raised by ``app.run()`` ends the process instead of being
    lost while the loop keeps spinning. On exit the task is cancelled and
    awaited so that no pending task is left behind when the loop is closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    app = App()
    logger = get_logger()

    # Hold a strong reference so the task is neither lost nor left unobserved.
    serve_task = loop.create_task(app.run())

    try:
        loop.run_until_complete(serve_task)
    except KeyboardInterrupt:
        pass
    finally:
        logger.info("Gracefully shutting down")
        try:
            loop.run_until_complete(app.stop())
            # cancel() is a no-op if the task has already finished.
            serve_task.cancel()
            loop.run_until_complete(
                asyncio.gather(serve_task, return_exceptions=True)
            )
        finally:
            loop.close()
        logger.info("Shut down completed")


if __name__ == '__main__':
    main()

Solution

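  • I found a library that does this: grpc-interceptor (imported below as `grpc_interceptor`). Its `AsyncServerInterceptor.intercept` receives the handler method itself, so awaiting `method(request, context)` covers the whole request: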

    import time
    from typing import Callable, Any
    
    import grpc
    from grpc_interceptor.server import AsyncServerInterceptor
    
    
    class RequestLoggingInterceptor(AsyncServerInterceptor):
        """Log how long each intercepted RPC takes.

        Unary responses are timed around the awaited handler. Streaming
        responses (objects with ``__aiter__``) cannot be awaited, so they are
        wrapped and timed until the stream is exhausted or closed.
        """

        async def intercept(
                self,
                method: Callable,
                request: Any,
                context: grpc.ServicerContext,
                method_name: str,
        ) -> Any:
            """Call ``method`` and log its duration.

            Args:
                method: The next handler to invoke.
                request: The request (or request iterator) passed to it.
                context: The servicer context for this call.
                method_name: Name used in the log line.

            Returns:
                The awaited response, or an async iterator of responses
                when the handler is response-streaming.
            """
            start_time = time.monotonic()
            streaming = False
            try:
                response_or_iterator = method(request, context)
                if hasattr(response_or_iterator, "__aiter__"):
                    # Async generators are not awaitable; log when the
                    # stream ends instead of here.
                    streaming = True
                    return self._timed_stream(
                        response_or_iterator, method_name, start_time
                    )
                return await response_or_iterator
            finally:
                if not streaming:
                    self._log_duration(method_name, start_time)

        async def _timed_stream(self, responses, method_name, start_time):
            """Re-yield ``responses`` and log the duration once the stream ends."""
            try:
                async for response in responses:
                    yield response
            finally:
                self._log_duration(method_name, start_time)

        @staticmethod
        def _log_duration(method_name: str, start_time: float) -> None:
            """Log the time elapsed since ``start_time`` for ``method_name``."""
            request_time = time.monotonic() - start_time
            get_logger().info(
                f"Request {method_name} - duration {request_time}s"
            )