Search code examples
springrabbitmqspring-amqp

Configuring a Dedicated Listener Container for each Queue using Spring AMQP Java Configuration


I have listeners configured in XML like this

<rabbit:listener-container connection-factory="connectionFactory" concurrency="1" acknowledge="manual">
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s1}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s2}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s3}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s4}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s5}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s6}" exclusive="true"/>
</rabbit:listener-container>

I am trying to move that to Java Configuration and I don't see a way to add more than one MessageListener to a ListenerContainer. Creating multiple ListenerContainer beans is not an option in my case because I would not know the number of queues to consume from until runtime. Queue names will come from a configuration file.

I did the following

@PostConstruct
public void init() 
{
    for (String queue : queues.split(","))
    {
        // The Consumers would not connect if I don't call the 'start()' method.
        messageListenerContainer(queue).start();
    }
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer(String queue)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
    container.setQueueNames(queue);
    container.setMessageListener(messageListener());

    // Set Exclusive Consumer 'ON'
    container.setExclusive(true);

    // Should be restricted to '1' to maintain data consistency.
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}

It "sort" of works BUT I see some weird behavior with lots of ghost channels getting opened which never used to happen with the XML configuration. So it makes me suspicious that I am doing something wrong. I would like to know the correct way of creating MessageListenerContainers in Java configuration? Simply put, "How does Spring convert 'rabbit:listener-container' with multiple 'rabbit:listener' to java objects properly?" Any help/insight into this would be greatly appreciated.

Business Case We have a Publisher that publishes User Profile Updates. The publisher could dispatch multiple updates for the same use and we have to process them in the correct order to maintain data integrity in the data store.

Example : User : ABC, Publish -> {UsrA:Change1,...., UsrA:Change 2,....,UsrA:Change 3} -> Consumer HAS to process {UsrA:Change1,...., UsrA:Change 2,....,UsrA:Change 3} in that order.

In our previous setup, we had 1 Queue that got all the User Updates and we had a consumer app with concurrency = 5. There were multiple app servers running the consumer app. That resulted in *5 * 'Number of instances of the consumer app' channels/threads* that could process the incoming messages. The speed was GREAT! but we were having out of order processing quite often resulting in data corruption.

To maintain strict FIFO order and still process message parallelly as much as possible, we implemented queue Sharding. We have a "x-consistent-hash with a hash-header on employee-id. Our Publisher publishes messages to the hash exchange and we have multiple sharded queues bound to the hash exchange. The idea is, we will have all changes for a given user (User A for example) queued up in the same shard. We then have our consumers connect to the sharded queues in 'Exclusive' mode and 'ConcurrentConsumers = 1' and process the messages. That way we are sure to process messages in the correct order while still processing messages parallelly. We could make it more parallel by increasing the number of shards.

Now on to the consumer configuration

We have the consumer app deployed on multiple app servers.

Original Approach:

I simply added multiple 'rabbit:listener' to my 'rabbit:listener-container' in my consumer app as you can see above and it works great except for the server that starts first get an exclusive lock on all the sharded queues and the other servers are just sitting there doing no work.

New Approach:

We moved the sharded queue names to the application configuration file. Like so

Consumer Instance 1 : Properties
queues=user.queue.s1,user.queue.s2,user.queue.s3

Consumer Instance 2 : Properties
queues=user.queue.s4,user.queue.s5,user.queue.s6

Also worth noting, we could have Any number of Consumer instances and the shards could be distributed unevenly between instances depending on resource availability.

With the queue names moved to configuration file, the XML confiugration will no longer work because we cannot dynamically add 'rabbit:listener' to my 'rabbit:listener-container' like we did before.

Then we decided to switch over to the Java Configuration. That is where we are STUCK!.

We did this initially

@Bean
    public SimpleMessageListenerContainer messageListenerContainer()
    {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(queues.split(","));
        container.setMessageListener(messageListener());
        container.setMissingQueuesFatal(false);

        // Set Exclusive Consumer 'ON'
        container.setExclusive(true);

        // Should be restricted to '1' to maintain data consistency.
        container.setConcurrentConsumers(1);

        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.start();

        return container;
    }

and it works BUT all our queues are on one connection sharing 1 channel. That is NOT good for speed. What we want is One connection and every queue gets its own channel.

Next Step

No success here YET!. The java configuration in my original question is where we are at now.

I am baffled why this is so HARD to do. Clearly the XML configuration does something that is NOT easly doable in Java confiugration (Or atleast it feel sthat way to me). I see this as a gap that needs to be filled unless I am compeltly missing something. Please correct me if I am wrong. This is a genuine business case NOT some ficticious edge case. Please feel free to comment if you think otherwise.


Solution

  • and it works BUT all our queues are on one connection sharing 1 channel. That is NOT good for speed. What we want is One connection and every queue gets its own channel.

    If you switch to the DirectMessageListenerContainer, each queue in that configuration gets its own Channel.

    See the documentation.

    To answer your original question (pre-edit):

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer1(@Value("${address.queue.s1}") String queue)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(queue);
        container.setMessageListener(messageListener());
    
        // Set Exclusive Consumer 'ON'
        container.setExclusive(true);
    
        // Should be restricted to '1' to maintain data consistency.
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }
    
    ...
    
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer6(@Value("${address.queue.s6}" ) String queue)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(queue);
        container.setMessageListener(messageListener());
    
        // Set Exclusive Consumer 'ON'
        container.setExclusive(true);
    
        // Should be restricted to '1' to maintain data consistency.
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }