Java gRPC server streaming model


I am looking for an example of a client-side listener for server-side streaming, following the examples at https://grpc.io/docs/languages/java/basics/

I have configured the keep-alive settings on both the client and the server as described in https://github.com/grpc/grpc/blob/master/doc/keepalive.md

All of the examples show the server responding only after the client makes a request, whether the call is unary or async, or the client sending further requests as in the chat example. In every case, a request has to be sent before the server responds. The connection does stay alive for a long time, which is good.

Question: Is it possible for the client to send one request and receive responses from the server indefinitely, without using the bidi (bidirectional) model? The scenario is event-based: whenever data becomes available at the server, the server keeps sending it without requiring a further request. This communication is unicast, not broadcast; each connection (user) is unique, and the responses depend on that user's data.

I am not sure, but one way could be for the server to keep the request object active in a cache somewhere and keep responding. Would the client have to use a polling loop to listen for responses indefinitely?

I would appreciate any examples that solve this problem.


Solution

  • One client request followed by many server responses is "server streaming." In the .proto definition, you use the stream keyword only on the response type.

    You can use either the blocking or the async API for server-streaming RPCs. The example uses the blocking API because blocking code is generally easier to follow, but an async stub would work as well.

    For server-streaming, the async client API has you pass the request and the response observer to start the call. After that, it will look the same as the bidi example, except that the client would not send any more messages. The server can call the StreamObserver whenever it wants to send a message, and the client's StreamObserver will be called when a new message arrives.

    To see an example, you can replace the blocking stub of listFeatures() with the async stub:

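    // Released when the stream ends, either normally or with an error.
    final CountDownLatch finishLatch = new CountDownLatch(1);
    // Send the single request; the observer receives every streamed response.
    asyncStub.listFeatures(request, new StreamObserver<Feature>() {
      int i;
    
      // Called once for each message the server sends.
      @Override public void onNext(Feature feature) {
        info("Result #" + i + ": {0}", feature);
        if (testHelper != null) {
          testHelper.onMessage(feature);
        }
        i++;
      }
    
      // Called when the server closes the stream normally.
      @Override public void onCompleted() {
        finishLatch.countDown();
      }
    
      // Called when the RPC fails; no further callbacks follow.
      @Override public void onError(Throwable t) {
        warning("RPC failed: {0}", Status.fromThrowable(t));
        if (testHelper != null) {
          testHelper.onRpcError(t);
        }
        finishLatch.countDown();
      }
    });
    // Block until the server completes the stream or the call fails.
    finishLatch.await();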
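
On the server side, the question asks whether the request must be cached and whether the client needs a polling loop. With server streaming, neither should be needed: the server can hold on to the response observer it was given and call onNext whenever data for that user appears. The following is only a rough sketch of that idea, not grpc-java code. To stay runnable without gRPC on the classpath, it declares a local StreamObserver interface with the same three callbacks as io.grpc.stub.StreamObserver, and the names EventService, subscribe, publish, close, and RecordingObserver are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServerStreamingSketch {

    /** Local stand-in with the same callbacks as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical service that keeps one open response stream per user. */
    static class EventService {
        private final Map<String, StreamObserver<String>> subscribers =
                new ConcurrentHashMap<>();

        // Same shape as a server-streaming handler: one request, one response
        // observer. It returns without calling onCompleted(), so the stream
        // stays open after the handler method has finished.
        void subscribe(String userId, StreamObserver<String> responseObserver) {
            subscribers.put(userId, responseObserver);
        }

        // Called by application code whenever data for a user becomes available.
        // Only that user's stream receives the message (unicast, not broadcast).
        void publish(String userId, String event) {
            StreamObserver<String> observer = subscribers.get(userId);
            if (observer != null) {
                observer.onNext(event);
            }
        }

        // Ends the stream for one user and forgets the observer.
        void close(String userId) {
            StreamObserver<String> observer = subscribers.remove(userId);
            if (observer != null) {
                observer.onCompleted();
            }
        }
    }

    /** Client-side listener: it is called back, it never polls. */
    static class RecordingObserver implements StreamObserver<String> {
        final List<String> received = new ArrayList<>();
        boolean completed;

        @Override public void onNext(String value) { received.add(value); }
        @Override public void onError(Throwable t) { completed = true; }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        EventService service = new EventService();
        RecordingObserver alice = new RecordingObserver();
        RecordingObserver bob = new RecordingObserver();

        service.subscribe("alice", alice);   // one request per user
        service.subscribe("bob", bob);

        service.publish("alice", "a1");      // events arrive later, at any time
        service.publish("bob", "b1");
        service.publish("alice", "a2");
        service.close("alice");
        service.publish("alice", "a3");      // ignored: alice's stream is closed

        System.out.println("alice: " + alice.received + " completed=" + alice.completed);
        System.out.println("bob: " + bob.received + " completed=" + bob.completed);
    }
}
```

In a real grpc-java service, the observer passed to the generated handler method would play the role of responseObserver here. Two things this sketch leaves out: StreamObserver implementations are not required to be thread-safe, so calls to onNext from several threads would need to be serialized, and the server should drop the stored observer when the client cancels or disconnects.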