spring-boot  spring-jms  activemq-artemis  qpid

How to configure JmsListener on ActiveMQ for autoscaling using Qpid Sender


I have a Kubernetes cluster with an ActiveMQ Artemis queue, and I am using the HPA to autoscale my microservices. The messages are sent via QpidSender and received via a JmsListener.

Messaging works, but I am not able to configure the queue/listener so that autoscaling works as expected.

This is my Qpid sender

public static void send(String avroMessage, String task) throws JMSException, NamingException {
    Connection connection = createConnection();
    connection.start();

    Session session = createSession(connection);
    MessageProducer messageProducer = createProducer(session);

    TextMessage message = session.createTextMessage(avroMessage);
    message.setStringProperty("task", task);
    messageProducer.send(
        message, 
        DeliveryMode.NON_PERSISTENT, 
        Message.DEFAULT_PRIORITY, 
        Message.DEFAULT_TIME_TO_LIVE);

    connection.close();
}

private static MessageProducer createProducer(Session session) throws JMSException {
    Destination producerDestination = 
       session.createQueue("queue?consumer.prefetchSize=1&heartbeat='10000'");
    return session.createProducer(producerDestination);
}

private static Session createSession(Connection connection) throws JMSException {
    return connection.createSession(Session.AUTO_ACKNOWLEDGE);
}

private static Connection createConnection() throws NamingException, JMSException {
    Hashtable<Object, Object> env = new Hashtable<>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    env.put("connectionfactory.factoryLookup", amqUrl);
    Context context = new javax.naming.InitialContext(env);

    ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(connectionFactory);
    pooledConnectionFactory.setMaxConnections(10);

    return connectionFactory.createConnection(amqUsername, amqPassword);
}

This is my Listener config

@Bean
public JmsConnectionFactory jmsConnection() {
    JmsConnectionFactory jmsConnection = new JmsConnectionFactory();
    jmsConnection.setRemoteURI(this.amqUrl);
    jmsConnection.setUsername(this.amqUsername);
    jmsConnection.setPassword(this.amqPassword);
    return jmsConnection;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConnection());

    return factory;
}

And here is my Listener

@JmsListener(
        destination = "queue?consumer.prefetchSize=1&heartbeat='10000'",
        selector = "task = 'myTask'"
)
public void receiveMsg(Message message) throws IOException, JMSException {
    message.acknowledge();
    doStuff();
}

I send the message like this

QpidSender.send(avroMessage, "myTask");

This setup works. I can send different messages, and as soon as there are more than 2, a second instance of my service starts and consumes messages. If the message count later drops below 2, that instance is terminated.

The problem is that I don't want the message to be acknowledged before doStuff() runs. If something goes wrong, or if the service is terminated before doStuff() finishes, the message is lost (right?).

But if I reorder it to

doStuff();
message.acknowledge();

the second instance cannot receive a message from the broker as long as the first instance is still in doStuff() and hasn't acknowledged its message.

How do I configure this so that more than one instance can consume messages from the queue, but a message isn't lost if the service is terminated or something else fails in doStuff()?


Solution

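  • Use factory.setSessionTransacted(true) on the DefaultJmsListenerContainerFactory. With a locally transacted session, the container commits the message only after the listener method returns, and rolls it back for redelivery if the listener throws.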

    See the javadocs for DefaultMessageListenerContainer:

     It is strongly recommended to either set "sessionTransacted" to "true" or
     specify an external "transactionManager". See the
     AbstractMessageListenerContainer javadoc for details on acknowledge modes
     and native transaction options, as well as the
     AbstractPollingMessageListenerContainer javadoc for details on configuring
     an external transaction manager. Note that for the default
     "AUTO_ACKNOWLEDGE" mode, this container applies automatic message
     acknowledgment before listener execution, with no redelivery in case of an
     exception.