I can't find an example implementation of a logging interceptor for python that measures the time it takes to process a request.
I want to measure how long it takes to process a request on the server side. I tried to write my own logging interceptor without success.
It seems to me that continuation
doesnt wait for the request to be completed.
Here is my .proto file
syntax = "proto3";
package auth;
service SampleService {
rpc Hello(HelloRequest) returns (HelloResponse);
}
message HelloRequest{
string Name = 1;
}
message HelloResponse{
string Greeting = 1;
}
grpc.py
import asyncio
from typing import Callable, Awaitable
import grpc
from pkg.py.gen import service_pb2
from pkg.py.gen import service_pb2_grpc
from ..logger import get_logger
from ..services import Service
class SampleService(service_pb2_grpc.SampleServiceServicer):
def __init__(self) -> None:
super().__init__()
async def Hello(self, request: service_pb2.HelloRequest, _) -> service_pb2.HelloResponse:
await asyncio.sleep(1)
return service_pb2.HelloResponse(Greeting=Service.hello(request.Name))
class LoggingServerInterceptor(grpc.aio.ServerInterceptor):
async def intercept_service(
self,
continuation: Callable[
[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
],
handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
...
app.py
from abc import ABC, abstractmethod
from concurrent import futures
import grpc.aio
from pkg.py.gen import service_pb2_grpc
from ..config import Config
from ..logger import get_logger
from ..transport import SampleService, LoggingServerInterceptor
class AbstractApp(ABC):
@abstractmethod
async def run(self) -> None:
...
@abstractmethod
async def stop(self) -> None:
...
class App(AbstractApp):
def __init__(self):
self.server = grpc.aio.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[LoggingServerInterceptor()]
)
service_pb2_grpc.add_SampleServiceServicer_to_server(
SampleService(), self.server
)
address = f"{Config().HOST}:{Config().PORT}"
logger = get_logger()
logger.info(f"Listening on {address}")
self.server.add_insecure_port(address)
async def run(self) -> None:
await self.server.start()
await self.server.wait_for_termination()
async def stop(self) -> None:
logger = get_logger()
logger.info("Shutting down in progress")
main.py
import asyncio
import sys
from pathlib import Path
root = Path(__file__).resolve().parents[2].__str__() # noqa: E402
sys.path.append(root) # noqa: E402
from internal.config import Config
from internal.logger import get_logger, setup_logger
from internal.app.app import App
def main() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
app = App()
logger = get_logger()
loop.create_task(app.run())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
logger.info("Gracefully shutting down")
loop.run_until_complete(app.stop())
loop.close()
logger.info("Shut down completed")
if __name__ == '__main__':
main()
Found that lib.
import time
from typing import Callable, Any
import grpc
from grpc_interceptor.server import AsyncServerInterceptor
class RequestLoggingInterceptor(AsyncServerInterceptor):
async def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
start_time = time.monotonic()
try:
return await method(request, context)
finally:
request_time = time.monotonic() - start_time
get_logger().info(
f"Request {method_name} - duration {request_time}s"
)