Search code examples
springspring-bootapache-kafkaspring-kafkakafka-consumer-api

Kafka Consumer Unable To Resolve Listener Method Intermittently


I have been facing the exception below on the Kafka consumer side. Surprisingly, this issue is not consistent and an older version of the code (with the exact same configuration but some new unrelated features) works as expected. Could anyone help in determining what could be causing this?

[ERROR][938f3c68-f481-4224-b2c6-43af5fb27ada-0-C-1][org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:77) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2010) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1854) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1720) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1699) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.1.jar:2.7.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2367) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2003) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
    ... 8 more
Caused by: org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:145) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1983) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
    ... 8 more

My app uses the following:

  1. A custom listener class com.mycompany.listener.KafkaBatchListener<K, V> which implements org.springframework.kafka.listener.BatchAcknowledgingMessageListener<K, V> and overrides onMessage(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment) with a custom marker annotation @MyKafkaListener
  2. A custom container factory which extends org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V> and configures setConsumerFactory(consumerFactory), setBatchErrorHandler(errorHandler), setBatchListener(true) and ContainerProperties.setOnlyLogRecordMetadata(true).
  3. A SpringBoot @Configuration class which implements org.springframework.kafka.annotation.KafkaListenerConfigurer and is responsible for configuring org.springframework.kafka.core.DefaultKafkaConsumerFactory<K, V>, org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V> and org.springframework.kafka.config.MethodKafkaListenerEndpoint<String, String> (used by @MyKafkaListener)
  4. Spring Kafka 2.7.1

Additional query: Even though ContainerProperties.setOnlyLogRecordMetadata(true) is set, the exception stacktrace still contains the full payload which I have omitted. Any idea why?

Thanks in advance!


UPDATE:

  1. KafkaBatchListener
package com.mycompany.listener;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaBatchListener<K, V> implements BatchAcknowledgingMessageListener<K, V> {

    @Override
    @com.mycompany.listener.KafkaListener
    public void onMessage(final List<ConsumerRecord<K, V>> consumerRecords, final Acknowledgment acknowledgment) {

        // process batch using MyService<K, V>.process(consumerRecords)
        acknowledgment.acknowledge();
    }
}

  1. Custom Annotation
package com.mycompany.listener;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

@Retention(RUNTIME)
@Target(METHOD)
public @interface KafkaListener {

}
  1. Listener Container Factory
package com.mycompany.factory;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;

public class KafkaBatchListenerContainerFactory<K, V>
        extends ConcurrentKafkaListenerContainerFactory<K, V> {

    public KafkaBatchListenerContainerFactory(final DefaultKafkaConsumerFactory<K, V> consumerFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler, final int concurrency) {

        super.setConsumerFactory(consumerFactory);
        super.setBatchErrorHandler(errorHandler);
        super.setConcurrency(concurrency);
        super.setBatchListener(true);
        super.setAutoStartup(true);

        final ContainerProperties containerProperties = super.getContainerProperties();
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
        containerProperties.setOnlyLogRecordMetadata(true);
    }

}
  1. Batch Error Handler
package com.mycompany.errorhandler;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

@Component
public class ListenerContainerRecoveringBatchErrorHandler extends RecoveringBatchErrorHandler {

    public ListenerContainerRecoveringBatchErrorHandler(
            @Value("${spring.kafka.consumer.properties.backOffMS:0}") final int backOffTimeMS,
            @Value("${spring.kafka.consumer.properties.retries:3}") final int retries) {

        super(new FixedBackOff(backOffTimeMS, retries));
    }

}
  1. Kafka Listener Configurer
package com.mycompany.config;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;
import com.mycompany.factory.KafkaBatchListenerContainerFactory;
import com.mycompany.listener.KafkaBatchListener;

@Configuration
public class KafkaBatchListenerConfigurer<K, V> implements KafkaListenerConfigurer {

    private final List<KafkaBatchListener<K, V>> listeners;
    private final BeanFactory beanFactory;
    private final ListenerContainerRecoveringBatchErrorHandler errorHandler;
    private final int concurrency;

    @Autowired
    public KafkaBatchListenerConfigurer(final List<KafkaBatchListener<K, V>> listeners, final BeanFactory beanFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler,
            @Value("${spring.kafka.listener.concurrency:1}") final int concurrency) {
        this.listeners = listeners;
        this.beanFactory = beanFactory;
        this.errorHandler = errorHandler;
        this.concurrency = concurrency;
    }

    @Override
    public void configureKafkaListeners(final KafkaListenerEndpointRegistrar registrar) {

        final Method listenerMethod = lookUpBatchListenerMethod();

        listeners.forEach(listener -> {
            registerListenerEndpoint(listener, listenerMethod, registrar);
        });
    }

    private void registerListenerEndpoint(final KafkaBatchListener<K, V> listener, final Method listenerMethod,
            final KafkaListenerEndpointRegistrar registrar) {

        // final Map<String, Object> consumerConfig = get ConsumerConfig from a custom provider;
        registrar.setContainerFactory(createContainerFactory(consumerConfig));
        registrar.registerEndpoint(createListenerEndpoint(listener, listenerMethod, consumerConfig));
    }

    private KafkaBatchListenerContainerFactory<K, V> createContainerFactory(final Map<String, Object> consumerConfig) {

        final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);

        final KafkaBatchListenerContainerFactory<K, V> containerFactory = new KafkaBatchListenerContainerFactory<>(
                consumerFactory, errorHandler, concurrency);

        return containerFactory;
    }

    private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(final KafkaBatchListener<K, V> listener,
            final Method listenerMethod, final Map<String, Object> consumerConfig) {

        final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId(UUID.randomUUID().toString());
        endpoint.setBean(listener);
        endpoint.setMethod(listenerMethod);
        endpoint.setBeanFactory(beanFactory);
        endpoint.setGroupId("my-group-id");
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());

        // final String topicName = get TopicName for this key-value from a custom utility;
        endpoint.setTopics(topicName);

        final Properties properties = new Properties();
        properties.putAll(consumerConfig);
        endpoint.setConsumerProperties(properties);

        return endpoint;
    }

    private Method lookUpBatchListenerMethod() {
        return Arrays.stream(com.mycompany.listener.KafkaBatchListener.class.getMethods())
                .filter(m -> m.isAnnotationPresent(com.mycompany.listener.KafkaListener.class))
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                        String.format("[%s] class should have at least 1 method with [%s] annotation.",
                                com.mycompany.listener.KafkaBatchListener.class.getCanonicalName(),
                                com.mycompany.listener.KafkaListener.class.getCanonicalName())));
    }

}


Solution

  • You don't need all the standard @KafkaListener method invoking infrastructure when your listener already implements one of the message listener interfaces; instead of registering endpoints for each listener, just create a container for each from the factory and add the listener to the container properties.

    val container = containerFactory.createContainer("topic1");
    container.getContainerProperties().set...
    ...
    container.getContainerProperies().setMessageListener(myListenerInstance);
    ...
    container.start();