Search code examples
javasqlspringspring-bootjms

How to immediately stop execution of Spring JmsListener?


In my Java Spring Boot 3 application I have a simple JmsListener for Oracle AQ:

import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

@Component
@Log4j2
public class SomeJmsListener {

    @JmsListener(
            id = "id_listener_1",
            destination = "wmstk_queue",
            containerFactory = "jmsListenerContainerFactory"
    )
    public void listenMessage(Message message) throws Exception {
        if (message instanceof ObjectMessage objectMessage) {
            MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();
            
            webClient.create()
                    .post()
                    .uri("some_url")
                    .bodyValue(messageInfo)
                    .retrieve()
                    .toEntity(String.class)
                    .block();
        }
    }
}

This listener can be stopped at some point of the application execution using the JmsListenerEndpointRegistry, as follows:

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Service;

@Service
@Log4j2
@RequiredArgsConstructor
public class JmsListenerControlService {

    private final JmsListenerEndpointRegistry registry;
    
    
    public void stop(String listenerId) {
        var container = registry.getListenerContainer(listenerId);
        
        if (container.isRunning()) {
            container.stop();
        }
    }
}

The goal to achieve is that if listener container is stopped, then execution of listenMessage must be immediately stopped (if it is running) and JMS Session must be rolled back.

The only solution I found now is to observe container and manually interrupt the thread of listenMessage:

import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
@Log4j2
public class SomeJmsListener {

    @Autowired
    private JmsListenerEndpointRegistry registry;

    private ScheduledExecutorService scheduledExecutorService;

    @JmsListener(
            id = "id_listener_1",
            destination = "wmstk_queue",
            containerFactory = "jmsListenerContainerFactory"
    )
    public void listenMessage(Message message) throws Exception {
        observeContainer(Thread.currentThread());

        if (message instanceof ObjectMessage objectMessage) {
            MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();

            webClient.create()
                    .post()
                    .uri("some_url")
                    .bodyValue(messageInfo)
                    .retrieve()
                    .toEntity(String.class)
                    .block();
        }
        stopObservation();
    }

    /**
     * Scheduled job observes jms listener and if it is not running - interrupts its thread
     */
    private void observeContainer (Thread thread){
        String listenerId = "id_listener_1";
        var listenerContainer = registry.getListenerContainer(listenerId);

        log.debug("Start observation for listener container with id: {}", listenerId);

        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

        scheduledExecutorService.scheduleAtFixedRate(
                () -> {
                    if (!listenerContainer.isRunning()) {
                        stopObservation();
                        thread.interrupt(); // interrupt the thread to prevent further execution of listenMessage
                    } else {
                        log.debug("Listener active, id: {}", listenerId);
                    }
                },
                10, // 10ms initial delay
                100, // execute each 100ms
                TimeUnit.MILLISECONDS
        );
    }

    private void stopObservation () {
        log.info("Stop observation for listener container with id: {}", "wmstk-listener-endpoint-1");
        if (scheduledExecutorService != null) {
            scheduledExecutorService.close();
            scheduledExecutorService = null;
        }
    }
}

After interruption the message remains in the queue and processing is interrupted as expected, but at the very end of logs I see this exception and I am not sure if this is something to worry about or not:

org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method ' listenMessage throws java.lang.Exception'

ERROR [er-endpoint-1-1] o.s.j.l.DefaultMessageListenerContainer : Application exception overridden by rollback error

Caused by: java.io.InterruptedIOException: Socket read interrupted: oracle.jakarta.jms.AQjmsException: IO Error: Socket read interrupted

So, this exception occurs by rollback, and namely in:

package oracle.jakarta.jms;

class AQjmsSession {

     synchronized void forceRollback() throws JMSException {
        Connection db_conn = null;
        db_conn = this.getDBConnection();

        try {
            db_conn.rollback(); // here exception occurs
            this.setConsistency(true);
        } catch (SQLException var3) {
            throw new AQjmsException(var3);
        }

        this.restartConsumers();
    }
}

Is it worth worrying about this exception if the message remains in the queue and is available for reading, although exception tells that exception occurred by rollback? And maybe there is some other way to smoothly make a rollback and interrupt the execution of the method?


Solution

  • You can check if the container is running in the listener itself at the most critical point of message handling for your business logic and throw an error if it is not running. If i were you i wouldn't intervene a thread managed by container.

    You are starting and stopping an executerService for every message which can be expensive also there is no guarantee that observeContainer line will be executed before you handle the message