I am implementing a JMS consumer inside a Quarkus project. It listens to an anycast queue ("my.edu.queue") on ActiveMQ Artemis (version 2.32.0). I want to test how a message is delivered to the dead-letter queue when a business error happens. I simulate the business error for every fifth message by throwing an exception.
Judging by the dead-letter example in the ActiveMQ Artemis examples, the code for dead-lettering seems straightforward. I implemented the functionality the same way as in the example (or so I thought), except that I used the "quarkus-artemis-jms" library. The problem, either in my code or in my broker setup, is that no message ends up in the dead-letter queue when I call Session.rollback(). My JmsMessageConsumer is shown below, in case someone can spot at first glance what I am doing wrong. I have also set max-delivery-attempts in broker.xml to 0 for my queue (the linked project contains every file).
@ApplicationScoped
public class JmsMessageConsumer {

    private static final Logger log = Logger.getLogger(JmsMessageConsumer.class.getName());

    @Inject
    ConnectionFactory connectionFactory;

    private JMSContext context;
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private MessageProducer dlqProducer;

    @ConfigProperty(name = "my.edu.queue.name", defaultValue = "my.edu.queue")
    String queueName;

    private AtomicInteger counter = new AtomicInteger(0);

    void onStart(@Observes StartupEvent ev) throws JMSException {
        connection = connectionFactory.createConnection();
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(queueName);
        consumer = session.createConsumer(queue);
        Queue dlq = session.createQueue(queueName + ".dlq");
        dlqProducer = session.createProducer(dlq);
        connection.start();
        receiveMessages();
    }

    void onStop(@Observes ShutdownEvent ev) throws JMSException {
        connection.close();
    }

    private void receiveMessages() {
        try {
            consumer.setMessageListener(message -> {
                try {
                    processMessage(message.getBody(String.class));
                    session.commit();
                } catch (Exception e) {
                    log.severe("Error processing message: %s".formatted(e.getMessage()));
                    try {
                        // sendToDLQ(message);
                        session.rollback();
                    } catch (JMSException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            });
        } catch (JMSException e) {
            log.severe("Error setting message listener: %s".formatted(e.getMessage()));
        }
    }

    private void processMessage(String text) {
        counter.incrementAndGet();
        if (counter.get() % 5 == 0) {
            throw new RuntimeException("Error in business logic");
        }
        log.info("Processed message: " + text);
    }
}
The whole project (which is really simple), with all the configuration files, can be found here. My broker.xml is in src/resources/broker_conf/broker.xml.
The messages are not sent to the dead letter address because max-delivery-attempts is 0. You must set max-delivery-attempts to a positive value to send the undelivered messages to the dead letter address, see QueueImpl.
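To make the rule concrete, here is a minimal sketch of the decision as described above. It is a simplified model, not the actual Artemis source: the class name `DeadLetterCheck`, the method `shouldSendToDeadLetter`, and the exact `>=` comparison are assumptions made for illustration.

```java
// Simplified model of the broker-side decision; NOT the actual Artemis code.
// DeadLetterCheck and shouldSendToDeadLetter are hypothetical names.
class DeadLetterCheck {

    // Returns true when a message that has been delivered deliveryCount times
    // would be moved to the dead letter address under this simplified rule.
    static boolean shouldSendToDeadLetter(int maxDeliveryAttempts, int deliveryCount) {
        // A non-positive limit disables the check, so the message is redelivered instead.
        return maxDeliveryAttempts > 0 && deliveryCount >= maxDeliveryAttempts;
    }

    public static void main(String[] args) {
        int deliveryCount = 1; // the message was delivered once and then rolled back
        for (int max : new int[] {0, 1, 10}) {
            System.out.println("max-delivery-attempts=" + max
                    + " -> dead letter after first rollback: "
                    + shouldSendToDeadLetter(max, deliveryCount));
        }
    }
}
```

Under this model, a limit of 0 never satisfies the condition, which matches the behaviour in the question: the rolled-back message is redelivered rather than dead-lettered.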
To send the messages to the dead letter address after the first rollback, set max-delivery-attempts to 1, i.e.:
<address-setting match="#">
    <dead-letter-address>DLQ</dead-letter-address>
    <max-delivery-attempts>1</max-delivery-attempts>
    ...