Search code examples
spring-bootapache-kafkakafka-consumer-apispring-kafkaspring-transactions

Spring Kafka Consumer with database


How can I execute the below in a transaction. My requirement is message offset should not be committed to Kafka if the DB calls fails .Kafka consumer configuration is here https://pastebin.com/kq5S9Jrx

@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
    public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) 
    {
        logger.debug(String.format("Message recieved -> %s", message));
        
        // start transaction
        dbservice.validateMessage(message);
        
        dbservice.saveInDB(message);
        ack.acknowledge();
        // end transaction
}

Solution

  • Move

    dbservice.validateMessage(message);
    
    dbservice.saveInDB(message);
    

    to a new method annotated with @Transactional.

    then

    try {
        dbMethod(message);
        ack.ack();
    catch (Exception e) {
        ack.nack(); // with an optional delay before redelivery
    }
    

    Or, simply use container managed offsets (no ack/nack) and let the exception propagate to the container, where a SeekToCurrentErrorHandler can manage the retries.