go, grpc, grpc-go

gRPC stream closes the connection


I am writing a server in Go that handles a data stream using gRPC. After receiving a request, I put the stream on a channel; a goroutine then handles the request and sends the response back. But when I write back to the client in the goroutine, I get rpc error: code = Unavailable desc = transport is closing. So I wonder whether I can pass the stream through a channel. Does this operation close the connection?

Here is the Recognize RPC in the protocol buffer definition:

service AsrService {
     rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}

Here is the interface generated by gRPC:

type AsrService_RecognizeServer interface {
    Send(*RecognizeResponse) error
    Recv() (*RecognizeRequest, error)
    grpc.ServerStream
}

Here is where I put the stream on the channel:

func (s *ScheduleServer) Recognize(stream AsrService_RecognizeServer) error {
    req, err := stream.Recv() // I can use Recv here
    if err == io.EOF || err != nil {
        // do something
    }
    var asrRequest ASRRequest
    asrRequest.stream = &stream //pass stream to Chan
    ASRRequestChan <- &asrRequest

    return nil
}

Here is the goroutine that handles the channel:

type ASRRequest struct {
    stream *AsrService_RecognizeServer // Recognize stores &stream, so this is a pointer
}

// Same channel and element type that Recognize writes to.
var ASRRequestChan = make(chan *ASRRequest, 200)

func HandlRequestChan() {
    for {
        select {
        case r := <-ASRRequestChan:
            Log.Infof("Chan get request info[%v]", r)
            var rsp RecognizeResponse
            rsp.Code = **
            streamInter := *r.stream
            err := streamInter.Send(&rsp) // I can use Send here
            if err != nil {
                fmt.Printf("Grpc write failed,err[%v]", err)
            }
            fmt.Printf("return time[%v]\n", time.Now().UnixNano()/1e6)
        }
    }
}

Then I get the error rpc error: code = Unavailable desc = transport is closing. Is the stream closed after it is passed to the channel? If I do not use the channel, the result is sent to the client successfully.


Solution

  • I changed the strategy and now use sync.WaitGroup to make sure the handler does not return until the response has been sent. The handler starts a goroutine to process the stream and waits for that goroutine to finish before returning. In gRPC-Go the stream is only usable until the handler returns, so blocking the handler keeps the stream open while the child goroutine is still writing to it.

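    func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
        // Per-call WaitGroup: a package-level one would be shared by every
        // concurrent RPC, so each handler would wait on all the others.
        var wg sync.WaitGroup
        wg.Add(1)
        go s.Recognize_Syn(&wg, stream)

        // Do not return until the goroutine is done; returning ends the stream.
        wg.Wait()
        return nil
    }

    func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
        defer wg.Done()
        //do something
        var rsp pb.RecognizeResponse
        // Note: a value returned from a function started with `go` is discarded,
        // so log or forward this error if the caller needs to see it.
        if err := stream.Send(&rsp); err != nil {
            return err
        }
        return nil
    }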
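
If you would rather keep the original design, with one long-running worker reading from a channel, the same idea still applies: the handler has to block until the worker is finished with the stream. Below is a minimal sketch of that, not tested against a real gRPC server. To stay self-contained it does not import grpc at all; Stream, Response, job, fakeStream, handle, worker and serve are all hypothetical names, and fakeStream only imitates the fact that Send fails once the handler has returned.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Response is a hypothetical stand-in for the generated RecognizeResponse.
type Response struct {
	Code int32
}

// Stream is a hypothetical stand-in for the Send half of
// AsrService_RecognizeServer.
type Stream interface {
	Send(*Response) error
}

// job carries the stream together with a channel on which the worker
// reports that it is finished with the stream.
type job struct {
	stream Stream
	done   chan error
}

// fakeStream imitates a server stream: Send fails after close is called,
// which is what happens to a real stream once the handler returns.
type fakeStream struct {
	mu     sync.Mutex
	closed bool
	sent   []int32
}

func (f *fakeStream) Send(r *Response) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.closed {
		return errors.New("transport is closing")
	}
	f.sent = append(f.sent, r.Code)
	return nil
}

func (f *fakeStream) close() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.closed = true
}

// handle plays the role of the Recognize handler: it queues the stream and
// then blocks until the worker reports back, so it never returns early.
func handle(queue chan<- *job, s Stream) error {
	j := &job{stream: s, done: make(chan error, 1)}
	queue <- j
	return <-j.done
}

// worker plays the role of HandlRequestChan: it sends one response per job
// and then signals the waiting handler with the result of Send.
func worker(queue <-chan *job) {
	for j := range queue {
		j.done <- j.stream.Send(&Response{Code: 0})
	}
}

// serve imitates the gRPC runtime: the stream is closed as soon as the
// handler returns.
func serve(queue chan<- *job, f *fakeStream) error {
	err := handle(queue, f)
	f.close()
	return err
}

func main() {
	queue := make(chan *job, 200)
	go worker(queue)

	f := &fakeStream{}
	err := serve(queue, f)
	fmt.Println("handler error:", err, "responses sent:", len(f.sent))
}
```

In a real handler you would probably also select on stream.Context().Done() while waiting, so that a client that goes away does not leave the handler blocked forever. Passing the stream by value (it is already an interface) also avoids the pointer-to-interface in the question.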