I'm currently working on a grpc server that will receive streaming grpc calls from the first server and redirect these calls to the second server, and redirect responses from the second server as streams to the first one.
I have 2 proto files first proto
First file:
syntax = "proto3";
package first.proto.pack;
service FirstProtoService {
rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}
message RequestToFirstServer {
oneof firstStreamingRequest {
int32 x = 1;
int32 y = 2;
}
}
message ResponseForFirstServer {
string someprocessedinformation = 1;
}
Second file:
syntax = "proto3";
package second.proto.pack;
service SecondProtoService {
rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}
message RequestToSecondServer {
oneof secondStreamingRequest {
int32 processedX = 1;
int32 procesdedY = 2;
}
}
message ResponseFromSecondServer {
string computedInformation = 1;
}
First server knows about first proto file but doesn't know about second.
Second server knows about second proto file but doesn't know about first.
Middle server knows about first and second proto.
Need to write a server that will transmit requests from one server from one server to another
I started writing it on Java. But faced the problem of sending to much requests to second server
That how my service middle implementation looks on Java:
package middle.server.pack;
import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;
import java.util.logging.LogManager;
import java.util.logging.Logger;
public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());
@Override
public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
return new StreamObserver<First.RequestToFirstServer>() {
@Override
public void onNext(First.RequestToFirstServer value) {
SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
new StreamObserver<Second.ResponseFromSecondServer>() {
@Override
public void onNext(Second.ResponseFromSecondServer value) {
doProcessOnResponse(value);
First.ResponseForFirstServer responseForFirstServer =
mapToFirstResponse(value);
responseObserver.onNext(responseForFirstServer);
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("sucess");
}
}
);
Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
requestObserver.onNext(requestToSecondServer);
requestObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("Everything okay");
}
};
}
}
After a request from the first client on the side of the middle server, I get the following errors:
CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error
I know that I am doing it wrong. So the question is how to make it right or if I can't make it on java could I make it in any other language?
There are several problems here:
requestObserver.onCompleted()
should be moved to onCompleted()
of StreamObserver<First.RequestToFirstServer>
few lines below (next to logger.info("Everything okay");
).responseObserver.onCompleted()
. You should do so in onCompleted()
of StreamObserver<Second.ResponseFromSecondServer>
(next to logger.info("sucess");
).onError(...)
methods should be analogical to onCompleted()
)So generally speaking (and as @SergiiTkachenko also pointed), receiving a given callback (onNext
, onError
, onCompleted
, "onCancel
") from one of the servers should trigger issuing the corresponding call to the other server (after "translating" the argument where needed).
Finally, you should respect both servers' readiness using methods from CallStreamObserver: disableAutoInboundFlowControl(), request(1), isReady() and setOnReadyHandler(...). You can always cast your outbound StreamObserver
s to CallStreamObserver
s. More specifically, you should cast responseObserver
to ServerCallStreamObserver and requestObserver
to ClientCallStreamObserver.
This should be implemented criss-cross:
request(1)
message from a given server at the end of processing a message from it (in onNext()
) if the other server isReady
.onReady
" callback from one server should trigger request(...)
-ing 1 message from the other server.As far as I remember, after returning the observer from the outer method you should receive initial "onReady
" callbacks from the both servers, which will put everything in motion. However I'm not 100% sure about the callback from the second server and cannot verify it at the moment: in case you don't receive the initial callback from it, simply request 1 initial message from the first server before returning the observer.