Tags: spring-boot, apache-kafka, spring-kafka

Listener method could not be invoked with the incoming message and Backoff none exhausted for ConsumerRecord


Below is my method definition for a Kafka listener. When the payload is null or an empty string, I think that is what triggers the error below. Can you please help?

@KafkaListener(topics = "${kafka.consumer-topic-name.reservation}", groupId = "${kafka.consumer-group-id.test}",
        containerFactory = "kafkaListenerContainerFactory", autoStartup = "${kafka.auto-start.consumer.tets}")
public void consumeReservation(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {}

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(topic = test_topic, partition = 0, leaderEpoch = 2, offset = 453473, CreateTime = 1601962346576, serialized key size = 41, serialized value size = -1, headers = RecordHeaders(headers = [RecordHeader(key = OPERATION, value = [68, 69, 76, 69, 84, 69]), RecordHeader(key = __Key_TypeId__, value = [99, 108, 75, 101, 121])], isReadOnly = false), key = {
"orgId": "1",
"orderId": "U4000024004"}, value = null)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method[public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String)]
Bean[com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer@731702d1]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ], failedMessage = GenericMessage[payload = org.springframework.kafka.support.KafkaNull@2d99561c, headers = { __Key_TypeId__ = [B@19a2dc5f, kafka_offset = 453473, OPERATION = [B@7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id }]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ], failedMessage = GenericMessage[payload = org.springframework.kafka.support.KafkaNull@2d99561c, headers = { __Key_TypeId__ = [B@19a2dc5f, kafka_offset = 453473, OPERATION = [B@7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id }]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1925)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1913)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1812)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1739)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1636)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1366)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1082)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:990)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:122)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:901)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1880)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1862)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1799)
    ... 8 common frames omitted


Solution

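  • The failing record has a null value: the log shows value = null and serialized value size = -1, and the listener receives it as a KafkaNull payload. A payload parameter is required by default, so argument resolution fails with "Payload value must not be empty". You need to specify that the payload is not required; the listener then receives null for such records.

    @Payload(required = false) String payload, ...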
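
Once the payload is optional, the listener body has to deal with null itself. The following is a minimal plain-Java sketch of that branch, written without any Spring dependency so it runs on its own. The class `TombstoneHandling`, the `Action` enum, and both methods are hypothetical names used only for illustration, and treating a blank payload the same as a null one is an assumption based on the question. The byte array is the OPERATION header value copied from the log in the question.

```java
import java.nio.charset.StandardCharsets;

public class TombstoneHandling {

    /** Hypothetical outcome type; not part of Spring Kafka. */
    public enum Action { DELETE, UPSERT }

    /**
     * Decides how to treat a record once the payload parameter is optional.
     * Assumption: a null or blank payload means "delete the entry for this key".
     */
    public static Action decide(String payload) {
        if (payload == null || payload.trim().isEmpty()) {
            return Action.DELETE;
        }
        return Action.UPSERT;
    }

    /** Decodes a raw header value (a byte array) into text. */
    public static String headerAsString(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // OPERATION header bytes from the ConsumerRecord in the question's log.
        byte[] operation = {68, 69, 76, 69, 84, 69};
        System.out.println(headerAsString(operation));
        System.out.println(decide(null));
        System.out.println(decide("{\"orderId\":\"U4000024004\"}"));
    }
}
```

The OPERATION header on the failing record decodes to DELETE, which suggests the null value is intentional on the producer side, so the listener would probably want to handle it as a delete for that key rather than skip it.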