Spring AMQP - Publish to a Non-Existent Exchange Exception Handling


I'm attempting to account for a scenario where my Publisher publishes to a DirectExchange that doesn't exist.

According to the spring-amqp documentation, I should be able to use rabbitTemplate.setChannelTransacted(true) in order for this exception to be thrown:

To detect the exception on the sending thread, you can setChannelTransacted(true) on the RabbitTemplate and the exception is detected on the txCommit(). However, transactions significantly impede performance, so consider this carefully before enabling transactions for just this one use case.

With this configuration, the exception is thrown only when rabbitTemplate.convertAndSend is not called from within a method annotated with org.springframework.transaction.annotation.Transactional. Inside such a method, the error is only logged and no exception reaches my code.

The method that publishes messages needs to do database work before publishing, so it has to be @Transactional. Is it possible to get the exception in that case? I suspect a misconfiguration somewhere is preventing this from working as I expect.

Here is my rabbitTemplate configuration:

    @Bean
    RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReturnsCallback(new ReturnsCallback() {
            
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                //routingKey doesn't exist, notify and send to parking lot...
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setExchange("some.exchange");
        
        return rabbitTemplate;
    }

Here is my publish:

    @Override
    @Transactional
    public void publishMessage() {
        
        // database processing...
        rabbitTemplate.convertAndSend("myRoutingKey", "myMessage");
    }

Again, some.exchange does not exist. With publishMessage() annotated with org.springframework.transaction.annotation.Transactional, the only output is a logged error, and no exception is thrown:

ERROR o.s.a.r.c.CachingConnectionFactory [AbstractConnectionFactory.java:748] Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'some.exchange' in vhost '/consume', class-id=60, method-id=40)

If I modify my publish and remove @Transactional, I will get the exception I'm looking for on txCommit():

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:144)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:2398)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:1064)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2229)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2188)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2140)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1063)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1128)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1110)
        at gov.osc.lgmm.service.core.impl.CoreServiceImpl.sendFilingStatus(CoreServiceImpl.java:120)
        ... 75 common frames omitted
Caused by: java.io.IOException: null
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:46)
        at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.txCommit(PublisherCallbackChannelImpl.java:633)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:90)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
        at java.lang.reflect.Method.invoke(Method.java:508)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157)
        at com.sun.proxy.$Proxy185.txCommit(Unknown Source)
        at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:141)
        ... 84 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'some.exchange' in vhost '/consume', class-id=60, method-id=40)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
        ... 94 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'some.exchange' in vhost '/consume', class-id=60, method-id=40)
        at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
        at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
        at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
        ... 1 common frames omitted

I expected the same exception to be thrown when rabbitTemplate.convertAndSend runs inside the @Transactional method. Is the behavior I'm seeing expected, or do I likely have a misconfiguration somewhere?

I'm using Spring Boot 2.5.2 with Spring AMQP 2.3.9, and my transaction manager is a JpaTransactionManager.


Solution

  • See https://github.com/spring-projects/spring-amqp/issues/1362

    When the Rabbit transaction is synchronized with a primary transaction, a failure of the Rabbit commit (which runs after the primary commit) is simply logged by Spring's transaction support and is not thrown to the caller.

    In Spring AMQP 2.4.x, we added a mechanism so you can check the success or failure of the secondary (Rabbit) commit and, if necessary, run a compensating transaction to back out the already committed DB transaction:

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#tx-sync
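
To make the behavior described above concrete, here is a small plain-JDK model (not Spring code) of a secondary commit that runs after the primary commit. Every name in it (SyncCommitSketch, SecondaryResource, runInTransaction, checkSecondaryCommit) is hypothetical and only illustrates the idea. If I recall correctly, the hooks documented in the linked section are ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) and ConnectionFactoryUtils.checkAfterCompletion(); please verify those names against the reference for your version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model only: none of these types are Spring or RabbitMQ classes.
class SyncCommitSketch {

    /** A secondary resource (think: transacted channel) committed after the primary. */
    interface SecondaryResource {
        void commit(); // may throw, e.g. when the exchange does not exist
    }

    // Stand-in for the application log.
    static final List<String> log = new ArrayList<>();

    // Failure remembered for the calling thread so it can be checked afterwards.
    static final ThreadLocal<RuntimeException> captured = new ThreadLocal<>();

    /** Runs the work, "commits" the primary resource, then commits the secondary one. */
    static void runInTransaction(Runnable dbWork, SecondaryResource secondary, boolean capture) {
        captured.remove();
        dbWork.run();
        log.add("primary commit"); // the DB commit has already succeeded at this point
        try {
            secondary.commit();
        } catch (RuntimeException e) {
            log.add("ERROR " + e.getMessage()); // logged, never rethrown to the caller
            if (capture) {
                captured.set(e);
            }
        }
    }

    /** Throws if the secondary commit failed, so the caller can compensate. */
    static void checkSecondaryCommit() {
        RuntimeException e = captured.get();
        captured.remove();
        if (e != null) {
            throw new IllegalStateException("secondary commit failed", e);
        }
    }

    public static void main(String[] args) {
        SecondaryResource missingExchange = () -> {
            throw new RuntimeException("NOT_FOUND - no exchange 'some.exchange'");
        };
        // Without the explicit check below, the caller would never see the failure.
        runInTransaction(() -> { }, missingExchange, true);
        try {
            checkSecondaryCommit();
        } catch (IllegalStateException e) {
            // A compensating DB transaction would go here.
            System.out.println("compensate: " + e.getCause().getMessage());
        }
    }
}
```

The point of the sketch is the ordering: by the time the secondary commit fails, the primary commit cannot be rolled back, so the only options are to log the failure or to capture it and let the caller compensate.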

    Boot 2.5.x (and 2.6.x) is no longer supported as OSS (https://spring.io/projects/spring-boot#support).

    Spring AMQP 2.4.x is compatible with Boot 2.6.x and 2.7.x.

    EDIT

    You can also enable publisher confirms to check whether the message was ack'd (and not returned due to no route to a queue).

    CorrelationData corr = new CorrelationData();
    template.convertAndSend("junk", "", "foo", corr);
    CompletableFuture<Confirm> future = corr.getFuture();
    Confirm confirm = future.get(10, TimeUnit.SECONDS);
    System.out.println(confirm + " ret:" + corr.getReturned());
    
    Confirm [ack=false, reason=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'junk' in vhost '/', class-id=60, method-id=40)] ret:null
    

    Note that you don't get a returned message when routing to a non-existent exchange. When the exchange is present, but there is no route, ack will be true, but the returned message will be populated.

    Also note that 3.0.x and later use a CompletableFuture; earlier versions used a Spring ListenableFuture.

    Confirms cannot be used on transactional channels, however, so this is probably not what you want because the publish won't roll back if the DB commit fails.
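
If you do go the confirms route, the three outcomes described above (nack for a missing exchange, ack with a returned message for no route, plain ack for success) can be told apart with a small helper. The sketch below uses only the JDK. ConfirmOutcomeSketch, ConfirmResult, Outcome and classify are hypothetical names, and ConfirmResult merges the confirm and the returned message into one object for brevity, whereas in the snippet above the returned message comes from corr.getReturned().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model only: these are not Spring AMQP types.
class ConfirmOutcomeSketch {

    enum Outcome { ROUTED, NACKED, UNROUTABLE, NO_CONFIRM }

    /** Stand-in for the confirm (ack, reason) plus the returned message, if any. */
    static final class ConfirmResult {
        final boolean ack;
        final String reason;
        final String returnedMessage; // null when nothing was returned

        ConfirmResult(boolean ack, String reason, String returnedMessage) {
            this.ack = ack;
            this.reason = reason;
            this.returnedMessage = returnedMessage;
        }
    }

    /** Waits for the confirm and maps it to one of the outcomes. */
    static Outcome classify(CompletableFuture<ConfirmResult> future, long timeoutMillis) {
        ConfirmResult result;
        try {
            result = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Outcome.NO_CONFIRM;
        } catch (ExecutionException | TimeoutException e) {
            return Outcome.NO_CONFIRM; // no confirm arrived in time
        }
        if (!result.ack) {
            return Outcome.NACKED; // e.g. the exchange does not exist
        }
        // Exchange exists; a returned message means no queue was bound for the key.
        return result.returnedMessage != null ? Outcome.UNROUTABLE : Outcome.ROUTED;
    }

    public static void main(String[] args) {
        CompletableFuture<ConfirmResult> nack = CompletableFuture.completedFuture(
                new ConfirmResult(false, "NOT_FOUND - no exchange 'junk'", null));
        System.out.println(classify(nack, 100));
    }
}
```

Treating a timeout as its own outcome keeps "the broker said no" separate from "the broker never answered", which usually call for different handling.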