Search code examples
javaspring-bootjmsibm-mq

spring boot jms - send and receive message inside @JmsListner


QUEUE_INI use IBM MQ9, latest spring boot, and IBM's spring-boot-starter.

I'd like to receive a message with @JmsListner, then without committing it, send another message to another queue, receive response and then commit all.

So far I has next:

    @JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
    public void receiveMessage(Message message) throws JMSException {
        LOG.info("Received {} <{}>", QUEUE_IN, message);
        jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
            requestMsg.setStringProperty("type", "com.example.SubRequest");
            return requestMsg;
        });
        LOG.info("Message sent");
        Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
        LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
    }

I'm stuck on 'Message sent'. It looks like sub-request hasn't really sent. I see in MQ UI that queue depth is 1 but there is no message inside, and my sub-request listener also doesn't see any messages.

I've also tried use sendAndReceive method:

Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session -> {
    Message msg = session.createTextMessage();
    msg.setStringProperty("type", "com.example.SubRequest");
    LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
    return msg;
});

But I do not have permissions to access model queue.

Is there any way to make this work?

UPDATE: I made this work with combined help from you all. I end up with separate service to only send sub request with @Transactional(propagation = Propagation.REQUIRES_NEW). All other logic remained within main listener.

Also turning on transactions start/end logs was helpful:

logging:
  level:
    org.springframework.transaction.interceptor: trace

Solution

  • Your outbound message sender is enrolled in the same transaction as your message receiver. So the receiver of the outbound message won't see the message until the transaction commits. I think you need to start a new transaction to perform the inner procedure.

    == Update ==

    So it's been a while, and I don't have a dev environment set up for this, but I suggest something like this. Essentially, you're splitting up the Listener and Sender/Receiver into two separate classes. The Sender/Receiver should be injected into your listener so the @Transactional annotation is honored.

    public class MyListener {
        private final MySender sender;
    
        public MyListener(MySender sender) {
            this.sender = sender;
        }
    
        @JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
        public void receiveMessage(Message message) throws JMSException {
            LOG.info("Received {} <{}>", QUEUE_IN, message);
            Message reply = sender.sendAndReceive()
            LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
        }
    }
    
    
    public class MySender {
        private final JmsTemplate jmsTemplate2;
        private final Destination QUEUE_OUT;
    
        public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
            this.jmsTemplate2 = jmsTemplate2;
            this.QUEUE_OUT = QUEUE_OUT;
        }
    
        @Transactional(propagation=Propagation.NESTED)  // Or REQUIRES_NEW, edepending on usage
        public Message sendAndReceive() throws JMSException {
            jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
                requestMsg.setStringProperty("type", "com.example.SubRequest");
                return requestMsg;
            });
            LOG.info("Message sent");
            Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
            LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
            return reply;
        }
    }
    

    The @Transactional(propagation=Propagation.NESTED) will start a new transaction if one is already running (and commit/rollback when the method exits), so you should be able to send/receive the message and return it to you listener.