I am trying to use a transactional RabbitMQ channel with Spring-AMQP, but I want to swallow exceptions so that I can log them and recover from them.
Using channelTransacted=true forces the Channel to also join the current transaction manager (Hibernate in my case). As a result, the commit exception is rethrown outside the @Transactional boundary, so the failure surfaces at the upper level where I cannot catch and log it.
I also tried to manually attach the publish to the transaction so that it is executed only after the commit has succeeded:
public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String message) {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                rabbitTemplate.convertAndSend(routingKey, message);
            } catch (Exception exception) {
                // Pass the exception so the stack trace ends up in the log
                logger.error("Error while publishing message to RabbitMQ", exception);
            }
        }
    });
}
It is used like this:
Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");
But in that case I cannot use channelTransacted=true, because the channel then registers its own synchronization from inside my synchronization, and that nested one is never called.
Is there a way to achieve this?
UPDATE: Ideally I would override the RabbitResourceSynchronization used in the ConnectionFactoryUtils class, but it is a private class that is instantiated directly, without a factory:
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder, connectionFactory, synched));
The solution I implemented was to do the publishing inside a new transaction after the commit of the main transaction.
The first call:
Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");
This method registers the publishing so that it runs after the main transaction has committed.
public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String event) {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                publishFailSafe(routingKey, event);
            } catch (Exception exception) {
                // Do some recovering
            }
        }
    });
}
After the main transaction has committed, this method does the publishing. Because the channel is transacted, the message is committed when this new transaction commits, so a failure affects only the new transaction, and the resulting exception is caught in the previous method.
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void publishFailSafe(String routingKey, String event) {
    try {
        // routingKey is already a String, so it is passed as-is
        rabbitTemplate.convertAndSend(routingKey, event);
    } catch (Exception exception) {
        // Do some recovering
    }
}
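To make the control flow easier to follow, here is a minimal plain-Java model of it. It is only a sketch: MiniTransaction, Publisher and saveAndPublish are hypothetical names invented for this illustration and are not Spring or Spring-AMQP classes. It assumes the broker failure shows up as a RuntimeException, and it only shows that an exception raised while publishing after the commit stays inside the callback and does not affect the main transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the flow; none of these types are Spring or Spring-AMQP classes.
class AfterCommitDemo {

    /** Hypothetical stand-in for a transaction that runs callbacks after its commit. */
    static class MiniTransaction {
        private final List<Runnable> afterCommitCallbacks = new ArrayList<>();
        private boolean committed = false;

        void registerAfterCommit(Runnable callback) {
            afterCommitCallbacks.add(callback);
        }

        void commit() {
            committed = true; // the main (database) work is durable from here on
            for (Runnable callback : afterCommitCallbacks) {
                callback.run();
            }
        }

        boolean isCommitted() {
            return committed;
        }
    }

    /** Hypothetical publisher; an implementation may throw to simulate a broker failure. */
    interface Publisher {
        void publish(String routingKey, String event);
    }

    /** Registers the publish, commits, and returns the errors seen by the recovery path. */
    static List<String> saveAndPublish(MiniTransaction tx, Publisher publisher) {
        List<String> recoveredErrors = new ArrayList<>();
        tx.registerAfterCommit(() -> {
            try {
                // Stands in for publishFailSafe running in its own transaction.
                publisher.publish("routingkey", "Entity was updated");
            } catch (RuntimeException exception) {
                // The failure stays here: record it for recovery, never rethrow.
                recoveredErrors.add(exception.getMessage());
            }
        });
        tx.commit();
        return recoveredErrors;
    }

    public static void main(String[] args) {
        MiniTransaction tx = new MiniTransaction();
        List<String> errors = saveAndPublish(tx, (key, event) -> {
            throw new IllegalStateException("broker unavailable");
        });
        System.out.println("committed=" + tx.isCommitted() + ", recovered=" + errors);
    }
}
```

One thing to keep in mind with the real code: with Spring's default proxy-based @Transactional handling, REQUIRES_NEW on publishFailSafe only takes effect when the method is invoked through the Spring proxy (for example from another bean), not through a direct call on this.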