Search code examples
javaspring-bootasynchronousspring-jms

Spring Boot JmsListener blocked by other JmsListener for different destination


I encountered the following situation in a Spring Boot application with JMS over ActiveMQ 5 as broker.

A @JmsListener annotated method processes a message and sends a response message to a different destination. There is also a @JmsListener for this destination, but it is not called as soon as the response has been sent to the broker; it is only called once the processing of the original listener has completely finished. If the original listener is additionally annotated with @Async, the response is received immediately after sending, as expected.

The original project is way too big, so I prepared the minimal example below. It contains a Spring Boot application TestApp with a single @JmsListener (1) which immediately forwards a message from Destination in to out and afterwards sleeps for 3 seconds.

The application is started in a test which sends a single message to in and waits for 2 seconds for the response on out.

Only if the @Async is present at (1) the test is successful.

Further observations:

  • Same behaviour if the test uses variant (2) and receives the response via JmsTemplate instead of using a JmsListener.
  • In any case one can see that the message is present in the broker immediately after sending.

Question: Why is receiving self-sent messages blocked in this situation? How can the outgoing message be received immediately without using @Async?

Update/Solution: As Gary suggested, there is indeed a transaction present. It seems to be created not by Spring Boot itself but by the included ActiveMQ library.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

/**
 * Integration test that sends one message to destination {@code in} and expects the
 * response forwarded by {@code TestApp} to arrive on destination {@code out} within
 * two seconds.
 *
 * <p>The broker is an ActiveMQ instance started in a container; its mapped port is
 * published to the application context through {@link #ports(DynamicPropertyRegistry)}.
 */
@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);

    /**
     * ActiveMQ broker container. Exposes ports 8161 and 61616, is considered started once
     * the "Apache ActiveMQ ... started" log line appears (at most 60 seconds), and forwards
     * its output to {@link #LOG}.
     */
    @Container
    public static final GenericContainer<?> ACTIVEMQ =
        new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
            .withExposedPorts(8161, 61616)
            .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
            .withStartupTimeout(Duration.ofSeconds(60))
            .withLogConsumer(new Slf4jLogConsumer(LOG));

    /**
     * Registers {@code spring.activemq.broker-url} so that it points at the host and the
     * mapped port of container port 61616.
     *
     * @param registry registry the broker URL supplier is added to
     */
    @DynamicPropertySource
    private static void ports(DynamicPropertyRegistry registry)
    {
        registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Messages received on {@code out}. {@link #onOut(String)} appends while the test thread
     * is blocked in {@link #foo()}, so the list is filled from another thread and has to be
     * a thread-safe implementation for the final size check to be reliable.
     */
    private final List<String> messages = new CopyOnWriteArrayList<>();

    /**
     * Listener for destination {@code out}; records every received message.
     *
     * @param message payload of the received message
     */
    @Async
    @JmsListener(destination = "out")
    public void onOut(String message)
    {
        LOG.warn("Received message from out: {}", message);
        messages.add(message);
    }

    /**
     * Sends a random UUID to {@code in}, waits two seconds and asserts that exactly one
     * message has been recorded from {@code out}.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void foo() throws InterruptedException
    {
        LOG.warn("Sending request");
        // Sending some message on destination 'in' to be received and answered by the listener below
        jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());

        LOG.warn("Waiting for repsonse");

        // (2)    // Try to receive response from 'out'
        //        jmsTemplate.setReceiveTimeout(2_000);
        //        Message response = jmsTemplate.receive("out");
        //        assertThat(response, notNullValue());

        // Fixed wait: the response has to show up within this window for the test to pass.
        Thread.sleep(2_000);
        assertThat(messages, hasSize(1));
    }
}

/**
 * Minimal application under test: a single listener on destination {@code in} that
 * forwards each message to destination {@code out} and then keeps running for three
 * more seconds before it returns.
 */
@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
    private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);

    /** Time the listener stays busy after the response has been handed to the template. */
    private static final Duration PAUSE_AFTER_SEND = Duration.ofSeconds(3);

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args)
    {
        SpringApplication.run(TestApp.class, args);
    }

    /**
     * Receives a message from {@code in}, forwards it to {@code out}, then sleeps.
     *
     * @param message payload of the received message; forwarded unchanged
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    // (1)
    // @Async
    @JmsListener(destination = "in")
    public void onIn(String message) throws InterruptedException
    {
        LOG.warn("Received message from in: {}", message);
        forwardToOut(message);
        pause();
        LOG.warn("Finished");
    }

    /** Sends the payload to destination {@code out} and logs that the response was sent. */
    private void forwardToOut(String payload)
    {
        jmsTemplate.convertAndSend("out", payload);
        LOG.warn("Sent Response");
    }

    /** Logs and then blocks the calling thread for {@link #PAUSE_AFTER_SEND}. */
    private static void pause() throws InterruptedException
    {
        LOG.warn("Sleeping ...");
        Thread.sleep(PAUSE_AFTER_SEND.toMillis());
    }
}

Here is the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates. There is no parent POM to inherit a version from,
         so the version has to be declared here. -->
    <groupId>foo</groupId>
    <artifactId>jmstest</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- Import the Spring Boot BOM so that the Boot-managed dependencies below
         can be declared without an explicit version. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.5.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- Test-only dependencies. The Testcontainers artifacts carry an explicit version. -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>

Solution

  • It looks like you are using transactions; the transaction won't commit until the @JmsListener method exits, so the other consumer won't see the message before then.

    You can't use transactions for this use case.

    Hence the @Async works because the send will be performed in a different transaction.