I encountered the following situation in a Spring Boot application that uses JMS with ActiveMQ 5 as the broker.
A @JmsListener-annotated method processes a message and sends a response message to a different destination. There is also a @JmsListener for this destination, but it is not called as soon as the response has been sent to the broker, only once the original listener has completely finished processing. If the original listener is additionally annotated with @Async, the response is received immediately after sending, as expected.
The original project is way too big, so I prepared the minimal example below.
It contains a Spring Boot application TestApp with a single @JmsListener (1), which immediately forwards a message from destination in to destination out and then sleeps for 3 seconds.
The application is started in a test that sends a single message to in and waits 2 seconds for the response on out.
The test succeeds only if @Async is present at (1).
Further observations:
JmsTemplate
instead using a JmsListener
Question: Why is receiving self-sent messages blocked in this situation? How can the outgoing message be received immediately without using @Async?
Update/Solution: As Gary suggested, there is indeed a transaction involved, though seemingly not one set up by Spring Boot but one created by the included ActiveMQ library.
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);

    @Container
    public static final GenericContainer<?> ACTIVEMQ =
        new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
            .withExposedPorts(8161, 61616)
            .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
            .withStartupTimeout(Duration.ofSeconds(60))
            .withLogConsumer(new Slf4jLogConsumer(LOG));

    @DynamicPropertySource
    private static void ports(DynamicPropertyRegistry registry)
    {
        registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    private List<String> messages = new LinkedList<>();

    @Async
    @JmsListener(destination = "out")
    public void onOut(String message)
    {
        LOG.warn("Received message from out: {}", message);
        messages.add(message);
    }

    @Test
    public void foo() throws InterruptedException
    {
        LOG.warn("Sending request");
        // Sending some message on destination 'in' to be received and answered by the listener below
        jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());
        LOG.warn("Waiting for response");
        // (2) Try to receive response from 'out'
        // jmsTemplate.setReceiveTimeout(2_000);
        // Message response = jmsTemplate.receive("out");
        // assertThat(response, notNullValue());
        Thread.sleep(2_000);
        assertThat(messages, hasSize(1));
    }
}
@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
    private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);

    public static void main(String[] args)
    {
        SpringApplication.run(TestApp.class, args);
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    // (1)
    // @Async
    @JmsListener(destination = "in")
    public void onIn(String message) throws InterruptedException
    {
        LOG.warn("Received message from in: {}", message);
        jmsTemplate.convertAndSend("out", message);
        LOG.warn("Sent Response");
        LOG.warn("Sleeping ...");
        Thread.sleep(3_000);
        LOG.warn("Finished");
    }
}
Here is the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>foo</groupId>
    <artifactId>jmstest</artifactId>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.5.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
It looks like you are using transactions; the transaction won't commit until the @JmsListener method exits, so the other consumer won't see the message before then.
You can't use transactions for this use case.
That is why @Async works: the send is then performed in a different transaction.
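To make the visibility rule concrete, here is a minimal sketch of a toy in-memory model. ToyQueue and ToySession are hypothetical names made up for this illustration; they are not ActiveMQ or Spring classes and only mimic the one property that matters here: a transacted session holds sends back until commit, while a non-transacted session publishes them right away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy stand-in for a broker destination (hypothetical, not ActiveMQ API). */
final class ToyQueue
{
    private final Queue<String> visible = new ConcurrentLinkedQueue<>();

    void publish(String message)
    {
        visible.add(message);
    }

    /** Returns the next message a consumer could see, or null if there is none. */
    String poll()
    {
        return visible.poll();
    }
}

/** Toy stand-in for a JMS session (hypothetical); transacted sends are buffered until commit(). */
final class ToySession
{
    private final boolean transacted;
    private final List<Runnable> pending = new ArrayList<>();

    ToySession(boolean transacted)
    {
        this.transacted = transacted;
    }

    void send(ToyQueue destination, String message)
    {
        if (transacted)
        {
            // Held back: consumers cannot see the message yet.
            pending.add(() -> destination.publish(message));
        }
        else
        {
            // Published immediately.
            destination.publish(message);
        }
    }

    /** Publishes everything sent since the last commit. */
    void commit()
    {
        pending.forEach(Runnable::run);
        pending.clear();
    }
}

public class TransactedSendDemo
{
    public static void main(String[] args)
    {
        ToyQueue out = new ToyQueue();

        // Like the listener in the question: send inside a transaction, commit only at the end.
        ToySession transactedSession = new ToySession(true);
        transactedSession.send(out, "response");
        System.out.println("before commit: " + out.poll()); // prints "before commit: null"
        transactedSession.commit();
        System.out.println("after commit: " + out.poll());  // prints "after commit: response"

        // Without a transaction the message is visible right after the send.
        ToySession plainSession = new ToySession(false);
        plainSession.send(out, "response");
        System.out.println("non-transacted: " + out.poll()); // prints "non-transacted: response"
    }
}
```

In the Spring setup from the question, the analogous change would presumably be to turn off session transactions on the listener container factory, for example with setSessionTransacted(false) on a DefaultJmsListenerContainerFactory. I have not verified that against the example project, so treat it as an assumption.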