java, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream pollable consumer dlq and errorChannel don't work if a different thread is being used


In order to manage a long-running task with Spring Cloud Stream 3.1.1 and the Kafka binder, we need to use a pollable consumer and manage the consumption manually in a separate thread so that Kafka does not trigger a rebalance. To do that, we have defined a new annotation to manage the pollable consumer. The issue with this approach is that, because the work has to be handled in a separate thread, any exception thrown there never ends up in errorChannel or, eventually, in the DLQ.

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableConsumer) && args(dataCapsule,..)")
  public void handleMessage(ProceedingJoinPoint joinPoint,
      PollableConsumer pollableConsumer, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        // The separate thread is not busy with a previous message, so process this message:
        Runnable runnable = () -> {
          try {
            paused = true;

            // Call method to process this Kafka message
            joinPoint.proceed();

            callback.acknowledge(Status.ACCEPT);
          } catch (Throwable e) {
            callback.acknowledge(Status.REJECT);
            throw new PollableConsumerException(e);
          } finally {
            paused = false;
          }
        };

        executor.submit(runnable);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
      }
    }
  }
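
For completeness, the custom annotation and exception referenced by the aspect are not shown above; a minimal sketch, assuming nothing beyond what the aspect signature requires, would be:

  // PollableConsumer.java -- marker annotation matched by the @Around advice above
  @Target(ElementType.METHOD)
  @Retention(RetentionPolicy.RUNTIME)
  public @interface PollableConsumer {
  }

  // PollableConsumerException.java -- wraps any Throwable thrown while processing the message
  public class PollableConsumerException extends RuntimeException {
    public PollableConsumerException(Throwable cause) {
      super(cause);
    }
  }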

We could create a different output channel and publish the message to it when an exception occurs, but it feels like we are implementing something that should not be necessary.
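
For illustration, that alternative could look roughly like this inside the aspect's Runnable, using StreamBridge; the injected StreamBridge and the "errors-out-0" binding name are assumptions, not part of the original code:

  // Hypothetical variant of the Runnable above: on failure, publish the failed
  // message to a dedicated error binding via StreamBridge instead of throwing.
  try {
    paused = true;
    joinPoint.proceed();
    callback.acknowledge(Status.ACCEPT);
  } catch (Throwable e) {
    callback.acknowledge(Status.REJECT);
    streamBridge.send("errors-out-0",
        MessageBuilder.fromMessage(message)
            .setHeader("x-exception-message", e.getMessage()) // carry minimal error info
            .build());
  } finally {
    paused = false;
  }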

Update 1

We’ve added these beans:

  @Bean
  public KafkaTemplate<String, byte[]> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
  @Bean
  public ProducerFactory<String, byte[]> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
        org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "http://localhost:9092");
    configProps.put(
        org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    configProps.put(
        org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        KafkaAvroSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }
  @Bean
  public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    return new KafkaAdmin(configs);
  }
  @Bean
  public NewTopic topicErr() {
    return TopicBuilder.name("ERR").partitions(1).replicas(1).build();
  }
  @Bean
  public SeekToCurrentErrorHandler eh(KafkaOperations<String, byte[]> template) {
    return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
        template,
        (cr, e) -> new TopicPartition("ERR", 1)),
        new FixedBackOff(0L, 1L));
  }

And enable-dlq is not set under spring.cloud.stream.kafka.bindings.channel-name.consumer, but we still can't see any messages being produced to the ERR topic, even for exceptions thrown by the main thread.

If enable-dlq is set to true, exceptions on the main thread are published to the default DLQ topic and, as expected, the ones thrown on the child thread are ignored.
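
For reference, the binder-level DLQ settings referred to here take roughly this form (channel-name and the dlq-name value are placeholders for our actual binding and topic names):

  # Binder-managed DLQ; as noted above, it only covers exceptions thrown on the polling thread
  spring.cloud.stream.kafka.bindings.channel-name.consumer.enable-dlq=true
  # Optional: use a specific DLQ topic instead of the binder's default name
  spring.cloud.stream.kafka.bindings.channel-name.consumer.dlq-name=ERR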

Update 2

The example from Gary seems to work in general, although we needed to make some modifications because we use the deprecated StreamListener approach instead of functions. Still, there are a few issues that we could not sort out for our case:

  • The topic name always seems to be expected to be channel_name + .DLT, as we could not figure out how a different name such as dlq could be used. We use a single DLQ topic for all consumers, which does not seem to be what the Spring Kafka default DLT handling expects.
  • It seems the DLT needs at least as many partitions as the consumer topic, otherwise this solution does not work. We are not sure how to manage this, as it does not seem like a practical assumption for us.
  • Is there a way to leverage Spring Retry similar to what Spring Cloud Stream does behind the scenes, or does this need to be implemented separately? i.e. retrying the work based on max.attempts and then kicking in the DLQ part.
  • We could see that in the example the Spring Actuator BindingsEndpoint is used to update the channel state via this.endpoint.changeState("polled", State.PAUSED) and this.endpoint.changeState("polled", State.RESUMED). Why do we need to do that in conjunction with pause, requeue, etc.? What is the side effect of not doing it?

Solution

  • Your observation is correct; the error handling is bound to the thread.

    You could use a DeadLetterPublishingRecoverer directly in your code to make publishing to the DLQ a little easier (instead of an output channel). That way, you'll get the enhanced headers with the exception information, etc.

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

    EDIT

    Here is an example; I am pausing the binding to prevent any new deliveries while the "job" is being run rather than requeuing the delivery, as you are doing.

    @SpringBootApplication
    @EnableScheduling
    public class So67296258Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So67296258Application.class, args);
        }
    
        @Bean
        TaskExecutor exec() {
            return new ThreadPoolTaskExecutor();
        }
    
        @Bean
        DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
            return new DeadLetterPublishingRecoverer(template);
        }
    
        @Bean
        NewTopic topic() {
            return TopicBuilder.name("polled.DLT").partitions(1).replicas(1).build();
        }
    
        @Bean
        MessageSourceCustomizer<KafkaMessageSource<?, ?>> customizer() {
            return (source, dest, group) -> source.setRawMessageHeader(true);
        }
    
    }
    
    @Component
    class Handler {
    
        private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
    
        private final PollableMessageSource source;
    
        private final TaskExecutor exec;
    
        private final BindingsEndpoint endpoint;
    
        private final DeadLetterPublishingRecoverer recoverer;
    
        Handler(PollableMessageSource source, TaskExecutor exec, BindingsEndpoint endpoint,
                DeadLetterPublishingRecoverer recoverer) {
    
            this.source = source;
            this.exec = exec;
            this.endpoint = endpoint;
            this.recoverer = recoverer;
        }
    
        @Scheduled(fixedDelay = 5_000)
        public void process() {
            LOG.info("Polling");
            boolean polled = this.source.poll(msg -> {
                LOG.info("Pausing Binding");
                this.endpoint.changeState("polled", State.PAUSED);
                AcknowledgmentCallback callback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(msg);
                callback.noAutoAck();
    //          LOG.info(msg.toString());
                this.exec.execute(() -> {
                    try {
                        runJob(msg);
                    }
                    catch (Exception e) {
                        this.recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), e);
                    }
                    finally {
                        callback.acknowledge();
                        this.endpoint.changeState("polled", State.RESUMED);
                        LOG.info("Resumed Binding");
                    }
                });
            });
            LOG.info("" + polled);
        }
    
        private void runJob(Message<?> msg) throws InterruptedException {
            LOG.info("Running job");
            Thread.sleep(30_000);
            throw new RuntimeException("fail");
        }
    
    }
    
    spring.cloud.stream.pollable-source=polled
    spring.cloud.stream.bindings.polled-in-0.destination=polled
    spring.cloud.stream.bindings.polled-in-0.group=polled
    

    EDIT2

    Answers to the supplemental questions:

    1, 2: See the Spring for Apache Kafka documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

    The DLPR has an alternate constructor that lets you specify a destination resolver. The default resolver just appends .DLT to the topic name and uses the same partition. The javadoc shows how the destination partition can be specified:

        /**
         * Create an instance with the provided template and destination resolving function,
         * that receives the failed consumer record and the exception and returns a
         * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
         * 0, no partition is set when publishing to the topic.
         * @param template the {@link KafkaOperations} to use for publishing.
         * @param destinationResolver the resolving function.
         */
    

    When no partition is set on the outgoing record, the KafkaProducer selects the partition.
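
    For example, a resolver that routes every failed record to a single shared dlq topic and leaves partition selection to the producer might look like this (the topic name dlq is illustrative):

    @Bean
    DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
        // Send all failed records to one shared "dlq" topic (name is illustrative).
        // A negative partition leaves partition selection to the KafkaProducer, so the
        // DLT does not need the same number of partitions as the source topic.
        return new DeadLetterPublishingRecoverer(template,
                (consumerRecord, exception) -> new TopicPartition("dlq", -1));
    }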

    3. Wire up a RetryTemplate with appropriate retry and backoff policies; then
    retryTemplate.execute(context -> { ... },
        context -> {...});
    

    The second argument is a RecoveryCallback, called when retries are exhausted.
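
    A sketch of how this could be wired into the executor task from the example above; the policy values are arbitrary, the RetryTemplate is assumed to be injected into the Handler, and RetryTemplate, SimpleRetryPolicy and FixedBackOffPolicy come from spring-retry:

    @Bean
    RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(new SimpleRetryPolicy(3));     // analogous to max.attempts
        FixedBackOffPolicy backOff = new FixedBackOffPolicy();
        backOff.setBackOffPeriod(1_000L);                       // 1 second between attempts
        template.setBackOffPolicy(backOff);
        return template;
    }

    // Inside Handler.process(), replacing the plain try/catch in the executor task:
    this.exec.execute(() -> {
        try {
            this.retryTemplate.execute(context -> {
                runJob(msg);                                    // retried up to 3 times
                return null;
            }, context -> {
                // RecoveryCallback: retries exhausted, publish to the DLT
                this.recoverer.accept(
                        msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class),
                        (Exception) context.getLastThrowable());
                return null;
            });
        }
        catch (Exception e) {
            // unexpected failure in the retry/recovery path itself
        }
        finally {
            callback.acknowledge();
            this.endpoint.changeState("polled", State.RESUMED);
        }
    });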

    4. It's more efficient. With your solution, you keep retrieving and requeuing the delivery while the previous task is still being processed. By pausing the binding, we tell Kafka not to send any more records when we poll() until we resume the consumer. This lets us keep the consumer alive by polling it, but without the overhead of retrieving and resetting the offset.