Tags: grpc, kafka-consumer-api, apache-kafka-streams, grpc-java

Open and close channel in the gRPC client with every request


I have a gRPC client inside a Kafka application. As written, the client opens and closes a channel for every request.

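import java.util.concurrent.TimeUnit;

// Config and ConfigFactory are assumed to come from Typesafe Config.
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserAgentClient {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // `ua` is the config path, defined elsewhere.
    private static final Config uaConfig = ConfigFactory.load().getConfig(ua);
    private final ManagedChannel channel;
    private final UserAgentServiceGrpc.UserAgentServiceBlockingStub userAgentBlockingStub;

    public UserAgentClient() {
        this(ManagedChannelBuilder.forAddress(uaConfig.getString("host"), uaConfig.getInt("port")).usePlaintext());
    }

    public UserAgentClient(ManagedChannelBuilder<?> usePlaintext) {
        channel = usePlaintext.build();
        userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
    }

    public UserAgentParseResponse getUserAgent(String userAgent) {
        UserAgentRequest request = UserAgentRequest.newBuilder().setUserAgent(userAgent).build();
        UserAgentParseResponse response = null;
        try {
            response = userAgentBlockingStub.parseUserAgent(request);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged.
            logger.warn("An exception has occurred during gRPC call to the user agent.", e);
        }
        // The channel is shut down after every request, which is what this question is about.
        shutdown();
        return response;
    }

    public void shutdown() {
        try {
            // shutdown() does not block; awaitTermination() is the call that can throw
            // InterruptedException. The 5-second timeout is an assumed value.
            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            logger.warn("Interrupted exception during gRPC channel close", ie);
            Thread.currentThread().interrupt();
        }
    }
}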

Can I keep the channel open the whole time, or do I have to open a new channel every time I make a call? I ask because I was testing the performance, and it seems to improve drastically if I just keep the channel open. Is there something I'm missing?


Solution

  • Since opening and closing a channel is expensive, I removed channel = usePlaintext.build(); from my client completely. Instead, I open and close the channel in my Kafka Transformer, in the class UserAgentDataEnricher that implements Transformer.

    public class UserAgentDataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
        private ProcessorContext context;
        private ManagedChannel channel;
        private UserAgentClient userAgentClient;
    
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            open();
            // schedule a punctuation every 15 minutes (900000 ms) that replaces the channel
            this.context.schedule(900000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
                close();
                open();
                logger.info("Re-opened the user agent channel");
            });
        }
    
        @Override
        public void close() {
            // shuts down the channel held by the current client
            userAgentClient.shutdown();
        }
    
        private void open() {
            channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
            userAgentClient = new UserAgentClient(channel);
        }
    
        ...
    }
    

    My client's constructor now takes the already-built channel:

    public UserAgentClient(ManagedChannel channel) {
        this.channel = channel;
        userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
    }
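
The lifecycle described above (build the expensive resource once, reuse it for every call, close it once) can be sketched with the standard library alone. This is only an illustration under stated assumptions: ReusableClient and ChannelLifecycleSketch are hypothetical names, and an ExecutorService stands in for the gRPC channel because it has the same shutdown(), awaitTermination(long, TimeUnit) and shutdownNow() method shape that ManagedChannel exposes. In a real client, those same three calls would be made on the channel.

```java
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ChannelLifecycleSketch {

    /** Hypothetical client that owns one long-lived resource and reuses it for every call. */
    static final class ReusableClient implements AutoCloseable {

        // Stand-in for a ManagedChannel: created once, shared by all calls.
        private final ExecutorService resource = Executors.newSingleThreadExecutor();

        /** Stand-in for a blocking RPC; it only upper-cases the input. */
        String call(String input) {
            Callable<String> task = () -> input.toUpperCase(Locale.ROOT);
            try {
                return resource.submit(task).get();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the call", ie);
            } catch (ExecutionException ee) {
                throw new IllegalStateException("call failed", ee.getCause());
            }
        }

        /** Graceful close: ask for shutdown, wait briefly, then force it. */
        @Override
        public void close() {
            resource.shutdown();
            try {
                // The 5-second timeout is an assumed value.
                if (!resource.awaitTermination(5, TimeUnit.SECONDS)) {
                    resource.shutdownNow();
                }
            } catch (InterruptedException ie) {
                resource.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        boolean isClosed() {
            return resource.isTerminated();
        }
    }

    public static void main(String[] args) {
        // One client serves many calls; it is closed once, when the block exits.
        try (ReusableClient client = new ReusableClient()) {
            for (String ua : new String[] {"mozilla", "curl", "okhttp"}) {
                System.out.println(client.call(ua));
            }
        }
    }
}
```

The try-with-resources block in main plays the role that init() and close() play in the Transformer: the resource lives as long as its owner, not as long as a single request.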