Search code examples
rabbitmqspring-integrationspring-integration-amqp

How to handle errors when RabbitMQ exchange doesn't exist (and messages are sent through a messaging gateway interface)


I'd like to know what is the canonical way to handle errors in the following situation (code is a minimal working example):

  • Messages are sent through a messaging gateway which defines its defaultRequestChannel and a @Gateway method:
@MessagingGateway(name = MY_GATEWAY, defaultRequestChannel = INPUT_CHANNEL)
public interface MyGateway
{
  @Gateway
  public void sendMessage(String message);
  • Messages are read from the channel and sent through an AMQP outbound adapter:
@Bean
public IntegrationFlow apiMutuaInputFlow()
{
  return IntegrationFlows
    .from(INPUT_CHANNEL)
    .handle(Amqp.outboundAdapter(rabbitConfig.myTemplate()))
    .get();
}
  • The RabbitMQ configuration is skeletal:
@Configuration
public class RabbitMqConfiguration
{
    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public RabbitTemplate myTemplate()
    {
        RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
        r.setExchange(INPUT_QUEUE_NAME);
        r.setConnectionFactory(rabbitConnectionFactory);
        return r;
    }
}

I generally include a bean to define the RabbitMQ configuration I'm relying upon (exchange, queues and bindings), and it actually works fine. But while testing for failure scenarios, I found a situation I don't know how to properly handle using Spring Integration. The steps are:

  • Remove the beans that configure RabbitMQ
  • Run the flow against an unconfigured, vanilla RabbitMQ instance.

What I would expect is:

  • The message cannot be delivered because the exchange cannot be found.
  • Either I find some way to get an exception from the messaging gateway on the caller thread.
  • Either I find some way to otherwise intercept this error.

What I find:

  • The message cannot be delivered because the exchange cannot be found, and indeed this error message is logged every time the @Gateway method is called.
2020-02-11 08:18:40.746 ERROR 42778 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'my.exchange' in vhost '/', class-id=60, method-id=40)
  • The gateway is not failing, nor have I find a way to configure it to do so (e.g.: adding throws clauses to the interface methods, configuring a transactional channel, setting wait-for-confirm and a confirm-timeout).
  • I haven't found a way to otherwise catch that CachingConectionFactory error (e.g.: configuring a transactional channel).
  • I haven't found a way to catch an error message on another channel (specified on the gateway's errorChannel), or in Spring Integration's default errorChannel.

I understand such a failure may not be propagated upstream by the messaging gateway, whose job is isolating callers from the messaging API, but I definitely expect such an error to be interceptable.

Could you point me in the right direction?

Thank you.


Solution

  • RabbitMQ is inherently async, which is one reason that it performs so well.

    You can, however, block the caller by enabling confirms and returns and setting this option:

    /**
     * Set to true if you want to block the calling thread until a publisher confirm has
     * been received. Requires a template configured for returns. If a confirm is not
     * received within the confirm timeout or a negative acknowledgment or returned
     * message is received, an exception will be thrown. Does not apply to the gateway
     * since it blocks awaiting the reply.
     * @param waitForConfirm true to block until the confirmation or timeout is received.
     * @since 5.2
     * @see #setConfirmTimeout(long)
     * @see #setMultiSend(boolean)
     */
    public void setWaitForConfirm(boolean waitForConfirm) {
        this.waitForConfirm = waitForConfirm;
    }
    

    (With the DSL .waitForConfirm(true)).

    This also requires a confirm correlation expression. Here's an example from one of the test cases

        @Bean
        public IntegrationFlow flow(RabbitTemplate template) {
            return f -> f.handle(Amqp.outboundAdapter(template)
                    .exchangeName("")
                    .routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class))
                    .confirmCorrelationFunction(msg -> msg)
                    .waitForConfirm(true));
        }
    
        @Bean
        public CachingConnectionFactory cf() {
            CachingConnectionFactory ccf = new CachingConnectionFactory(
                    RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
            ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
            ccf.setPublisherReturns(true);
            return ccf;
        }
    
        @Bean
        public RabbitTemplate template(ConnectionFactory cf) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
            rabbitTemplate.setMandatory(true);               // for returns
            rabbitTemplate.setReceiveTimeout(10_000);
            return rabbitTemplate;
        }
    

    Bear in mind this will slow down things considerably (similar to using transactions) so you may want to reconsider whether you want to do this on every send (unless performance is not an issue).