Search code examples
gogrpc-go

Modifying metadata on Go GRPC server streaming interceptor


I've been trying to set metadata on a server stream interceptor so they could be read downstream by the actual RPC function:

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    ss.SendHeader(metadata.New(map[string]string{"X-User-Id": "real_user_id"}))
    return handler(srv, ss)
}

func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
    ctx := client.Context()
    userID, ok := HeaderFromMetadata(ctx, "X-User-Id")

    log.Printf("User ID: %s, Ok: %t\n", userID, ok)
    return nil
}

func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
    meta, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return "", false
    }

    for _, header := range headers {
        if value := meta.Get(header); len(value) > 0 {
            return value[0], true
        }
    }

    return "", false
}

And my server is registered like this:

server := grpc.NewServer(
    grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())

The problem I'm having is that the user ID header isn't being found. I can see that the interceptor is called when the client sends the request, and I can see that the metadata contains the header, but the actual RPC can't seem to extract it. What am I doing wrong here?


Solution

  • Update

    The more simple solution is only override the Context() method of ServerStream

    type serverStream struct {
        grpc.ServerStream
        ctx context.Context
    }
    
    func (s *serverStream) Context() context.Context {
        return s.ctx
    }
    
    func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        md, ok := metadata.FromIncomingContext(ss.Context())
        if ok {
            md.Append("X-User-Id", "real_user_id")
        }
        newCtx := metadata.NewIncomingContext(ss.Context(), md)
    
        return handler(srv, &serverStream{ss, newCtx})
    }
    

    Update

    Another simple solution is to define one wrapper to grpc.ServerStream as below

    type serverStreamWrapper struct {
        ss  grpc.ServerStream
        ctx context.Context
    }
    
    func (w serverStreamWrapper) Context() context.Context        { return w.ctx }
    func (w serverStreamWrapper) RecvMsg(msg interface{}) error   { return w.ss.RecvMsg(msg) }
    func (w serverStreamWrapper) SendMsg(msg interface{}) error   { return w.ss.SendMsg(msg) }
    func (w serverStreamWrapper) SendHeader(md metadata.MD) error { return w.ss.SendHeader(md) }
    func (w serverStreamWrapper) SetHeader(md metadata.MD) error  { return w.ss.SetHeader(md) }
    func (w serverStreamWrapper) SetTrailer(md metadata.MD)       { w.ss.SetTrailer(md) }
    
    func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        md, ok := metadata.FromIncomingContext(ss.Context())
        if ok {
            md.Append("X-User-Id", "real_user_id")
        }
        newCtx := metadata.NewIncomingContext(ss.Context(), md)
    
        return handler(srv, serverStreamWrapper{ss, newCtx})
    }
    

    You could use NewIncomingContext to create a copy of the current context in the stream.

    Since there is no method to set the context of grpc.ServerStream, in order to set the context back to ServerStream, the wrappedStream is defined with context.Context, and SetContext method to set context.Context

    type wrappedStream struct {
        grpc.ServerStream
        ctx context.Context
    }
    
    func (w *wrappedStream) SetContext(ctx context.Context) {
        w.ctx = ctx
    }
    

    Full sample codes

    type wrappedStream struct {
        grpc.ServerStream
        ctx context.Context
    }
    
    func (w *wrappedStream) Context() context.Context {
        return w.ctx
    }
    
    func (w *wrappedStream) SetContext(ctx context.Context) {
        w.ctx = ctx
    }
    
    func (w *wrappedStream) RecvMsg(m interface{}) error {
        return w.ServerStream.RecvMsg(m)
    }
    
    func (w *wrappedStream) SendMsg(m interface{}) error {
        return w.ServerStream.SendMsg(m)
    }
    
    type StreamContextWrapper interface {
        grpc.ServerStream
        SetContext(context.Context)
    }
    
    func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
        ctx := ss.Context()
        return &wrappedStream{
            ss,
            ctx,
        }
    }
    
    func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        md, ok := metadata.FromIncomingContext(ss.Context())
        if ok {
            md.Append("X-User-Id", "real_user_id")
        }
        newCtx := metadata.NewIncomingContext(ss.Context(), md)
    
        sw := newStreamContextWrapper(ss)
        sw.SetContext(newCtx)
    
        return handler(srv, sw)
    }