I'm attempting to account for a scenario where my Publisher publishes to a DirectExchange that doesn't exist.
According to the spring-amqp documentation, I should be able to use rabbitTemplate.setChannelTransacted(true) in order for this exception to be thrown:
To detect the exception on the sending thread, you can setChannelTransacted(true) on the RabbitTemplate and the exception is detected on the txCommit(). However, transactions significantly impede performance, so consider this carefully before enabling transactions for just this one use case.
Currently, the above config only throws the exception when my rabbitTemplate.convertAndSend call is not wrapped in an org.springframework.transaction.annotation.Transactional method; otherwise the error is only logged and no exception is thrown.
Ideally, the method that publishes messages needs to perform database work before publishing, which requires me to annotate it with @Transactional. Is this possible? I'm thinking I might have a misconfiguration somewhere preventing this from working as I would expect.
Here is my rabbitTemplate configuration:
@Bean
RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setReturnsCallback(new ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            // routingKey doesn't exist, notify and send to parking lot...
        }
    });
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setChannelTransacted(true);
    rabbitTemplate.setExchange("some.exchange");
    return rabbitTemplate;
}
Here is my publish:
@Override
@Transactional
public void publishMessage() {
    // database processing...
    rabbitTemplate.convertAndSend("myRoutingKey", "myMessage");
}
Again, some.exchange does not exist. When publishMessage() is annotated with org.springframework.transaction.annotation.Transactional, the only output is a logged error:
ERROR o.s.a.r.c.CachingConnectionFactory [AbstractConnectionFactory.java:748] Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'some.exchange' in vhost '/consume', class-id=60, method-id=40)
If I modify my publish and remove @Transactional, I will get the exception I'm looking for on txCommit():
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:144)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:2398)
at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:1064)
at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2229)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2188)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2140)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1063)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1128)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1110)
at gov.osc.lgmm.service.core.impl.CoreServiceImpl.sendFilingStatus(CoreServiceImpl.java:120)
... 75 common frames omitted
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:46)
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.txCommit(PublisherCallbackChannelImpl.java:633)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:90)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:508)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157)
at com.sun.proxy.$Proxy185.txCommit(Unknown Source)
at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:141)
... 84 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'some.exchange' in vhost '/consume', class-id=60, method-id=40)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 94 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'some.exchange' in vhost '/consume', class-id=60, method-id=40)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
... 1 common frames omitted
I would have expected the above exception to be thrown when my rabbitTemplate.convertAndSend is wrapped in the @Transactional. Is the behavior that I'm experiencing expected, or is it likely that I have a misconfiguration somewhere?
I'm currently configured with spring-boot 2.5.2 / spring-amqp 2.3.9, and my transaction manager is a JpaTransactionManager.
See https://github.com/spring-projects/spring-amqp/issues/1362
When the Rabbit transaction is synchronized with a primary transaction, a failure of the Rabbit commit (which runs after the primary commit) is simply logged by Spring's transaction support and is not thrown to the caller.
In Spring AMQP 2.4.x, we added a mechanism so you can check the success/failure of the secondary (rabbit commit) and make a compensating transaction (if necessary) to back out the committed DB transaction:
https://docs.spring.io/spring-amqp/docs/current/reference/html/#tx-sync
Boot 2.5.x (and 2.6.x) is no longer supported as OSS (https://spring.io/projects/spring-boot#support).
Spring AMQP 2.4.x is compatible with Boot 2.6.x and 2.7.x.
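To make the "logged but not thrown" behavior and the later check concrete, here is a minimal plain-Java sketch. It is a simplified model and not Spring's implementation: the class BestEffortTxSketch, the SyncedResource interface, and the methods enableFailureCapture and checkAfterCompletion are hypothetical names chosen for illustration. As far as I understand the linked reference section, the corresponding Spring AMQP calls are ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) and ConnectionFactoryUtils.checkAfterCompletion(); please verify against the documentation for your version.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (NOT Spring's code) of "best effort one-phase commit":
 * the primary (database) commit runs first, then each synchronized resource commits.
 */
class BestEffortTxSketch {

    /** Hypothetical stand-in for a synchronized resource such as a transacted Rabbit channel. */
    interface SyncedResource {
        void commit() throws Exception;
    }

    // Secondary failure remembered for the calling thread, if capture is enabled.
    private static final ThreadLocal<Exception> FAILURE = new ThreadLocal<>();
    private static volatile boolean captureEnabled = false;

    // Stands in for the error log written by the transaction infrastructure.
    static final List<String> LOG = new ArrayList<>();

    static void enableFailureCapture(boolean enabled) {
        captureEnabled = enabled;
    }

    /** Commits the primary resource, then the synchronized ones; secondary failures are only logged. */
    static void commit(Runnable primaryCommit, SyncedResource... synced) {
        FAILURE.remove();
        primaryCommit.run(); // a failure here would propagate to the caller
        for (SyncedResource resource : synced) {
            try {
                resource.commit();
            } catch (Exception e) {
                // The primary commit is already done, so the failure is swallowed.
                LOG.add("ERROR after-completion commit failed: " + e.getMessage());
                if (captureEnabled) {
                    FAILURE.set(e);
                }
            }
        }
    }

    /** Rethrows a captured secondary failure so the caller can run a compensating action. */
    static void checkAfterCompletion() {
        Exception e = FAILURE.get();
        FAILURE.remove();
        if (e != null) {
            throw new IllegalStateException("secondary commit failed", e);
        }
    }

    public static void main(String[] args) {
        enableFailureCapture(true);
        commit(() -> System.out.println("DB committed"),
               () -> { throw new IOException("NOT_FOUND - no exchange 'some.exchange'"); });
        try {
            checkAfterCompletion();
        } catch (IllegalStateException ex) {
            System.out.println("compensate: " + ex.getCause().getMessage());
        }
    }
}
```

In this model the check has to run on the same thread, after the outer transaction has completed, which means outside the @Transactional method (for example, in the caller of publishMessage()).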
EDIT
You can also enable publisher confirms to check whether the message was ack'd (and not returned due to no route to a queue).
CorrelationData corr = new CorrelationData();
template.convertAndSend("junk", "", "foo", corr);
CompletableFuture<Confirm> future = corr.getFuture();
Confirm confirm = future.get(10, TimeUnit.SECONDS);
System.out.println(confirm + " ret:" + corr.getReturned());
Confirm [ack=false, reason=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'junk' in vhost '/', class-id=60, method-id=40)] ret:null
Note that you don't get a returned message when routing to a non-existent exchange. When the exchange is present but there is no route, ack will be true, but the returned message will be populated.
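The combinations described above can be summarized in a small decision helper. This is only an illustrative sketch: PublishOutcomeSketch, Outcome, and classify are hypothetical names, and the two booleans are assumed to come from something like confirm.isAck() and corr.getReturned() != null in the snippet above.

```java
/** Hypothetical helper that interprets a publisher confirm plus the optional returned message. */
class PublishOutcomeSketch {

    enum Outcome { ROUTED, UNROUTABLE, REJECTED }

    static Outcome classify(boolean ack, boolean hasReturnedMessage) {
        if (!ack) {
            // Negative confirm, e.g. the channel was closed because the exchange does not exist.
            return Outcome.REJECTED;
        }
        if (hasReturnedMessage) {
            // Exchange exists, but the message could not be routed to any queue.
            return Outcome.UNROUTABLE;
        }
        return Outcome.ROUTED;
    }

    public static void main(String[] args) {
        System.out.println(classify(false, false)); // non-existent exchange
        System.out.println(classify(true, true));   // exchange present, no route
        System.out.println(classify(true, false));  // delivered to at least one queue
    }
}
```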
Also note that 3.0.x and later use a CompletableFuture; earlier versions used a Spring ListenableFuture.
Confirms cannot be used on transactional channels, however, so this is probably not what you want because the publish won't roll back if the DB commit fails.