I'm developing a runtime game server that makes heavy use of grpc-java bidirectional streaming. The data flows like this:
Client <--grpc bi-stream--> Gateway Server <---- grpc bi-stream -----> Game Server
The problem can be shown with a simple example.
The Gateway Server holds the bi-stream like this (note: the code is Scala, which compiles to JVM classes; if you come from Java, just read it as you would read Python):
// Gateway Server gRPC bi-stream API
override def stateSyncStreamDemo(
    responseObserver: StreamObserver[StateSyncFrameDemo]
): StreamObserver[StateSyncStreamRequestDemo] = {
  // get the context values set by the gRPC interceptor
  val `Context-UserId-Key` =
    Context.key[String]("user-id")
  val `Context-RoomId-Key` =
    Context.key[String]("room-id")
  new StreamObserver[StateSyncStreamRequestDemo] {
    override def onNext(
        request: StateSyncStreamRequestDemo
    ): Unit = {
      // forward to the Game Server over a gRPC bi-stream
      // 1. create the bi-stream to the Game Server if none exists yet for this roomId
      val streamConnection = getOrCreateRoomConnection(`Context-RoomId-Key`)
      val gameServerRequestStream = streamConnection.withCallCredentials(
        new StreamShardingClient.ClientMetadataCall(
          roomId // put the roomId into the Metadata
        )
      ).sendWithBiStream(someResponseStreamDefined)
      // 2. forward the message to the Game Server with the userId and the client cmd
      gameServerRequestStream.onNext(Message(request.cmd, userId))
    }
    ...
This forwarding pattern usually works well.
A problem occurs when I disconnect the client's gRPC connection (I'm testing with grpcurl): the stream between the Gateway Server and the Game Server is disconnected too. The error message looks like this:
CANCELLED: client cancelled
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
at io.grpc.Status.asRuntimeException(Status.java:530) ~[grpc-api-1.52.1.jar:1.52.1]
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [grpc-api-1.52.1.jar:1.52.1]
at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [grpc-api-1.52.1.jar:1.52.1]
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [grpc-api-1.52.1.jar:1.52.1]
at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [grpc-api-1.52.1.jar:1.52.1]
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.52.1.jar:1.52.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
I expected only the client <-> Gateway Server stream to be disconnected. Why does this affect the Game Server connection? Thanks.
==========UPDATE========
I found that it's related to Context management. After setting Context.fork() in the interceptor, the Gateway Server can send messages to the Game Server successfully, but there is an error when a stream message is pushed from the Game Server to the Gateway Server. The error is:
StreamShardingClient.error occurred. CANCELLED: Failed to read message.
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
at io.grpc.Status.asRuntimeException(Status.java:539) ~[grpc-api-1.52.1.jar:1.52.1]
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487) [grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.52.1.jar:1.52.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
at io.grpc.Status.asRuntimeException(Status.java:530) ~[grpc-api-1.52.1.jar:1.52.1]
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366) ~[grpc-stub-1.52.1.jar:1.52.1]
at scalasharding.sharding.StreamShardingClient$ResponseObserver.$anonfun$onNext$1(StreamShardingClient.scala:124) ~[classes/:?]
at scalasharding.sharding.StreamShardingClient$ResponseObserver.$anonfun$onNext$1$adapted(StreamShardingClient.scala:117) ~[classes/:?]
at scala.collection.immutable.Set$Set2.foreach(Set.scala:201) ~[scala-library-2.13.7.jar:?]
at scalasharding.sharding.StreamShardingClient$ResponseObserver.onNext(StreamShardingClient.scala:117) ~[classes/:?]
at scalasharding.sharding.StreamShardingClient$ResponseObserver.onNext(StreamShardingClient.scala:71) [classes/:?]
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:474) ~[grpc-stub-1.52.1.jar:1.52.1]
at io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(DelayedClientCall.java:473) ~[grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:675) ~[grpc-core-1.52.1.jar:1.52.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:660) ~[grpc-core-1.52.1.jar:1.52.1]
... 5 more
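For reference on the Context.fork() change mentioned in the update: in grpc-java, a client call started while a server call's Context is current is cancelled when that Context is cancelled, which is what happens when the client disconnects. A forked Context keeps the parent's values but drops the cancellation link. Below is a minimal sketch of that behaviour using only io.grpc.Context; the object name, the key instance and the value "u-1" are illustrative and not taken from the gateway code.

```scala
import io.grpc.Context

object ContextForkDemo {
  // Illustrative key; a real gateway would share one key instance with its interceptor.
  val UserId: Context.Key[String] = Context.key[String]("user-id")

  /** Returns (childCancelled, forkedCancelled, userIdSeenByFork). */
  def run(): (Boolean, Boolean, String) = {
    // Stand-in for the Context attached to an inbound server call.
    val inbound = Context.current().withValue(UserId, "u-1").withCancellation()

    // An ordinary child context inherits cancellation from its parent.
    val child = inbound.withCancellation()
    // fork() keeps the parent's values but not its cancellation.
    val forked = inbound.fork()

    // Simulate the client disconnecting: the inbound Context is cancelled.
    inbound.cancel(null)

    (child.isCancelled, forked.isCancelled, UserId.get(forked))
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints (true,false,u-1)
}
```

Under that assumption, opening the Game Server stream inside Context.current().fork().run(...) would keep it alive after a client disconnects. The gateway then has to close that stream itself once the room is no longer needed.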
Have you checked the examples from the library? There is a directory with examples for cancellation: one for the client and another for the server. The README.md shows how to build and run them locally.
From what I can understand there, StreamObserver is an interface with three unimplemented methods. One is onNext, which is the one shown in the code in your question. The other two are onError and onCompleted.
If I'm not wrong, in each example (client and server) onError is overridden without much logic, but that appears to be where errors are handled. The examples also use other observers, such as ClientCallStreamObserver and ServerCallStreamObserver.
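ServerCallStreamObserver is also what the "call already cancelled" message in the question points to. Here is a rough sketch, assuming the gateway keeps a set of client-facing observers per room and broadcasts Game Server messages to them. RoomSubscribers and its methods are hypothetical names, not part of grpc-java or of the question's code.

```scala
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: tracks the client-facing observers of one room.
final class RoomSubscribers[T] {
  private val observers = ConcurrentHashMap.newKeySet[StreamObserver[T]]()

  // Call this from inside the service method, before it returns its request
  // observer: grpc-java only accepts setOnCancelHandler during that initial call.
  def register(observer: StreamObserver[T]): Unit = {
    observer match {
      case call: ServerCallStreamObserver[_] =>
        // Registering a handler also stops onNext from throwing on a cancelled call.
        call.setOnCancelHandler(() => { observers.remove(observer); () })
      case _ => ()
    }
    observers.add(observer)
    ()
  }

  // Forward one message to every observer that is still live.
  def broadcast(message: T): Unit =
    observers.forEach { observer =>
      val cancelled = observer match {
        case call: ServerCallStreamObserver[_] => call.isCancelled
        case _                                 => false
      }
      // Drop observers whose client has already gone away.
      if (cancelled) observers.remove(observer) else observer.onNext(message)
      ()
    }

  def size: Int = observers.size()
}
```

In the gateway, register(responseObserver) would presumably be called at the top of stateSyncStreamDemo, and the Game Server response observer would call broadcast instead of looping over the observers directly.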
Hope this helps