Search code examples
apache-kafkaspring-kafkakafka-consumer-api

How to wrap @KafkaListener for custom method arguments?


I have a use case in which I want to pass extra method arguments to my kafka consumer method.

Something like :

@KafkaListener(
        id = "TestService",
        topics = "testTpoc",
        groupId = "kafkaGroupId1",
        autoStartup = "true")
    public void receiveTestEventWithContext(final CustomContext customContext, final ConsumerRecord<String, String> consumerRecord) {
        log.info("Recieved kafka message with custom context");
    }

Is there any way that I can pass CustomContext to my consumer method? I have a use case in which I want to pass this context to some downstream API calls.


Solution

  • The @KafkaListener method is called from a thread which loops over KafkaConsumer.poll(). All the arguments for that method are really fed from that scope. Since there is nothing Kafka-related in your CustomContext, then there is no way to pass it from the listener container. And technically it must not since this custom object just does not make sense for Kafka consumer scope.

    Well, you can do that with custom HandlerMethodArgumentResolver. See KafkaListenerConfigurer to implement. And then KafkaListenerEndpointRegistrar.setCustomMethodArgumentResolvers(HandlerMethodArgumentResolver... methodArgumentResolvers). But is this exactly what you are looking for? Cannot you just create that CustomContext in this receiveTestEventWithContext() method and pass it down as you wish when Kafka record is received and handled by this method?