I have a StreamObserver object that executes an asynchronous call:
StreamObserver<ServerReflectionResponse> responseStreamObserver = new StreamObserver<ServerReflectionResponse>() {
    @Override
    public void onNext(ServerReflectionResponse response) {
        responseList.add(response); // append the result of the call to the list
    }

    @Override
    public void onError(Throwable t) {} // error handling omitted here

    @Override
    public void onCompleted() {} // nothing done on completion yet
};
StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(responseStreamObserver);
requestStreamObserver.onNext(req); // send the request
requestStreamObserver.onCompleted();
var sz = responseList.size(); // want to block here until the response has been added
In the responseStreamObserver.onNext callback, I append the result of the call to a list. However, because the serverReflectionInfo call is asynchronous, the size check on the next line returns 0: it reads the list before the call has finished. What is the easiest way to handle this? Does the gRPC library already provide a way to do it? Ultimately, my goal is just to return a value from the enclosing function.
The proto definition is here: https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto#L24
The only thing I can think of is using something like a semaphore: wait on it before reading the values from the list, and signal it after the response has been added. Any input is appreciated, thanks.
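The semaphore idea could be sketched roughly as follows, using a java.util.concurrent.CountDownLatch as the signal. This is only a minimal sketch: Observer and fakeAsyncCall are hypothetical stand-ins for io.grpc.stub.StreamObserver and the reflection stub call, so that it runs without gRPC, and the 5 second timeout is an arbitrary assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
    // Hypothetical stand-in for io.grpc.stub.StreamObserver.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical stand-in for the async stub call: replies from another thread.
    static void fakeAsyncCall(String request, Observer<String> responseObserver) {
        new Thread(() -> {
            responseObserver.onNext("response to " + request);
            responseObserver.onCompleted();
        }).start();
    }

    // Blocks until the stream terminates, then returns the collected responses.
    static List<String> callAndWait(String request) {
        List<String> responses = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        fakeAsyncCall(request, new Observer<String>() {
            @Override public void onNext(String value) { responses.add(value); }
            // Release the waiter on failure too, otherwise it would block until the timeout.
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onCompleted() { done.countDown(); }
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the response");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("req").size()); // prints 1
    }
}
```

One drawback of this shape is that onError only releases the latch; the failure itself is not passed back to the caller unless it is stored somewhere as well.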
Guava's SettableFuture can simplify turning the asynchronous call into a synchronous one:
SettableFuture<List<ServerReflectionResponse>> responseListFuture = SettableFuture.create();
StreamObserver<ServerReflectionResponse> responseStreamObserver
    = new StreamObserver<ServerReflectionResponse>() {
  private final List<ServerReflectionResponse> responseList = new ArrayList<>();

  @Override
  public void onNext(ServerReflectionResponse response) {
    responseList.add(response);
  }

  @Override
  public void onCompleted() {
    responseListFuture.set(responseList);
  }

  @Override
  public void onError(Throwable t) {
    responseListFuture.setException(t);
  }
};
StreamObserver<ServerReflectionRequest> requestStreamObserver
    = reflectionStub.serverReflectionInfo(responseStreamObserver);
requestStreamObserver.onNext(req);
requestStreamObserver.onCompleted();
// get() blocks until the future is set; it throws ExecutionException if the call failed.
List<ServerReflectionResponse> responseList = responseListFuture.get();
var sz = responseList.size();
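If Guava is not available, the same pattern can probably be expressed with the JDK's CompletableFuture. The following is a minimal, self-contained sketch under stated assumptions: Observer and fakeAsyncCall are hypothetical stand-ins for io.grpc.stub.StreamObserver and the reflection stub call, and the 5 second timeout is an arbitrary choice to avoid blocking forever if the stream never terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSketch {
    // Hypothetical stand-in for io.grpc.stub.StreamObserver.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical stand-in for the async stub call: replies from another thread.
    static void fakeAsyncCall(String request, Observer<String> responseObserver) {
        new Thread(() -> {
            responseObserver.onNext("response to " + request);
            responseObserver.onCompleted();
        }).start();
    }

    // Turns the callback-style call into a blocking one that returns all responses.
    static List<String> callAndWait(String request) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();

        fakeAsyncCall(request, new Observer<String>() {
            // Only touched from the callback thread, then published via complete().
            private final List<String> responses = new ArrayList<>();

            @Override public void onNext(String value) { responses.add(value); }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onCompleted() { future.complete(responses); }
        });

        try {
            // Bounded wait instead of a plain get().
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("req").size()); // prints 1
    }
}
```

Unlike a bare latch or semaphore, the future carries either the result or the failure, so the caller sees errors reported through onError.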