Tags: kafka-consumer-api, spring-kafka, project-reactor, flux, reactor-kafka

RetryBackoffSpec not working with KafkaReceiver which throws exception


I have a use case where I want to keep receiving records from Kafka indefinitely and process each record with processRecord(String record), which can throw a RuntimeException. I want to retry up to 5 times; if processing succeeds on any attempt I want to commit the offset manually and continue with the next records, and if all 5 retries fail I want to log the failure, commit the offset anyway, and continue with the next records. I have the code below, but it doesn't seem to work as intended. I would appreciate some help.

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class MyClass {
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions(); // consumer configuration; construction omitted here

    public void consumeRecords() {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> {
                    System.out.println(record.value());
                    processRecord(record.value());
                })
                .doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
                .onErrorContinue((e, r) -> {
                    System.out.println(atomicInteger.incrementAndGet());
                    System.out.println("Record: " + r);
                    System.out.println("Error: " + e);
                })
                .retryWhen(retrySpec)
                .repeat()
                .subscribe();

    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }
}

The output that I receive is:

some message
1
Record: ConsumerRecord(topic = my-topic, partition = 0, leaderEpoch = null, offset = 1, CreateTime = 1620062099518, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = some message)
Error: java.lang.RuntimeException: Throwing exception!

second message
1
Record: ConsumerRecord(topic = my-topic, partition = 1, leaderEpoch = null, offset = 2, CreateTime = 1620062166706, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = second message)
Error: java.lang.RuntimeException: Throwing exception!

It is not retrying 5 times, and moreover the AtomicInteger is not being incremented for the second record (it prints 1 again).

What I want to achieve, in pseudocode, is:

count = 0
while (count < 5) {
    if (exception) count++;
    else break_and_continue_with_next_record
}

if (count == 5) log_failure_and_continue_with_next_record

Solution

  • onErrorResume() is preferred over onErrorContinue(). onErrorContinue() acts on compatible upstream operators via a subscriber context flag rather than catching the error at its own position in the chain, which is most likely why the retryWhen() in your snippet never fires.

    The problem then is that you can't commit the offset inside onErrorResume(), because the receiver is no longer active at that point. The code below works around that by remembering the failed record and committing its offset when the record is redelivered after resubscription.
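
    A side note on the retry spec: transientErrors(true) is what makes the budget of 5 retries apply per burst of consecutive errors rather than per subscription, because the counter resets whenever an onNext signal gets through between errors. A minimal, Kafka-free sketch of that behavior (the flux below is illustrative, not part of the app):

        import java.time.Duration;
        import java.util.concurrent.atomic.AtomicInteger;

        import reactor.core.publisher.Flux;
        import reactor.util.retry.Retry;

        public class TransientErrorsDemo {

            public static void main(String[] args) {
                AtomicInteger failures = new AtomicInteger();
                Flux.<String> create(sink -> {
                    sink.next("ok"); // a good element on every (re)subscription
                    if (failures.incrementAndGet() <= 7) {
                        sink.error(new IllegalStateException("boom #" + failures.get()));
                    }
                    else {
                        sink.next("done");
                        sink.complete();
                    }
                })
                // each resubscription re-emits "ok" before failing again, and that
                // onNext resets the retry counter, so all 7 failures stay within
                // the budget; with transientErrors(false) the sequence would
                // exhaust after 5 retries and error out
                .retryWhen(Retry.backoff(5, Duration.ofMillis(10)).transientErrors(true))
                .doOnNext(System.out::println)
                .blockLast();
            }
        }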

    This works for me...

        private final AtomicInteger atomicInteger = new AtomicInteger();
    
        public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
            RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
            KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        // remembers a record whose retries were exhausted, so that its offset
        // can be committed when it is redelivered on the next subscription
        AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
            receiver.receive()
                    .subscribeOn(Schedulers.single())
                    .doOnNext(record -> {
                        System.out.println(record.value() + "@" + record.offset());
                    // this is the redelivery of a record whose retries were
                    // exhausted: commit its offset without processing it again
                    if (failed.get() != null) {
                            System.out.println("Committing failed record offset " + record.value()
                                    + "@" + record.offset());
                            record.receiverOffset().acknowledge();
                            failed.set(null);
                        }
                        else {
                            atomicInteger.set(0);
                            try {
                                processRecord(record.value());
                                record.receiverOffset().acknowledge();
                            }
                            catch (Exception e) {
                                throw new ReceiverRecordException(record, e);
                            }
                        }
                    })
                    .doOnError(ex -> atomicInteger.incrementAndGet())
                    .retryWhen(retrySpec)
                    .onErrorResume(e -> {
                    // retryWhen wraps the final failure; its cause is the
                    // ReceiverRecordException thrown from doOnNext
                    ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                        ReceiverRecord<?, ?> record = ex.getRecord();
                        System.out.println("Retries exhausted for " + record.value()
                                + "@" + record.offset());
                        failed.set(record);
                        return Mono.empty();
                    })
                    .repeat()
                    .subscribe();
        }
    
        public void processRecord(String record) {
            // might throw an exception
            throw new RuntimeException("Throwing exception!");
        }
    
    }
    
    @SuppressWarnings("serial")
    class ReceiverRecordException extends RuntimeException {

        private final ReceiverRecord<?, ?> record;

        ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
            super(t);
            this.record = record;
        }

        public ReceiverRecord<?, ?> getRecord() {
            return this.record;
        }

    }
    

    EDIT

    Here is the complete app...

    @SpringBootApplication
    public class So67373188Application {
    
        private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);
    
        public static void main(String[] args) throws InterruptedException {
            SpringApplication.run(So67373188Application.class, args);
            Thread.sleep(120_000);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner2() {
            return args -> {
                SenderOptions<String, String> so = SenderOptions.create(
                        Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
                KafkaSender<String, String> sender = KafkaSender.create(so);
                Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
                    .subscribe(result -> {
                        System.out.println(result.recordMetadata());
                    });
                Thread.sleep(5000);
                subscribed.dispose();
            };
        }
    
        @Bean
        public ApplicationRunner runner3(KafkaOperations<String, String> template) {
            return args -> {
            // built here but never wired into the pipeline below; see the note
            // after the result output for one way to use it
            DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
                ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
                        Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                                ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
                            // fetch one record per poll so a failure doesn't
                            // discard a larger batch that was already fetched
                            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                        .withKeyDeserializer(new StringDeserializer())
                        .withValueDeserializer(new StringDeserializer())
                        .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                    .commitBatchSize(1) // commit each acknowledged offset immediately
                        .subscription(Collections.singletonList("so67373188"));
                consumeRecords(ro);
            };
        }
    
        private SenderRecord<String, String, String> pr(String value) {
            return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
        }
    
        private final AtomicInteger atomicInteger = new AtomicInteger();
    
        public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
            RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
            KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
            AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
            receiver.receive()
                    .subscribeOn(Schedulers.single())
                    .doOnNext(record -> {
                        System.out.println(record.value() + "@" + record.offset());
                        if (failed.get() != null) {
                            System.out.println("Committing failed record offset " + record.value()
                                    + "@" + record.offset());
                            record.receiverOffset().acknowledge();
                            failed.set(null);
                        }
                        else {
                            atomicInteger.set(0);
                            try {
                                processRecord(record.value());
                                record.receiverOffset().acknowledge();
                            }
                            catch (Exception e) {
                                throw new ReceiverRecordException(record, e);
                            }
                        }
                    })
                    .doOnError(ex -> atomicInteger.incrementAndGet())
                    .retryWhen(retrySpec)
                    .onErrorResume(e -> {
                        ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                        ReceiverRecord<?, ?> record = ex.getRecord();
                        System.out.println("Retries exhausted for " + record.value()
                                + "@" + record.offset());
                        failed.set(record);
                        return Mono.empty();
                    })
                    .repeat()
                    .subscribe();
        }
    
        public void processRecord(String record) {
            // might throw an exception
            if (record.equals("fail")) {
                throw new RuntimeException("Throwing exception!");
            }
        }
    
    }
    
    @SuppressWarnings("serial")
    class ReceiverRecordException extends RuntimeException {

        private final ReceiverRecord<?, ?> record;

        ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
            super(t);
            this.record = record;
        }

        public ReceiverRecord<?, ?> getRecord() {
            return this.record;
        }

    }
    

    Result (fail@18 appears six times: the initial attempt plus five retries; after retries are exhausted, repeat() resubscribes the flux, the uncommitted record is redelivered once more, committed without being processed, and the stream moves on to baz@19):

    
      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::              (v2.4.5)
    
    so67373188-0@16
    so67373188-0@17
    so67373188-0@18
    so67373188-0@19
    foo@16
    bar@17
    fail@18
    fail@18
    fail@18
    fail@18
    fail@18
    fail@18
    Retries exhausted for fail@18
    fail@18
    Committing failed record offset fail@18
    baz@19
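
    One last note: runner3 builds a DeadLetterPublishingRecoverer that the pipeline never actually uses. If you would rather forward an exhausted record to a dead-letter topic than just log it, a possible variant of the onErrorResume() step is sketched below, under the assumption that the recoverer and the failed reference are passed into consumeRecords(); this is not part of the answer above:

        // Sketch: a drop-in replacement for the onErrorResume() lambda, assuming
        // 'dlpr' and 'failed' are in scope (e.g. passed into consumeRecords()).
        private Function<Throwable, Mono<ReceiverRecord<String, String>>> dltRecoverer(
                DeadLetterPublishingRecoverer dlpr,
                AtomicReference<ReceiverRecord<?, ?>> failed) {

            return e -> {
                ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                ReceiverRecord<?, ?> record = ex.getRecord();
                // the default destination resolver publishes the record to
                // "<original topic>.DLT", i.e. so67373188.DLT in this app
                dlpr.accept(record, ex);
                failed.set(record);
                return Mono.empty();
            };
        }

        // usage: replace the onErrorResume(e -> { ... }) block above with
        //     .onErrorResume(dltRecoverer(dlpr, failed))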