Search code examples
javaprotocol-buffersgrpcgrpc-java

gRPC stream become canceled


I'm trying to create a streaming API via gRPC, but my StreamObserver became canceled for some strange reason. Here is my .proto declarations:

service Service {
    rpc connect (ConnectionRequest) returns (stream StreamResponse) {}

    rpc act (stream ActRequest) returns (ActResponse) {}
}

The idea is that the users will connect so the StreamResponse will be saved for each user, and then on each act the StreamResponse will receive the update. Here is the java implementation of the Service

class ServiceImpl extends ServiceGrpc.ServiceImplBase {
    ............
    @Override
    public void connect(ConnectionRequest request, StreamObserver<StreamResponse> responseObserver) {
        observers.add(responseObserver);
    }

    @Override
    public StreamObserver<ActRequest> act(StreamObserver<ActResponse> responseObserver) {
        return return new StreamObserver<ActRequest>() {
            @Override
            public void onNext(ActRequest actRequest) {
                StreamResponse streamResponse = StreamResponse.newBuilder().build();
                observers.forEach(o -> o.onNext(streamResponse));
                actRequest..onCompleted();
            }
            ...............
        };
    }
    ..................
}

And here is the client connect method:

ServiceGrpc.ServiceStub stub = ServiceGrpc.newStub(channel);
new StreamObserver<PlayerResponse>() {
    ........
    @Override
    public void onNext(StreamResponse streamResponse) {
          logger.info("Act: " streamResponse.getData());
    }
    ........
}
ConnectionRequest request = ConnectionRequest.newBuilder().build();
stub.connect(request, streamObserver);

And the client act method:

StreamObserver<ActResponse> observer = new StreamObserver<ActResponse>() {
    @Override
    public void onNext(ActResponse actResponse) {
        logger.info("Status: " + actResponse.getSuccess());
    }
    .........
};
StreamObserver<ActRequest> act = stub.act(observer);
act.onNext(MoveRequest.newBuilder().build());
act.onCompleted();

When I start both client and server, the client is able to call connect. But the method act can be called only for the first time. When the client calls act for the second time I receive the following exception:

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
    at io.grpc.Status.asRuntimeException(Status.java:517)
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:335)

And in the debug mode I can see that the StreamObserver<StreamResponse> responseObserver is canceled=true.


Solution

  • I found out why I have got this error. In the client, I'm using Vaadin for building the UI. So on the StreamObserver<ActResponse>#onNext I'm calling a Vaadin method which throws an exception.

    The mistake I made while developing is that I left the onError empty as it was a PoC. But this mistake cost me a few hours of debugging. This is the stupid mistake, but I will let it be here, maybe it could save some time for somebody.