
How to use Rabbit Connection Cache Mode and Autodeclare Features Together?


The Spring AMQP Reference says:

Starting with version 1.3, the CachingConnectionFactory can be configured to cache connections as well as just channels. In this case, each call to createConnection() creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache (if the cache size has not been reached). Channels created on such connections are cached too. The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to connect to different cluster members. Set the cacheMode to CacheMode.CONNECTION.

I have an HA cluster behind a load balancer, and I can clearly see that when I use CacheMode.CHANNEL all my channels come from a single connection and are all on the same host.

When I change to CacheMode.CONNECTION I do see variance in the channels, i.e. they are spread across several connections, and therefore I see different hosts from the cluster being used.

So this second configuration makes a lot of sense to me in this particular case. However, the documentation also says:

When the cache mode is CONNECTION, automatic declaration of queues etc. is NOT supported.

This is clearly an important feature too, because I don't want to have to apply changes from my configuration to the broker manually.

So, my question is, is there a way I can use both things? Would it make sense to have two connection factories, one for the RabbitAdmin and another one for the RabbitTemplate, listeners, etc.?


Solution

  • The problem is that we don't know on which connection(s) to declare the elements; we'd end up redeclaring everything for every new connection created.

    If that's OK with you, you could add your own connection listener to the connection factory and call rabbitAdmin.initialize() on the RabbitAdmin bean (which is all the default listener inside the admin does when the cache mode is CHANNEL).

    Or you could subclass the RabbitAdmin and override afterPropertiesSet().

    I suppose you could add some more logic to determine if you actually need to do the declarations - see RabbitAdmin.afterPropertiesSet() ...

    /**
     * If {@link #setAutoStartup(boolean) autoStartup} is set to true, registers a callback on the
     * {@link ConnectionFactory} to declare all exchanges and queues in the enclosing application context. If the
     * callback fails then it may cause other clients of the connection factory to fail, but since only exchanges,
     * queues and bindings are declared failure is not expected.
     *
     * @see InitializingBean#afterPropertiesSet()
     * @see #initialize()
     */
    @Override
    public void afterPropertiesSet() {
    
        synchronized (this.lifecycleMonitor) {
    
            if (this.running || !this.autoStartup) {
                return;
            }
    
            if (this.connectionFactory instanceof CachingConnectionFactory &&
                    ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
                this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
                return;
            }
    
            // Prevent stack overflow...
            final AtomicBoolean initializing = new AtomicBoolean(false);
    
            this.connectionFactory.addConnectionListener(connection -> {
    
                if (!initializing.compareAndSet(false, true)) {
                    // If we are already initializing, we don't need to do it again...
                    return;
                }
                try {
                    /*
                     * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
                     * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
                     * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
                     * declared for every connection. If anyone has a problem with it: use auto-startup="false".
                     */
                    initialize();
                }
                finally {
                    initializing.compareAndSet(true, false);
                }
    
            });
    
            this.running = true;
    
        }
    }
    

    We could probably make this an option on the admin - feel free to make a contribution.
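
The connection-listener approach described above might look roughly like the following Java configuration. This is only a sketch, not a tested setup: the class name RabbitConfig and the host name rabbit-lb.example.com are placeholders, and the lambda form of the listener assumes Spring AMQP 2.x, where ConnectionListener has a single abstract method. The AtomicBoolean guard is borrowed from the afterPropertiesSet() code quoted above, because initialize() itself asks the factory for a connection and could otherwise re-enter the listener.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig { // hypothetical class name

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // Placeholder host: replace with the address of your load balancer.
        CachingConnectionFactory cf = new CachingConnectionFactory("rabbit-lb.example.com");
        cf.setCacheMode(CacheMode.CONNECTION);
        return cf;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        // The admin skips its own listener in CONNECTION mode anyway;
        // disabling auto-startup just avoids the warning it would log.
        admin.setAutoStartup(false);

        // Guard against re-entrancy: initialize() may itself open a new
        // connection, which would fire this listener again.
        AtomicBoolean initializing = new AtomicBoolean(false);

        connectionFactory.addConnectionListener(connection -> {
            if (!initializing.compareAndSet(false, true)) {
                return; // a declaration pass is already in progress
            }
            try {
                // Redeclares all exchanges, queues and bindings found in the
                // application context; this runs for every new connection.
                admin.initialize();
            }
            finally {
                initializing.set(false);
            }
        });
        return admin;
    }
}
```

With this in place, every newly created connection triggers a full redeclaration. The declarations are idempotent, so the cost is extra network chatter, which is the trade-off the answer describes.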