I wrote a deliberately error-prone consumer that throws exceptions randomly for roughly 50% of the messages. The consumer receives all messages, but only about 85% of them end up in the database. Where is the problem?
github: https://github.com/grzegorz-brzeczyszczykiewicz/kafka-losing-messages
I configured auto commit like this:
spring.cloud.stream.kafka.bindings.my-channel.consumer.autoCommitOffset=false
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.isolation-level=read_committed
My consumer looks like this:
package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.time.ZonedDateTime;
import java.util.Random;

@Component
@EnableTransactionManagement
@Slf4j
public class Consumer {

    @Autowired
    private MyDataRepository repository;

    private final Random random = new Random();
    private int methodCounter = 0;
    private int defectCounter = 0;

    //@Transactional
    @StreamListener(Stream.IN_CHANNEL)
    public void handleMessage(final Message<String> msg) throws Exception {
        //msg.getHeaders().forEach((key, value) -> System.out.println(key + ": " + value));
        final Acknowledgment acknowledgment =
                msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        System.out.println("payload: " + msg.getPayload());
        methodCounter++;
        System.out.println("method counter: " + methodCounter);

        // fail randomly for roughly 50% of the messages
        if (!random.nextBoolean()) {
            defectCounter++;
            System.out.println("defect counter: " + defectCounter);
            throw new Exception("kaputt");
        }
        System.out.println();

        MyData data = new MyData();
        String[] split = msg.getPayload().split(" time: ");
        data.setMessage(split[0] + "_" + methodCounter + "_" + defectCounter);
        data.setTime(ZonedDateTime.parse(split[1]));
        repository.save(data);

        // if (acknowledgment != null) {
        //     System.out.println("ack nonnull");
        acknowledgment.acknowledge();
        // } else {
        //     System.out.println("ack null");
        // }
    }
}
I found a solution: wrap the error-prone code in a try/catch block and call nack() in the catch block.
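The try/catch + nack() pattern can be sketched as below. This is a minimal, self-contained simulation: `Acknowledgment` here is a hypothetical stub standing in for `org.springframework.kafka.support.Acknowledgment`, so the redelivery loop can run without a Kafka broker. The class and method names (`NackSketch`, `process`) are made up for illustration; in the real listener, `acknowledgment` comes from the message headers as shown above, and the broker performs the redelivery that the `while` loop simulates here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class NackSketch {

    // Hypothetical stand-in for org.springframework.kafka.support.Acknowledgment.
    interface Acknowledgment {
        void acknowledge();        // commit the offset
        void nack(Duration sleep); // request redelivery of this record after a pause
    }

    // Processes each message with the try/catch + nack() pattern; a record that
    // fails is redelivered (retried) instead of being skipped, so nothing is lost.
    static int process(List<String> messages, Random random) {
        List<String> saved = new ArrayList<>();
        int i = 0;
        while (i < messages.size()) {
            final int index = i;
            final boolean[] redeliver = {false};
            Acknowledgment ack = new Acknowledgment() {
                public void acknowledge() { /* offset committed */ }
                public void nack(Duration sleep) { redeliver[0] = true; }
            };
            try {
                if (random.nextBoolean()) {      // the simulated 50% failure
                    throw new Exception("kaputt");
                }
                saved.add(messages.get(index));  // stands in for repository.save(data)
                ack.acknowledge();
            } catch (Exception e) {
                ack.nack(Duration.ZERO);         // ask for redelivery instead of losing it
            }
            if (!redeliver[0]) {
                i++;                             // advance only after a successful save
            }
        }
        return saved.size();
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3", "m4", "m5");
        // Despite ~50% failures, every message is eventually saved.
        System.out.println(process(messages, new Random()));
    }
}
```

The key point the sketch demonstrates: without nack(), a later successful acknowledge() commits a higher offset and the failed records in between are silently skipped; with nack() in the catch block, the failed record is delivered again until it succeeds.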