
Redistribution not working with large messages


I'm using ActiveMQ Artemis 2.23.1. I have 3 masters and 3 slaves (each with 60 GB of disk and 60 GB of RAM). One of my clients is having issues receiving large messages (~100 MB).

Test procedure:

  1. consumers on node 01 and 03
  2. send 100 small messages
  3. send 100 large messages
  4. send 100 small messages again

The first 100 small messages are delivered correctly, but the broker seems to get stuck waiting for the large messages, which are never received. The 100 small messages sent in step 4 are never received either.

I also noticed that queues like $.artemis.internal.sf.amq-cluster.<id> have pending messages (the large messages I sent) that are not consumed even though there is a consumer on them.

I was able to reproduce the issue using the examples in the Artemis sources. I adapted the code of features > clustered > queue-message-redistribution to make it send and receive large messages (using code from features > standard > large-messages) and ran mvn verify with 2 local embedded servers.

My config files are generated by an ansible script. I wonder if I have forgotten an important property to handle really big messages.

broker.xml:

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

    <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="urn:activemq:core ">

        <name>master01.intra</name>

        <persistence-enabled>true</persistence-enabled>

        <journal-type>ASYNCIO</journal-type>

        <paging-directory>data/paging</paging-directory>

        <bindings-directory>data/bindings</bindings-directory>

        <journal-directory>data/journal</journal-directory>

        <large-messages-directory>data/large-messages</large-messages-directory>

        <journal-datasync>true</journal-datasync>
        <!--        <journal-sync-non-transactional>false</journal-sync-non-transactional>-->
        <!--        <journal-sync-transactional>false</journal-sync-transactional>-->
        <journal-min-files>2</journal-min-files>

        <journal-pool-files>10</journal-pool-files>

        <journal-buffer-timeout>280000</journal-buffer-timeout>


        <!-- how often we are looking for how many bytes are being used on the disk in ms -->
        <disk-scan-period>5000</disk-scan-period>

        <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
             that won't support flow control. -->
        <max-disk-usage>90</max-disk-usage>

        <!-- should the broker detect dead locks and other issues -->
        <critical-analyzer>true</critical-analyzer>

        <critical-analyzer-timeout>120000</critical-analyzer-timeout>

        <critical-analyzer-check-period>60000</critical-analyzer-check-period>

        <critical-analyzer-policy>HALT</critical-analyzer-policy>

        <page-sync-timeout>552000</page-sync-timeout>

        <connectors>
            <!-- Connector used to be announced through cluster connections and notifications -->
            <connector name="artemis">tcp://master01.intra:61616</connector>
        </connectors>

        <acceptors>
            <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
            <!-- amqpCredits: The number of credits sent to AMQP producers -->
            <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
            <acceptor name="artemis">tcp://master01.intra:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;virtualTopicConsumerWildcards=Consumer.*.%3E%3B2;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
        </acceptors>

        <cluster-user>admin-cluster</cluster-user>
        <cluster-password>admin-cluster</cluster-password>

        <broadcast-groups>
            <broadcast-group name="bg-group1">
                <group-address>231.7.7.7</group-address>
                <group-port>9876</group-port>
                <broadcast-period>2000</broadcast-period>
                <connector-ref>artemis</connector-ref>
            </broadcast-group>
        </broadcast-groups>

        <discovery-groups>
            <discovery-group name="dg-group1">
                <group-address>231.7.7.7</group-address>
                <group-port>9876</group-port>
                <refresh-timeout>10000</refresh-timeout>
            </discovery-group>
        </discovery-groups>

        <cluster-connections>
            <cluster-connection name="amq-cluster">
                <address></address>
                <connector-ref>artemis</connector-ref>
                <message-load-balancing>ON_DEMAND</message-load-balancing>
                <discovery-group-ref discovery-group-name="dg-group1"/>
            </cluster-connection>
        </cluster-connections>

        <xi:include href="/app/esbbroker/etc/security-settings.xml"/>
        <xi:include href="/app/esbbroker/etc/addresses-settings.xml"/>
        <xi:include href="/app/esbbroker/etc/addresses.xml"/>
        <xi:include href="/app/esbbroker/etc/ha-policy.xml"/>

        <metrics>
            <jvm-memory>true</jvm-memory> <!-- defaults to true -->
            <jvm-gc>true</jvm-gc> <!-- defaults to false -->
            <jvm-threads>true</jvm-threads> <!-- defaults to false -->
            <netty-pool>false</netty-pool> <!-- defaults to false -->
            <plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.ArtemisPrometheusMetricsPlugin"/>
        </metrics>
    </core>
</configuration>

addresses.xml: We have a lot in this one, but all in the form of:

<addresses xmlns="urn:activemq:core">
    <address name="stirint.clo.person.signal">
        <anycast>
            <queue name="stirint.clo.person.signal"/>
        </anycast>
    </address>
    ...
</addresses>

addresses-settings.xml (note that redistribution-delay is 0 under match="#", as in the queue-redistribution example):

<!-- example of one of the client queues (all generated the same way by ansible) -->
<address-settings xmlns="urn:activemq:core">
    <address-setting match="stirint.clo.person.signal">
        <dead-letter-address>DLQ.stirint.clo.person.signal</dead-letter-address>
        <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
        <max-delivery-attempts>3</max-delivery-attempts>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-delete-queues>false</auto-delete-queues>
        <auto-delete-addresses>false</auto-delete-addresses>
        <auto-create-jms-queues>false</auto-create-jms-queues>
        <auto-create-jms-topics>false</auto-create-jms-topics>
    </address-setting>
    <!--  ... -->
    <!-- other entries -->
    <!-- if you define auto-create on certain queues, management has to be auto-create -->
    <address-setting match="activemq.management#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-delete-queues>false</auto-delete-queues>
        <auto-delete-addresses>false</auto-delete-addresses>
        <auto-create-jms-queues>false</auto-create-jms-queues>
        <auto-create-jms-topics>false</auto-create-jms-topics>
    </address-setting>
    <!--default for catch all-->
    <address-setting match="#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <redistribution-delay>0</redistribution-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-delete-queues>false</auto-delete-queues>
        <auto-delete-addresses>false</auto-delete-addresses>
        <auto-create-jms-queues>false</auto-create-jms-queues>
        <auto-create-jms-topics>false</auto-create-jms-topics>
    </address-setting>
</address-settings>

security-settings.xml:

<security-settings xmlns="urn:activemq:core">
    <!-- one of many, all have similar format because generated by ansible script -->
    <security-setting match="stirint.clo.person.signal">
        <permission type="consume" roles="gStirint,amq"/>
        <permission type="browse" roles="gStirint,amq,readonly"/>
        <permission type="send" roles="gStirint,amq"/>
        <permission type="createNonDurableQueue" roles="gStirint,amq"/>
        <permission type="deleteNonDurableQueue" roles="gStirint,amq"/>
        <permission type="createDurableQueue" roles="gStirint,amq"/>
        <permission type="deleteDurableQueue" roles="gStirint,amq"/>
        <permission type="createAddress" roles="gStirint,amq"/>
        <permission type="deleteAddress" roles="gStirint,amq"/>
    </security-setting>
    ...
    <security-setting match="ActiveMQ.Advisory.TempQueue">
        <permission type="createNonDurableQueue" roles="amq,readonly" />
        <permission type="deleteNonDurableQueue" roles="amq,readonly" />
        <permission type="createDurableQueue" roles="amq,readonly" />
        <permission type="browse" roles="amq,readonly"/>
        <permission type="send" roles="amq,readonly"/>
    </security-setting>

    <security-setting match="ActiveMQ.Advisory.TempTopic">
        <permission type="createNonDurableQueue" roles="amq,readonly"/>
        <permission type="deleteNonDurableQueue" roles="amq,readonly"/>
        <permission type="createDurableQueue" roles="amq,readonly" />
        <permission type="browse" roles="amq,readonly"/>
        <permission type="send" roles="amq,readonly"/>
    </security-setting>

    <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq,readonly"/>
        <permission type="send" roles="amq"/>
        <!-- we need this otherwise ./artemis data imp wouldn't work -->
        <permission type="manage" roles="amq"/>
    </security-setting>
</security-settings>

ha-policy.xml (3 pairs of master/slave in gn1, gn2, gn3):

<ha-policy xmlns="urn:activemq:core">
    <!-- this file contains configuration of high availability cluster -->
    <replication>
        <master>
            <check-for-live-server>true</check-for-live-server>
            <group-name>gn-1</group-name>
        </master>
    </replication>
</ha-policy>

Here is the link to my attempt at adapting the official queue-message-redistribution example to large messages: https://github.com/themikebe/artemis-redistribution-large-files

Edit 25-07-2022: I ran my custom code above against versions 2.19.1 (Java 8), 2.21 (Java 11) and 2.22 (Java 11) of the queue-message-redistribution example with the embedded server. The code starts to fail in version 2.22. I will now run tests on version 2.21 in my own environment and see what happens.


Solution

  • This issue was almost certainly caused by ARTEMIS-3805. Therefore the "proper" solution would be to specify this on your cluster-connection:

    <producer-window-size>-1</producer-window-size>
    

    There's no need to switch versions.

    Generally speaking, moving messages around the cluster via the cluster-connection, while convenient, isn't terribly efficient (much less so for "large" messages). Ideally you would have a sufficient number of clients on each node to consume the messages that were originally produced there. If you don't have that many clients, then you may want to re-evaluate the size of your cluster, as an oversized cluster may actually decrease overall message throughput rather than increase it.
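
For illustration, here is a minimal sketch of the cluster-connection from the question's broker.xml with producer-window-size added. Everything except the new element is copied from the question. Placing it after message-load-balancing and before discovery-group-ref is an assumption about the element order the broker schema expects, so validate it against the artemis-configuration.xsd shipped with your version. As I understand the clustering documentation, -1 means no producer window (no flow control) on the internal bridge that moves messages between nodes.

```xml
<cluster-connections>
    <cluster-connection name="amq-cluster">
        <address></address>
        <connector-ref>artemis</connector-ref>
        <message-load-balancing>ON_DEMAND</message-load-balancing>
        <!-- added: -1 removes the producer window on the cluster bridge
             (assumed placement; check the XSD of your Artemis version) -->
        <producer-window-size>-1</producer-window-size>
        <discovery-group-ref discovery-group-name="dg-group1"/>
    </cluster-connection>
</cluster-connections>
```

Since the question's broker.xml files are generated by ansible, the change would go into the template and be rolled out to every master and slave so that all nodes in the cluster use the same setting.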