I have select for update
within thransaction where I select from DB list of Items by limit(10 items by one select) and then have to send all of them to kafka and delete all of them from DB within same transaction.
Condition:
If operation send to Kafka failed with TopicAuthorizationException I need to keep sending next Item and delete next Item from DB within same transaction
Code example
Kafka config
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> {
val kafkaTemplate = KafkaTemplate(producerFactory)
kafkaTemplate.setProducerListener(object : ProducerListener<String, String> {
override fun onError(
producerRecord: ProducerRecord<String, String>,
recordMetadata: RecordMetadata?,
exception: Exception
) {
if (exception is TopicAuthorizationException) {
logger.error("TopicAuthorizationException exception occured [${exception.message}]")
}
}
})
return kafkaTemplate
}
Business logic
@Service
class ItemService(
private val kafkaMessageSender: KafkaMessageSender,
private val itemRepository: ItemRepository
) {
@Transactional(
transactionManager = "transactionManager",
isolation = Isolation.REPEATABLE_READ,
propagation = Propagation.REQUIRED
)
fun findItemsToProcess() {
val items = itemRepository.findItemsWithSelectForUpdateAndLimit(limit = 10)
items.forEach { item ->
val producerRecord = ProducerRecord<String, String>(
item.topic,
item.payload)
try {
kafkaMessageSender.send(producerRecord)
itemRepository.delete(item)
} catch (ex: Exception) {
logger.error("Item error occurred [${ex.message}]")
}
}
}
}
Problem
If I try to send to topic which is not exist and get TopicAuthorizationException exception then KafkaTransactionManager put transaction in error state and on any operations I get Cannot execute transactional method because we are in an error state
. This problem suppose to solved by my setProducerListener
but it did not solve the problem.
Question: How can I configure kafkaTemplate to prevent KafkaTransactionManager to put transaction in error state if TopicAuthorizationException occurred?
You need to show your transaction manager configuration. However, you can't continue to use a transactional producer after an error occurs. If you are sending all records to the same topic, this error should occur on the first send; you probably need to thrown an exception to abort the transaction, and put retry logic in the calling code.