jms, activemq-artemis

ActiveMQ Artemis: Multicast address delivers messages inconsistently


We're using Artemis 2.19.0 to distribute XML messages. Recently we found that some messages may get lost when they are sent to a multicast address that has two durable multicast queues bound to it, both of which have an XPath filter.

E.g.:

  1. Create a multicast address named: IN.ADDRESS.FOO
  2. Create two durable queues under it named: IN.QUEUE1.FOO and IN.QUEUE2.FOO
  3. Set an XPath filter on both queues: XPATH '/XML/DATA/Direction[text()="Right"]'
  4. Send 1000 identical matching XML messages to address IN.ADDRESS.FOO (via JMS, message size about 20 KB)
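For reference, the setup described in the steps above corresponds roughly to a broker.xml sketch like the following (this is an assumption about how the address and queues were configured; they could equally have been created via the management API or CLI):

```xml
<!-- Sketch only: one multicast address with two durable, XPath-filtered queues -->
<addresses>
   <address name="IN.ADDRESS.FOO">
      <multicast>
         <queue name="IN.QUEUE1.FOO">
            <durable>true</durable>
            <filter string="XPATH '/XML/DATA/Direction[text()=&quot;Right&quot;]'"/>
         </queue>
         <queue name="IN.QUEUE2.FOO">
            <durable>true</durable>
            <filter string="XPATH '/XML/DATA/Direction[text()=&quot;Right&quot;]'"/>
         </queue>
      </multicast>
   </address>
</addresses>
```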

Eventually, IN.QUEUE1.FOO, IN.QUEUE2.FOO, or both end up with fewer than 1000 messages.

We've tried removing the filter from one of them, and then everything works fine: both queues receive all 1000 messages.

  1. We're using a HA architecture with one alive node and one back-up node.
  2. We've checked the DLQ and nothing is there, so it should not be a case of max-redelivery-attempts being exceeded.
  3. This behaviour happens even with no consumer connected.

So, my question is:

  1. Could the reason be that an XPath filter is significantly slower than a normal filter, causing some messages to get lost?
  2. If not, what could be the reason?

If anything is unclear, please ask. Thanks.

UPDATE 1:

Versions:

Java: 1.8
Spring-Integration: 5.5.11
Spring-jms: 5.3.19
Artemis: 2.19.0

XML File:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
             xmlns:int-file="http://www.springframework.org/schema/integration/file"
             xmlns:task="http://www.springframework.org/schema/task"
             xmlns:jms="http://www.springframework.org/schema/integration/jms"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd
            http://www.springframework.org/schema/integration/xml
            http://www.springframework.org/schema/integration/xml/spring-integration-xml-4.3.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file-4.3.xsd
            http://www.springframework.org/schema/task
            http://www.springframework.org/schema/task/spring-task-4.3.xsd">
    <!-- Multicast address -->
    <beans:bean id="topic" class="org.apache.activemq.artemis.jms.client.ActiveMQTopic">
        <beans:constructor-arg value="IN.ADDRESS.FOO"/>
    </beans:bean>

    <!-- Anycast queue -->
    <beans:bean id="queue" class="org.apache.activemq.artemis.jms.client.ActiveMQQueue">
        <beans:constructor-arg value="AQ.QUEUE.FOO"/>
    </beans:bean>

    <channel id="topicChannel"/>

    <task:executor id="executor" pool-size="2"/>

    <publish-subscribe-channel id="outChannel" task-executor="executor"/>

    <filter id="consumer1" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>

    <filter id="consumer2" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>

    <jms:message-driven-channel-adapter id="queueAdapter" destination="queue" channel="outChannel"
                                        acknowledge="auto" connection-factory="ConnectionFactory"/>

    <jms:outbound-channel-adapter id="topicAdapter" destination="topic" channel="topicChannel"
                                  connection-factory="ConnectionFactory"/>
</beans:beans>

ConnectionFactory Bean:

@Bean(name = "ConnectionFactory")
public SingleConnectionFactory ibConnectionFactory(
        @Value("${artemis.broker-url}") String brokerUrl,
        @Value("${artemis.user}") String username,
        @Value("${artemis.password}") String password) throws JMSException {

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL(brokerUrl);
    factory.setUser(username);
    factory.setPassword(password);
    return new SingleConnectionFactory(factory);
}

Sender program:

// Connection and Session are AutoCloseable in JMS 2.0, so all three
// resources can be closed via try-with-resources
try (ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
     Connection conn = fac.createConnection();
     Session session = conn.createSession()) {

    MessageProducer producer = session.createProducer(new ActiveMQQueue("AQ.QUEUE.FOO"));
    Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");

    for (int count = 0; count < 1000; count++) {
        System.out.println(count);
        producer.send(msg);
    }
}

XPATH Filter:

XPATH '/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text()="AK"]'
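As a sanity check that the filter expression itself matches the sample message, here is a small standalone sketch using the JDK's built-in XPath engine (the class name `XPathCheck` is made up for illustration; the broker uses its own XPath evaluation internally, so this only verifies the expression, not broker behaviour):

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.w3c.dom.Document;

public class XPathCheck {

    // Sample message and filter expression copied from the question
    static final String XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>";
    static final String EXPR = "/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text()=\"AK\"]";

    public static boolean matches() throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(XML.getBytes(StandardCharsets.UTF_8)));
        // NODE evaluation returns null when nothing matches
        return XPathFactory.newInstance().newXPath()
                .evaluate(EXPR, doc, XPathConstants.NODE) != null;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(matches()); // prints true for the sample message
    }
}
```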

Message Sample:

<?xml version="1.0" encoding="UTF-8"?><Root xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>

Either removing the task-executor attribute from the publish-subscribe-channel or removing one of the queues' filters fixes the problem.
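For reference, a sketch of the working variant of the channel definition: dropping the task-executor attribute makes the channel dispatch to its subscribers on the sender's own thread, so messages are published to the broker sequentially rather than concurrently from the executor's pool.

```xml
<!-- Same channel as in the configuration above, but without the executor;
     subscribers now run sequentially on the sending thread -->
<publish-subscribe-channel id="outChannel"/>
```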

UPDATE 2:

A minimal example with 10 concurrent tasks sending 1000 messages in total: if all the queues bound to the same address have the XPath filter, they do not receive all 1000 messages, but after removing one of the filters it works.

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;

import javax.jms.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class TestRunner {

    public static void main(String[] args) throws Exception {
        String brokerUrl = "(tcp://server1:61616,tcp://server2:61616)?ha=true&reconnectAttempts=-1&retryInterval=100&retryIntervalMultiplier=1.5&maxRetryInterval=6000";
        String user = "admin";
        String password = "admin";

        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
        ExecutorService ser = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            ser.submit(() -> {
                Connection conn = null;
                try {
                    conn = fac.createConnection();
                    Session session = conn.createSession();
                    MessageProducer producer = session.createProducer(new ActiveMQTopic("IN.ADDRESS.FOO"));
                    Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
                    // each of the 10 tasks sends 100 messages, 1000 in total
                    for (int count = 0; count < 100; count++) {
                        System.out.println(count);
                        msg.setStringProperty("MessageId", String.valueOf(count));
                        producer.send(msg);
                    }
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                } finally {
                    if (conn != null) {
                        try { conn.close(); } catch (JMSException ignored) { }
                    }
                }
            });
        }
        ser.shutdown();
}

Solution

  • Thanks for the test case. I was able to use it to reproduce the error you were seeing. I opened ARTEMIS-4687 and sent a PR to fix the issue. This will be fixed in the 2.33.0 release, which is due in the next few weeks.