Search code examples
spring-amqpspring-rabbit

How to create rabbit listeners using spring AMQP to listen to queues across multiple vhosts


I have multiple virtual hosts each with a request queue and a response queue. These virtual hosts serve different clients. The names for the request queue and the response queue remain the same across the virtual hosts.

I have created a SimpleRoutingConnectionFactory with the clientName()+"ConnectionFactory" as the lookup key and a corresponding CachingConnectionFactory as the value in the map. I'm able to publish message to the request queues by binding and the RabbitTemplate to a virtual host before convertAndSend and then unbinding it.

I'm not able to consume messages from the response queues from different virtual hosts. I have created a SimpleRabbitListenerContainerFactory for each client. I implemented RabbitListenerConfigurer and registered a SimpleRabbitListenerEndpoint for each SimpleRabbitListenerContainerFactory. I also set the connectionFactory on each SimpleRabbitListenerContainerFactory as the client's CachingConnectionFactory.


@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private ClientList clients;

    @Bean
    @Primary
    public SimpleRoutingConnectionFactory routingConnectionFactory() {
        final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
        final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
        applicationContext.getBeansOfType(ConnectionFactory.class)
                .forEach((beanName, bean) -> {
                    routeMap.put(beanName, bean);
                });
        routingConnectionFactory.setTargetConnectionFactories(routeMap);
        return routingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(routingConnectionFactory());
    }

    @Bean
    public DirectExchange orbitExchange() {
        return new DirectExchange("orbit-exchange");
    }

    @Bean
    public Queue requestQueue() {
        return QueueBuilder
                .durable("request-queue")
                .lazy()
                .build();
    }

    @Bean
    public Queue responseQueue() {
        return QueueBuilder
                .durable("response-queue")
                .lazy()
                .build();
    }

    @Bean
    public Binding requestBinding() {
        return BindingBuilder.bind(requestQueue())
                .to(orbitExchange())
                .with("orbit-request");
    }

    @Bean
    public Binding responseBinding() {
        return BindingBuilder.bind(responseQueue())
                .to(orbitExchange())
                .with("orbit-response");
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            clients.get()
                    .stream()
                    .forEach(client -> {
                        var endpoint = createEndpoint(client);
                        var listenerContainerFactory = applicationContext.getBean(client.getName() + "ListenerContainerFactory");
                        listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
                        registrar.registerEndpoint(endpoint, listenerContainerFactory);
                    });
        }
    }

    private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
        var endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setId(client.getName());
        endpoint.setQueueNames("response-queue");
        endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
        return endpoint;
    }

}

However, I get org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s). java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]

I'm not able to figure out whats causing this as I'm not using the SimpleRoutingConnectionFactory for message consumption at all.

EDIT: Full stack trace below -

ERROR [2020-07-09T04:12:38,028] [amdoListenerEndpoint-1] [TraceId:] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s).
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
        at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
        at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2089)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2062)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2042)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1836)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1817)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1349)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
        at java.base/java.lang.Thread.run(Thread.java:834)

EDIT2: I used the routingConnectionFactory with every listener and used the setLookUpKeyQualifier. No more exceptions but, the listeners don't seem to be doing anything i.e., the queues are not being listened to.

@Import(MqConfig.class) 
//This is to import CachingConnectinFactory beans and SimpleRabbitListenerContainerFactory beans for all clients
@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private ClientList clients;

    @Bean
    @Primary
    public SimpleRoutingConnectionFactory routingConnectionFactory() {
        final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
        final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
        applicationContext.getBeansOfType(ConnectionFactory.class)
                .forEach((beanName, bean) -> {
                    routeMap.put(beanName+"[response-queue]", bean);
                });
        routingConnectionFactory.setTargetConnectionFactories(routeMap);
        return routingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(routingConnectionFactory());
    }

    @Bean
    public DirectExchange orbitExchange() {
        return new DirectExchange("orbit-exchange");
    }

    @Bean
    public Queue requestQueue() {
        return QueueBuilder
                .durable("request-queue")
                .lazy()
                .build();
    }

    @Bean
    public Queue responseQueue() {
        return QueueBuilder
                .durable("response-queue")
                .lazy()
                .build();
    }

    @Bean
    public Binding requestBinding() {
        return BindingBuilder.bind(requestQueue())
                .to(orbitExchange())
                .with("orbit-request");
    }

    @Bean
    public Binding responseBinding() {
        return BindingBuilder.bind(responseQueue())
                .to(orbitExchange())
                .with("orbit-response");
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            clients.get()
                    .stream()
                    .forEach(client -> {
                        var endpoint = createEndpoint(client);
                        var listenerContainerFactory = getListenerContainerFactory(Client client);
                        listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
                        registrar.registerEndpoint(endpoint, listenerContainerFactory);
                    });
        }
    }

    private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
        var endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setId(client.getName());
        endpoint.setQueueNames("response-queue");
        endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
        return endpoint;
    }

    private SimpleRabbitListenerContainerFactory getListenerContainerFactory(Client client) {
        var listenerContainerFactory =  (SimpleRabbitListenerContainerFactory) applicationContext.getBean(client.getName() + "ListenerContainerFactory");
        listenerContainerFactory.setConnectionFactory(routingConnectionFactory());
        listenerContainerFactory.setContainerCustomizer(container -> {
            container.setQueueNames("response-queue");
            container.setLookupKeyQualifier(client.getName());
            container.setMessageListener(message -> log.info("Received message"));
        });
        return listenerContainerFactory;
    }

}

Solution

  • There is something very strange going on; [null] implies that when we call getRoutingLookupKey() the cf is not a routing cf but when we call getConnectionFactory() it is.

    It's not obvious how that can happen. Perhaps you can figure out why in a debugger?

    One solution would be to inject the routing cf and use setLookupKeyQualifier(...).

    The lookup key will then be clientId[queueName].