Search code examples
apache-kafkaspring-cloudspring-kafkaopentracing

UnsupportedOperationException when using RetryableTopic


I have an issue with using the RetryableTopic together with the opentracing springboot functionality. The RetryableTopic definition looks like this:

   @RetryableTopic(
      attempts = "3",
      backoff = @Backoff(delay = 3000, multiplier = 2.0),
      topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE,
      kafkaTemplate = "dltKafkaTemplate",
      listenerContainerFactory = "retryEventListenerFactory",
      exclude = {
          DeserializationException.class,
          SerializationException.class,
          MessageConversionException.class,
          ConversionException.class,
          MethodArgumentResolutionException.class,
          NoSuchMethodException.class,
          ClassCastException.class
      }
  )
  @KafkaListener(
      topics = "dlt-msg-test-topic",
      containerFactory = "retryEventListenerFactory")
  public void consume(
      String message, @Headers MessageHeaders messageHeaders) {
    LOGGER.info("Received {}", message);
    throw new RuntimeException("Test retry exception");
  }

  @DltHandler
  public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    LOGGER.info(in + " from " + topic);
  }

This sample works fine by itself, triggers the retries and dlt. As soon as I try to add the tracing functionality to the project by introducing opentracing-spring-cloud-starter for example and related dependencies then the following error is thrown:

java.lang.UnsupportedOperationException: This implementation doesn't support this method
    at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:120)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.determineSendTimeout(DeadLetterPublishingRecoverer.java:661)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:636)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:628)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:524)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:489)
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:461)
    at org.springframework.kafka.listener.FailedRecordProcessor.getRecoveryStrategy(FailedRecordProcessor.java:181)
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:134)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2674)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2555)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2429)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2307)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1981)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1365)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1356)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251)

After debugging, I have found that the opentracing creates a wrapper around the original consumer that is called TracingKafkaConsumer. This TracingKafkaConsumer stores the original consumer as one of it fields. So when the DefaultErrorHandler is called as part of retry cycle instead of original consumer the TracingKafkaConsumer is passed into the handleRemaining method of DefaultErrorHandler.

If i understand correctly, after that the SeekUtils.seekOrRecover fails with exception because it cannot detect the necessary parameters from the original consumer like it would when running without the tracing functionality. My guess is that those parameters cannot be found because the original consumer is now a field within the TracingKafkaConsumer and hence that seek functionality that fails doesn't know how to get the things it needs from the original consumer.

My question is, how can I get out of this situation and have both TracingKafkaConsumer and RetryableTopic functionality working correctly together?


Solution

  • It looks like open tracing wraps the ProducerFactory in another factory (that, presumably, wraps the producers in the tracing producers).

    java.lang.UnsupportedOperationException: This implementation doesn't support this method
        at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:120)
    

    The producer factory wrapper should implement getConfigurationProperties() and call the delegate factory to get its properties.

    I suggest you open a bug against open tracing to fix their wrapper so that it implements all the necessary methods by delegating to the real factory.

    That said, all we are trying to do here is determine the send timeout setting on the producer.

    Please open an issue here: https://github.com/spring-projects/spring-kafka/issues we can be a bit more tolerant if the producer factory doesn't honor the contract, and fall back to some default timeout.

    EDIT

    It looks like they already fixed it...

    https://github.com/opentracing-contrib/java-kafka-client/blob/31ce5260279c2cadf3d69c0acbd50f024afe4660/opentracing-kafka-spring/src/main/java/io/opentracing/contrib/kafka/spring/TracingProducerFactory.java#L121-L124

    They fixed it a couple of years ago; you must be using an old version

    https://github.com/opentracing-contrib/java-kafka-client/pull/87

    https://github.com/opentracing-contrib/java-kafka-client/commit/31ce5260279c2cadf3d69c0acbd50f024afe4660

    EDIT2

    Well, it was fixed in 2021, but it looks like there hasn't been a release since 2020.

    Maybe that project is dead?