Search code examples
gogrpcmiddlewareinterceptorgo-grpc-middleware

Get streamed file size from grpc server interceptor


I have a Go server bidirectional stream method defined like this in proto-file:

syntax = "proto3";
option go_package="pdfcompose/;pdfcompose";
package pdfcompose;

service PdfCompose {
  rpc Send (stream FileForm) returns (stream PdfFile) {}
}

message FileForm {
  bytes Upfile1 = 1;
  bytes Upfile2 = 2;
  bytes Upfile3 = 3;
}

message PdfFile {
  bytes File = 1;
}

And my log interceptor has the following interface:

func logInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)  error {
    fmt.Println("Log Interceptor")
    err := handler(srv, ss)
    if err != nil {
        return err
    }
    return nil
}

And I'm using https://github.com/grpc-ecosystem/go-grpc-middleware as an interceptor engine. I need to implement logging of the streaming file size (for educational purposes) and trying to find out where I can get any data about FileForm and it contents.

My first guess was to look into grpc.ServerStream argument (ss) to find something about it and it looks like it contains a lot of data like max and min MessageSize, but noting about actual content length.

How I can get size of incoming file with this kind of interceptors?


Solution

  • So, as @Brits has mentioned above, the working way to implement what I wanted is to write wrapper for stream.

    Here is an example: https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/validator/validator.go I took the following code from this repo and I hope I understand apache2 license correct and there is no issues with copying a part of this code:

    // StreamServerInterceptor returns a new streaming server interceptor that validates incoming messages.
    //
    // The stage at which invalid messages will be rejected with `InvalidArgument` varies based on the
    // type of the RPC. For `ServerStream` (1:m) requests, it will happen before reaching any userspace
    // handlers. For `ClientStream` (n:1) or `BidiStream` (n:m) RPCs, the messages will be rejected on
    // calls to `stream.Recv()`.
    func StreamServerInterceptor() grpc.StreamServerInterceptor {
        return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
            wrapper := &recvWrapper{stream}
            return handler(srv, wrapper)
        }
    }
    
    type recvWrapper struct {
        grpc.ServerStream
    }
    
    func (s *recvWrapper) RecvMsg(m interface{}) error {
        if err := s.ServerStream.RecvMsg(m); err != nil {
            return err
        }
    
        if err := validate(m); err != nil {
            return err
        }
    
        return nil
    }
    

    validate() function actually gets request as interface{} and you need to use type assertion to cast this request to type you needed as v := req.(type).

    For example, I typed request to FileForm and was able to check it's content:

    func logFileSize(req interface{}) error {
        m := req.(*pdfcompose.FileForm)
        println("SizeOfUpfile1: " + strconv.Itoa(int(binary.Size(m.Upfile1))))
    
        if m.Upfile2 != nil {
            println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
        }
        if m.Upfile3 != nil {
            println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
        }
        return nil
    }
    
    func (s *recvWrapper) RecvMsg(m interface{}) error {
        if err := s.ServerStream.RecvMsg(m); err != nil {
            return err
        }
        //z := m.(pdfcompose.FileForm)
        if err := logFileSize(m); err != nil {
            return err
        }
    
        return nil
    }
    
    func StreamServerInterceptor() grpc.StreamServerInterceptor {
        return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
            wrapper := &recvWrapper{stream}
            return handler(srv, wrapper)
        }
    }