I'm trying to create a streaming API via gRPC, but my StreamObserver gets cancelled and I can't see why.
Here are my .proto declarations:
service Service {
rpc connect (ConnectionRequest) returns (stream StreamResponse) {}
rpc act (stream ActRequest) returns (ActResponse) {}
}
The idea is that each user calls connect, so the StreamObserver<StreamResponse> is saved for each user, and then on each act every saved observer receives the update. Here is the Java implementation of the service:
class ServiceImpl extends ServiceGrpc.ServiceImplBase {
............
@Override
public void connect(ConnectionRequest request, StreamObserver<StreamResponse> responseObserver) {
observers.add(responseObserver);
}
@Override
public StreamObserver<ActRequest> act(StreamObserver<ActResponse> responseObserver) {
return new StreamObserver<ActRequest>() {
@Override
public void onNext(ActRequest actRequest) {
StreamResponse streamResponse = StreamResponse.newBuilder().build();
observers.forEach(o -> o.onNext(streamResponse));
responseObserver.onCompleted();
}
...............
};
}
..................
}
And here is the client connect method:
ServiceGrpc.ServiceStub stub = ServiceGrpc.newStub(channel);
StreamObserver<StreamResponse> streamObserver = new StreamObserver<StreamResponse>() {
........
@Override
public void onNext(StreamResponse streamResponse) {
logger.info("Act: " + streamResponse.getData());
}
........
};
ConnectionRequest request = ConnectionRequest.newBuilder().build();
stub.connect(request, streamObserver);
And the client act method:
StreamObserver<ActResponse> observer = new StreamObserver<ActResponse>() {
@Override
public void onNext(ActResponse actResponse) {
logger.info("Status: " + actResponse.getSuccess());
}
.........
};
StreamObserver<ActRequest> act = stub.act(observer);
act.onNext(ActRequest.newBuilder().build());
act.onCompleted();
When I start both client and server, the client is able to call connect. But act works only the first time. When the client calls act a second time, I get the following exception:
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
at io.grpc.Status.asRuntimeException(Status.java:517)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:335)
In debug mode I can see that the StreamObserver<StreamResponse> responseObserver has canceled=true.
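The stack trace shows onNext throwing once the observer's call has been cancelled, so a single dead observer can break the broadcast loop for everyone else. Below is a minimal sketch of one way to guard against that, assuming a stand-in interface so it compiles without gRPC. UpdateObserver, ObserverRegistry and RegistryDemo are hypothetical names, not part of the original code or of the gRPC API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for io.grpc.stub.StreamObserver, reduced to the one
// method this sketch needs so the example compiles without gRPC.
interface UpdateObserver<T> {
    void onNext(T value);
}

// Hypothetical helper: holds the observers saved in connect() and drops any
// observer whose onNext throws (for example because its call was cancelled).
class ObserverRegistry<T> {
    // CopyOnWriteArrayList iterates over a snapshot, so removing during the loop is safe.
    private final List<UpdateObserver<T>> observers = new CopyOnWriteArrayList<>();

    void add(UpdateObserver<T> observer) {
        observers.add(observer);
    }

    int size() {
        return observers.size();
    }

    // Sends the value to every observer and returns how many deliveries failed.
    int broadcast(T value) {
        int failed = 0;
        for (UpdateObserver<T> observer : observers) {
            try {
                observer.onNext(value);
            } catch (RuntimeException e) {
                // A dead observer must not block delivery to the others.
                observers.remove(observer);
                failed++;
            }
        }
        return failed;
    }
}

class RegistryDemo {
    public static void main(String[] args) {
        ObserverRegistry<String> registry = new ObserverRegistry<>();
        List<String> received = new ArrayList<>();
        registry.add(received::add); // healthy observer
        registry.add(value -> {      // simulates an observer whose call was cancelled
            throw new IllegalStateException("call already cancelled");
        });

        int failed = registry.broadcast("update");
        System.out.println("failed=" + failed + ", remaining=" + registry.size()
                + ", received=" + received);
    }
}
```

In a real service the observer passed to connect can usually be cast to ServerCallStreamObserver, which offers isCancelled() and setOnCancelHandler(Runnable); removing the observer from the collection in that handler may be cleaner than catching the exception.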
I found out why I got this error. In the client, I'm using Vaadin to build the UI, and in StreamObserver<ActResponse>#onNext I'm calling a Vaadin method which throws an exception.
The mistake I made was leaving onError empty because this was a PoC, and that cost me a few hours of debugging.
It's a stupid mistake, but I'll leave this here in case it saves somebody some time.
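For illustration, here is a minimal sketch of an observer that does not stay silent, assuming plain java.util.logging. LoggingObserver and LoggingObserverDemo are hypothetical names. The class only mirrors the three StreamObserver methods so that it compiles without gRPC; in real code it would implement io.grpc.stub.StreamObserver.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical class with the same three methods as io.grpc.stub.StreamObserver;
// in real code it would be declared "implements StreamObserver<T>".
class LoggingObserver<T> {
    private static final Logger logger = Logger.getLogger(LoggingObserver.class.getName());

    private final Consumer<T> handler; // e.g. the code that updates the UI
    private int handlerFailures = 0;
    private Throwable lastError;

    LoggingObserver(Consumer<T> handler) {
        this.handler = handler;
    }

    public void onNext(T value) {
        try {
            handler.accept(value);
        } catch (RuntimeException e) {
            // Log and swallow so the exception does not escape into the gRPC call.
            handlerFailures++;
            logger.log(Level.SEVERE, "Handler failed for " + value, e);
        }
    }

    public void onError(Throwable t) {
        // Not left empty: this is where a failed or cancelled call is reported.
        lastError = t;
        logger.log(Level.SEVERE, "Call failed", t);
    }

    public void onCompleted() {
        logger.info("Call completed");
    }

    int handlerFailures() {
        return handlerFailures;
    }

    Throwable lastError() {
        return lastError;
    }
}

class LoggingObserverDemo {
    public static void main(String[] args) {
        // Simulates a UI call that throws inside onNext.
        LoggingObserver<String> observer = new LoggingObserver<>(value -> {
            throw new IllegalStateException("UI update failed");
        });
        observer.onNext("update");
        observer.onError(new RuntimeException("CANCELLED"));
        System.out.println("handlerFailures=" + observer.handlerFailures());
    }
}
```

Whether onNext should swallow the exception or let the call fail is a design choice; either way, logging in onError would have shown the problem straight away.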