I have a bidirectional streaming method on a Go gRPC server, defined like this in the proto file:
syntax = "proto3";
option go_package="pdfcompose/;pdfcompose";
package pdfcompose;
service PdfCompose {
rpc Send (stream FileForm) returns (stream PdfFile) {}
}
message FileForm {
bytes Upfile1 = 1;
bytes Upfile2 = 2;
bytes Upfile3 = 3;
}
message PdfFile {
bytes File = 1;
}
And my log interceptor has the following signature:
func logInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
fmt.Println("Log Interceptor")
err := handler(srv, ss)
if err != nil {
return err
}
return nil
}
I'm using https://github.com/grpc-ecosystem/go-grpc-middleware as the interceptor engine. I need to log the size of the streamed files (for educational purposes), and I'm trying to find out where I can get any data about FileForm and its contents.
My first guess was to look into the grpc.ServerStream argument (ss). It looks like it contains a lot of data, such as the max and min MessageSize, but nothing about the actual content length.
How can I get the size of an incoming file with this kind of interceptor?
So, as @Brits mentioned above, the working way to implement what I wanted is to write a wrapper for the stream.
Here is an example: https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/validator/validator.go I took the following code from that repo. I hope I understand the Apache 2.0 license correctly and that there are no issues with copying part of this code:
// StreamServerInterceptor returns a new streaming server interceptor that validates incoming messages.
//
// The stage at which invalid messages will be rejected with `InvalidArgument` varies based on the
// type of the RPC. For `ServerStream` (1:m) requests, it will happen before reaching any userspace
// handlers. For `ClientStream` (n:1) or `BidiStream` (n:m) RPCs, the messages will be rejected on
// calls to `stream.Recv()`.
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapper := &recvWrapper{stream}
return handler(srv, wrapper)
}
}
type recvWrapper struct {
grpc.ServerStream
}
func (s *recvWrapper) RecvMsg(m interface{}) error {
if err := s.ServerStream.RecvMsg(m); err != nil {
return err
}
if err := validate(m); err != nil {
return err
}
return nil
}
The validate() function receives the request as interface{}, so you need a type assertion to convert it to the concrete type you need, for example m := req.(*pdfcompose.FileForm). (The v := req.(type) form is only valid inside a type switch.)
For example, I asserted the request to *pdfcompose.FileForm and was able to check its contents:
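// logFileSize prints the byte length of each file field of a FileForm request.
// It uses fmt, which the log interceptor above already imports.
func logFileSize(req interface{}) error {
	// The comma-ok form avoids a panic if another message type passes through.
	m, ok := req.(*pdfcompose.FileForm)
	if !ok {
		return nil
	}
	// len gives the payload size of a []byte directly; binary.Size is not needed.
	fmt.Println("SizeOfUpfile1:", len(m.Upfile1))
	if m.Upfile2 != nil {
		fmt.Println("SizeOfUpfile2:", len(m.Upfile2))
	}
	if m.Upfile3 != nil {
		fmt.Println("SizeOfUpfile3:", len(m.Upfile3))
	}
	return nil
}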
// RecvMsg reads the next message from the wrapped stream, then logs its file sizes.
func (s *recvWrapper) RecvMsg(m interface{}) error {
	if err := s.ServerStream.RecvMsg(m); err != nil {
		return err
	}
	return logFileSize(m)
}
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapper := &recvWrapper{stream}
return handler(srv, wrapper)
}
}