How can I execute the below in a transaction. My requirement is message offset should not be committed to Kafka if the DB calls fails .Kafka consumer configuration is here https://pastebin.com/kq5S9Jrx
@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack)
{
logger.debug(String.format("Message recieved -> %s", message));
// start transaction
dbservice.validateMessage(message);
dbservice.saveInDB(message);
ack.acknowledge();
// end transaction
}
Move
dbservice.validateMessage(message);
dbservice.saveInDB(message);
to a new method annotated with @Transactional
.
then
try {
dbMethod(message);
ack.ack();
catch (Exception e) {
ack.nack(); // with an optional delay before redelivery
}
Or, simply use container managed offsets (no ack/nack) and let the exception propagate to the container, where a SeekToCurrentErrorHandler
can manage the retries.