Search code examples
javaapache-kafkaprotocol-buffersgrpcquarkus

Failed to inject Kafka client into GrpcService Quarkus


I'm trying to receive message through Grpc service, send it to Kafka Emitter, and return some value back.

@Singleton
@GrpcService
public class MessageService implements protobuf.MessageService{

    @Inject
    @Channel("hello-out")
    Emitter<Record<String, GeneratedMessageV3>> emitter;

    @Override
    public Uni<EnvelopeReply> processMessage(Envelope request) {
        return Uni.createFrom().completionStage(
                emitter.send(Record.of(request.getKey(), request))
        ).replaceWith(EnvelopeReply.newBuilder().build());
    }
}

During build, I'm getting next error:

 Error injecting org.eclipse.microprofile.reactive.messaging.Emitter<io.smallrye.reactive.messaging.kafka.Record<java.lang.String, com.google.protobuf.GeneratedMessageV3>> com.test.MessageService.emitter
...
Caused by: javax.enterprise.inject.spi.DefinitionException: SRMSG00019: Unable to connect an emitter with the channel `hello-out`

It works properly with Rest resource.


Solution

  • Without going deeply into the topic, here's my solution: You can't inject Kafka Emmiter directly to grpc service, it'll throw an exception.

    GrpcService <- Emitter<Record...>
    

    Possible reason(I'm sure Quarkus team will reply lower with correct solution :)) is that all GrpcServices are of @Singleton type, and they can't have lazy-initialised properties, they need to have something directly injected. Emitter is generated at a later stage. By adding a wrapper class you're solving all the headaches, so:

    GrpcService <- KafkaService <- Emitter<Record...>
    
    @ApplicationScoped
    public class KafkaService {
    
        @Inject
        @Channel("hello-out")
        Emitter<Record<String, GeneratedMessageV3>> emitter;
        // Implement this part properly, added just for example
        public Emitter<Record<String, GeneratedMessageV3>> getEmitter() {
            return emitter;
        }
    
    }
    ...
    @Singleton
    @GrpcService
    public class MessageService implements protobuf.MessageService {
    
        @Inject
        KafkaService kafkaService;
    
        @Override
        public Uni<EnvelopeReply> processMessage(Envelope request) {
            // use metadata if needed
            Map<String, String> metadataMap = request.getMetadataMap();
            return Uni.createFrom().completionStage(
                    kafkaService.getEmitter().send(Record.of(request.getKey(), request))
            ).replaceWith(EnvelopeReply.newBuilder().build());
        }
    }