Search code examples
javaspringjmsspring-jmsjms-topic

How does spring jms distribute messages among durable topic listeners?


raw jms code:

TopicSubscriber durSubscriber1 = receiverSession.createDurableSubscriber(topic,"subscription_1");
durSubscriber1.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {

        RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
        try {
            System.out.println("sub_1:" + rmqTextMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
});


TopicSubscriber durSubscriber2 = receiverSession.createDurableSubscriber(topic,"subscription_2");
    durSubscriber2.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {

            RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
            try {
                System.out.println("sub_2:" + rmqTextMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }

        }
});

When I use following code each listener gets all messages. If I use same subscription name - application doesn't start.

From another hand I wrote code which uses spring jms:

config:

@Bean
public JmsListenerContainerFactory<?> myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    factory.setPubSubDomain(true);
    factory.setSubscriptionDurable(true);
    return factory;
}

listeners:

@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopic(Email email) {
    System.out.println("list_1:" + email);
}

@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopicDup(Email email) {
    System.out.println("list_2:" + email);
}

At this case both listeners divide messages. I mean that if producer sends 10 messages then listener_1 will get N messages and listener_2 will get M messages.
M+N=10

Please explain difference at 2 code snippets. Can you provide corresponding jms code for spring-jms version ?


Solution

  • It's due to the way the rabbit JMS client maps JMS to native AMQP. You have to give each listener a subscription name; otherwise they will compete for messages in the same queue.

    @JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "foo")
    public void receiveTopic(String email) {
        System.out.println("list_1:" + email);
    }
    
    @JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "bar")
    public void receiveTopicDup(String email) {
        System.out.println("list_2:" + email);
    }
    

    This is only needed for durable subscriptions.