java microservices openshift jms activemq-artemis

How can I configure an AMQ broker on OpenShift 4 to ensure messages are consumed by only one pod in a publish-subscribe model?


I am using OpenShift 4 to deploy multiple microservices that are connected via an AMQ broker using a publish-subscribe messaging model. However, when I increase the number of pods to 2, every pod consumes the same message, rather than just one of them.

Can someone suggest how to configure the Java code below, or the AMQ broker, so that each message is consumed by only one pod in a publish-subscribe model? Are there any specific settings I should be aware of or changes I need to make to my configuration? Thank you.

Configuration:

@Bean
public JmsListenerContainerFactory<DefaultMessageListenerContainer> jmsListenerContainerPublisherFactory(ConnectionFactory connectionFactory,
                                                                                                         DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(throwable -> {
        log.info("An error has occurred in the transaction: " + throwable.getMessage());
        log.error("Error: ", throwable);
    });
    configurer.configure(factory, connectionFactory);
    factory.setPubSubDomain(true);
    return factory;
}

Listener:

@JmsListener(destination = "${queue.dummyObject}",
        containerFactory = "jmsListenerContainerPublisherFactory")
public void onConsumePublishedMessage(String message) throws JsonProcessingException {
    DummyObjectDTO dummyObjectDTO = mapper.readValue(message, DummyObjectDTO.class);
    LOG.info(" Received onConsumePublishedMessage message : " + dummyObjectDTO);
}

Producer:

private <T> T sendFeatured(T value, String queue, boolean publish, Selector selector) {
    *
    *
    jmsTemplate.setPubSubDomain(true);
    jmsTemplate.convertAndSend(queue, objectAsJson);
    *
    *
    return value;
}

Solution

  • Since you're using ActiveMQ Artemis, you can use the shared subscription functionality introduced in JMS 2.0. When configuring your DefaultJmsListenerContainerFactory, call setSubscriptionShared:

    factory.setPubSubDomain(true);
    factory.setSubscriptionShared(true);
    return factory;
    

    Then on your listener you can set the subscription name, e.g.:

    @JmsListener(destination = "${queue.dummyObject}",
                 subscription = "mySubscriptionName",
                 containerFactory = "jmsListenerContainerPublisherFactory")
    

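    All pods (replicas) of the same microservice must use the same subscription name, so that they share one subscription and only one of them receives each message. Each different microservice needs its own unique subscription name, so that it still receives its own copy of every message.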
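
To make the effect of the subscription name concrete, here is a minimal in-memory sketch of shared-subscription semantics. It is a toy model only, not the Artemis or JMS API: the ToyTopic class, the subscription names "serviceA" and "serviceB", and the round-robin choice of consumer are all assumptions made for illustration (a real broker decides for itself which consumer on a shared subscription gets a given message).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a topic with shared subscriptions.
// It only illustrates the delivery semantics; it is NOT a JMS client.
class ToyTopic {
    // subscription name -> inboxes of the consumers sharing that subscription
    private final Map<String, List<List<String>>> subscriptions = new LinkedHashMap<>();
    // subscription name -> number of messages delivered so far (drives round-robin)
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Registers one consumer (think: one pod) under a subscription name
    // and returns the list that collects the messages it receives.
    List<String> subscribe(String subscriptionName) {
        List<String> inbox = new ArrayList<>();
        subscriptions.computeIfAbsent(subscriptionName, k -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(subscriptionName, 0);
        return inbox;
    }

    // Each subscription gets one copy of the message; within a subscription
    // exactly one consumer receives it (round-robin is an assumption here).
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : subscriptions.entrySet()) {
            List<List<String>> consumers = entry.getValue();
            int count = delivered.get(entry.getKey());
            consumers.get(count % consumers.size()).add(message);
            delivered.put(entry.getKey(), count + 1);
        }
    }
}

class SharedSubscriptionSketch {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();

        // Two pods of the same service share one subscription name.
        List<String> serviceAPod1 = topic.subscribe("serviceA");
        List<String> serviceAPod2 = topic.subscribe("serviceA");
        // A different service uses its own subscription name.
        List<String> serviceBPod1 = topic.subscribe("serviceB");

        topic.publish("m1");
        topic.publish("m2");

        System.out.println("serviceA pod 1: " + serviceAPod1);
        System.out.println("serviceA pod 2: " + serviceAPod2);
        System.out.println("serviceB pod 1: " + serviceBPod1);
    }
}
```

In this model the two "serviceA" pods split the messages between them, while the single "serviceB" pod still sees every message. That is the behaviour the shared subscription name is meant to give you across replicas of one microservice.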