Search code examples
springspring-bootspring-kafka

sprign kafka migration from 2.x to 3.0.1


Hello i'm migrating from spring-boot 2 to 3 and relating to that spring-kafka from 2.x to spring-kafka 3.0.1, there were some changes and i'm struggling with one class to migrate. I was trying with DefaultErrorHandler but there is no such method as handle. I was using the similar public method handleRemaining, it's ok? Then i was trying to use SeekUtils but there is no method getSkipPredicate or getRecoverStrategy that i should put as constructor paramether. How can I migrate this class?

@Component
@Slf4j
public class DeserializationFailedErrorHandler extends SeekToCurrentErrorHandler {   

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        SeekUtils.seekOrRecover(thrownException, records, consumer, container, this.isCommitRecovered(), this.getSkipPredicate(records, thrownException), this.logger, this.getLogLevel());
        // do somethings
}

Solution

  • handleRemaining() is the equivalent of handle() in the old error handlers, but it is only called if the error handler returns true when seeksAfterHandling() is called by the container.

    See how the DefaultErrorHandler calls SeekUtils.seekOrRecover() when it is configured that way.

    However it's not clear why you need a custom error handler; deserialization exceptions can be handled by default error handler.