scala, grpc, grpc-java

Java gRPC bi-stream disconnect: unexpected "CANCELLED: client cancelled"


I'm developing a real-time game server that makes heavy use of grpc-java bi-streams. The data flows like this:
Client <-- gRPC bi-stream --> Gateway Server <-- gRPC bi-stream --> Game Server

The problem can be shown with a simple example.

The Gateway Server holds the bi-stream like this (note: the code is Scala, which compiles to JVM classes; if you come from Java, read it as you would Python):

  // Gateway Server gRPC bi-stream API
  override def stateSyncStreamDemo(
    responseObserver: StreamObserver[StateSyncFrameDemo]
  ): StreamObserver[StateSyncStreamRequestDemo] = {
    // get context from gRPC interceptor
    val `Context-UserId-Key` = Context.key[String]("user-id")
    val `Context-RoomId-Key` = Context.key[String]("room-id")

    new StreamObserver[StateSyncStreamRequestDemo] {
      override def onNext(
        request: StateSyncStreamRequestDemo
      ): Unit = {
        // send to Game Server by gRPC bi-stream
        // 1. create the bi-stream to the Game Server if none exists yet for this roomId
        val streamConnection = getOrCreateRoomConnection(`Context-RoomId-Key`)
        val gameServerRequestStream = streamConnection
          .withCallCredentials(
            new StreamShardingClient.ClientMetadataCall(
              roomId // set roomId in the Metadata
            )
          )
          .sendWithBiStream(someResponseStreamDefined)
        // 2. forward the message to the Game Server with the userId and the client cmd
        gameServerRequestStream.onNext(Message(request.cmd, userId))
      }
      ...

This forwarding pattern usually works well.
The problem occurs when I disconnect the gRPC client (I'm testing with grpcurl): the Gateway Server <-> Game Server stream is disconnected too. The error message looks like this:

CANCELLED: client cancelled
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at io.grpc.Status.asRuntimeException(Status.java:530) ~[grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [grpc-stub-1.52.1.jar:1.52.1]
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.52.1.jar:1.52.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]

I expected only the Client <-> Gateway Server stream to be disconnected. Why is the Game Server connection affected? Thanks.

==========UPDATE========
I found that this is related to Context management. After calling Context.fork() in the interceptor, the Gateway Server can send messages to the Game Server successfully, but an error occurs when the Game Server pushes stream messages to the Gateway Server:

StreamShardingClient.error occurred. CANCELLED: Failed to read message.
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
    at io.grpc.Status.asRuntimeException(Status.java:539) ~[grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487) [grpc-stub-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.52.1.jar:1.52.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
    at io.grpc.Status.asRuntimeException(Status.java:530) ~[grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366) ~[grpc-stub-1.52.1.jar:1.52.1]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.$anonfun$onNext$1(StreamShardingClient.scala:124) ~[classes/:?]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.$anonfun$onNext$1$adapted(StreamShardingClient.scala:117) ~[classes/:?]
    at scala.collection.immutable.Set$Set2.foreach(Set.scala:201) ~[scala-library-2.13.7.jar:?]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.onNext(StreamShardingClient.scala:117) ~[classes/:?]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.onNext(StreamShardingClient.scala:71) [classes/:?]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:474) ~[grpc-stub-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(DelayedClientCall.java:473) ~[grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:675) ~[grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:660) ~[grpc-core-1.52.1.jar:1.52.1]
    ... 5 more
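
The Context behaviour mentioned in the update can be reproduced with io.grpc.Context alone, without any server. The following is a minimal sketch, assuming grpc-java's documented Context semantics (a derived Context is cancelled together with its parent, while fork() keeps the values but drops the cancellation link) and assuming grpc-api is on the classpath. ForkDemo, inForkedContext, the key names, and the sample values are illustrative and not part of the original code.

```scala
import io.grpc.Context

object ForkDemo {
  // Illustrative key. In a real service, reuse the exact Key instance the
  // interceptor stored the value under: keys are matched by reference, not by name.
  val RoomIdKey: Context.Key[String] = Context.key[String]("room-id")

  // Runs `body` with a forked copy of the current Context attached,
  // then restores the previously attached Context.
  def inForkedContext[A](body: => A): A = {
    val forked = Context.current().fork()
    val previous = forked.attach()
    try body
    finally forked.detach(previous)
  }

  // Returns (derivedCancelled, forkedCancelled, roomIdSeenInFork).
  def run(): (Boolean, Boolean, String) = {
    // Stands in for the Context that gRPC attaches to an inbound call.
    val inbound: Context.CancellableContext =
      Context.ROOT.withValue(RoomIdKey, "room-42").withCancellation()
    // A Context derived the usual way stays linked to the inbound call.
    val derived = inbound.withValue(Context.key[String]("user-id"), "user-7")
    // A forked Context copies the values but not the cancellation.
    val forked = inbound.fork()

    // Simulate the client going away.
    inbound.cancel(new RuntimeException("client cancelled"))

    (derived.isCancelled, forked.isCancelled, RoomIdKey.get(forked))
  }
}
```

Under these assumptions, an outbound stream that must outlive one client's call would be opened inside `ForkDemo.inForkedContext { ... }`, so that cancelling the inbound call does not cancel it. Whether this belongs in the interceptor or around the call that opens the Game Server stream depends on the surrounding code, which the question does not show.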

Solution

  • Have you checked the examples that ship with the library? There is a directory of cancellation examples, one for the client and one for the server, and the README.md shows how to build and run them locally.

    From what I can tell, StreamObserver is an interface with three methods to implement. One is onNext, which is the one shown in the code in your question. The other two are onError and onCompleted.

    If I'm not mistaken, each example (client and server) overrides onError without much logic, but that appears to be where errors are handled. The examples also use other observers, such as ClientCallStreamObserver and ServerCallStreamObserver.

    Hope this helps
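
The second stack trace in the question names one remedy itself: ServerCallStreamObserver.setOnCancelHandler(). It also shows the exception being thrown while a set of observers is iterated and onNext is called on one whose call is already cancelled. The following is a minimal sketch of how the Gateway Server could keep track of client-facing observers and skip the cancelled ones. RoomSubscribers and its method names are hypothetical, and the sketch assumes the responseObserver handed to a service method is a ServerCallStreamObserver, as it is in grpc-java server stubs.

```scala
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical registry of the client-facing observers of one room.
final class RoomSubscribers[Frame] {
  private val observers =
    new ConcurrentHashMap[String, ServerCallStreamObserver[Frame]]()

  // Call this inside the service method, before it returns its request
  // observer: the cancel handler may only be set during that initial call.
  def register(userId: String, responseObserver: StreamObserver[Frame]): Unit = {
    val serverObserver =
      responseObserver.asInstanceOf[ServerCallStreamObserver[Frame]]
    // Drop the client when its call is cancelled. Setting a handler also
    // suppresses the "call already cancelled" exception thrown by onNext.
    serverObserver.setOnCancelHandler(() => { observers.remove(userId); () })
    observers.put(userId, serverObserver)
    ()
  }

  // Push one frame from the Game Server stream to every client still connected.
  def broadcast(frame: Frame): Unit =
    observers.asScala.foreach { case (userId, observer) =>
      if (observer.isCancelled) observers.remove(userId)
      else observer.onNext(frame)
    }

  // Request observer for one client. A disconnect arrives in onError as
  // CANCELLED, so only this client is dropped and nothing else is closed.
  def inbound[Req](userId: String, forward: Req => Unit): StreamObserver[Req] =
    new StreamObserver[Req] {
      override def onNext(request: Req): Unit = forward(request)
      override def onError(t: Throwable): Unit = { observers.remove(userId); () }
      override def onCompleted(): Unit =
        Option(observers.remove(userId)).foreach(_.onCompleted())
    }

  def size: Int = observers.size()
}
```

With something like this in place, a single client disconnecting only removes that client's observer, and the shared Gateway Server <-> Game Server stream keeps delivering frames to the remaining clients. StreamObserver is not thread-safe, so this sketch assumes broadcast is only ever called from the one Game Server response stream.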