Search code examples
javajava-streamgrpcgrpc-javajava-server

Redirecting service to bidirectional grpc


I'm currently working on a grpc server that will receive streaming grpc calls from the first server and redirect these calls to the second server, and redirect responses from the second server as streams to the first one.

I have 2 proto files first proto

First file:

syntax = "proto3";

package first.proto.pack;

service FirstProtoService {
  rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}

message RequestToFirstServer {
    oneof firstStreamingRequest {
        int32 x = 1;
        int32 y = 2;
    }
}

message ResponseForFirstServer {
  string someprocessedinformation = 1;
}

Second file:

syntax = "proto3";

package second.proto.pack;

service SecondProtoService {
  rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}

message RequestToSecondServer {
  oneof secondStreamingRequest {
    int32 processedX = 1;
    int32 procesdedY = 2;
  }
}

message ResponseFromSecondServer {
  string computedInformation = 1;
}

First server knows about first proto file but doesn't know about second.

Second server knows about second proto file but doesn't know about first.

Middle server knows about first and second proto.

Need to write a server that will transmit requests from one server from one server to another

I started writing it on Java. But faced the problem of sending to much requests to second server

That how my service middle implementation looks on Java:

package middle.server.pack;


import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;

import java.util.logging.LogManager;
import java.util.logging.Logger;

public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
    private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
    private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());

    @Override
    public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
        return new StreamObserver<First.RequestToFirstServer>() {
            @Override
            public void onNext(First.RequestToFirstServer value) {
                SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
                StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
                        new StreamObserver<Second.ResponseFromSecondServer>() {
                            @Override
                            public void onNext(Second.ResponseFromSecondServer value) {
                                doProcessOnResponse(value);
                                First.ResponseForFirstServer responseForFirstServer =
                                        mapToFirstResponse(value);
                                responseObserver.onNext(responseForFirstServer);
                            }

                            @Override
                            public void onError(Throwable t) {
                                logger.info(t.getMessage());
                            }

                            @Override
                            public void onCompleted() {
                                logger.info("sucess");
                            }
                        }
                );
                Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
                requestObserver.onNext(requestToSecondServer);
                requestObserver.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                logger.info(t.getMessage());
            }

            @Override
            public void onCompleted() {
                logger.info("Everything okay");
            }
        };
    }
}

After a request from the first client on the side of the middle server, I get the following errors:

CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error

I know that I am doing it wrong. So the question is how to make it right or if I can't make it on java could I make it in any other language?


Solution

  • There are several problems here:

    • as pointed by @SergiiTkachenko, you create a new RPC to the second server per each message from the first. To solve this, Move the call to the second server 3 lines up to the beginning of the outer method.
    • a call to requestObserver.onCompleted() should be moved to onCompleted() of StreamObserver<First.RequestToFirstServer> few lines below (next to logger.info("Everything okay");).
    • you never call responseObserver.onCompleted(). You should do so in onCompleted() of StreamObserver<Second.ResponseFromSecondServer> (next to logger.info("sucess");).
    • you should signal errors received from one server to the other (relation between onError(...) methods should be analogical to onCompleted())
    • you should handle cancellations by the first server using setOnCancelHandler(...) and propagate them to the second.

    So generally speaking (and as @SergiiTkachenko also pointed), receiving a given callback (onNext, onError, onCompleted, "onCancel") from one of the servers should trigger issuing the corresponding call to the other server (after "translating" the argument where needed).

    Finally, you should respect both servers' readiness using methods from CallStreamObserver: disableAutoInboundFlowControl(), request(1), isReady() and setOnReadyHandler(...). You can always cast your outbound StreamObservers to CallStreamObservers. More specifically, you should cast responseObserver to ServerCallStreamObserver and requestObserver to ClientCallStreamObserver.
    This should be implemented criss-cross:

    • You should request(1) message from a given server at the end of processing a message from it (in onNext()) if the other server isReady.
    • receiving "onReady" callback from one server should trigger request(...)-ing 1 message from the other server.

    As far as I remember, after returning the observer from the outer method you should receive initial "onReady" callbacks from the both servers, which will put everything in motion. However I'm not 100% sure about the callback from the second server and cannot verify it at the moment: in case you don't receive the initial callback from it, simply request 1 initial message from the first server before returning the observer.