I've run into failed replies when using the WebFlux gateway Java DSL in Spring Integration. It only works for the first few requests (fewer than 8, to be specific); after that I get reply errors:
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
When I use .fluxTransform(f -> f) on the inbound gateway, or when I use the non-reactive HTTP outbound gateway, I don't get the errors, even in a JMeter benchmark with thousands of requests.
Why do I need fluxTransform(f -> f) in the first flow to make it work?
Why does it work without fluxTransform(f -> f) when I use Http.outboundGateway in the second flow?
Scenario
I've created a route using four gateways for a rather complex setup to make a web request on a remote machine, but I'm
Integration Flow 1:
inbound webflux gateway -> outbound jms gateway
@Bean
public IntegrationFlow step1() {
    // request-reply pattern using the jms outbound gateway
    var gateway = Jms.outboundGateway(jmsConnectionFactory)
            .requestDestination("inboundWebfluxQueue")
            .replyDestination("outboundWebfluxQueue")
            .correlationKey("JMSCorrelationID");
    // send a request to jms, wait for the reply and return message payload as response
    return IntegrationFlows.from(webfluxServer("/example/webflux"))
            // won't work consistently without the next line
            .fluxTransform(f -> f)
            .handle(gateway).get();
}
Integration Flow 2:
inbound jms gateway -> outbound webflux gateway
@Bean
public IntegrationFlow step2_using_webflux() {
    var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(String.class)
            // ignore headers
            .mappedResponseHeaders();
    return IntegrationFlows.from(jmsInboundGateway())
            // use webflux outbound gateway to send the request to the TEST_URL
            .handle(gateway).get();
}
The complete route looks like this:
client web request -> flow 1 -> (message broker) -> flow 2 -> server web request
Another way is to use .channel(MessageChannels.flux()) instead of .fluxTransform(f -> f). This way we really bring back-pressure to the WebFlux container, making it wait for an available slot in the request event loop.
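As a rough sketch of that alternative, the first flow from the question could look like the following. It reuses the question's own webfluxServer(...) helper and jmsConnectionFactory field, which are assumed to be defined as in the original configuration; only the channel line differs from step1().

```java
@Bean
public IntegrationFlow step1WithFluxChannel() {
    // same JMS request-reply gateway as in the question
    var gateway = Jms.outboundGateway(jmsConnectionFactory)
            .requestDestination("inboundWebfluxQueue")
            .replyDestination("outboundWebfluxQueue")
            .correlationKey("JMSCorrelationID");
    return IntegrationFlows.from(webfluxServer("/example/webflux"))
            // a FluxMessageChannel in place of .fluxTransform(f -> f)
            .channel(MessageChannels.flux())
            .handle(gateway).get();
}
```

The method name step1WithFluxChannel is made up for this sketch; MessageChannels comes from org.springframework.integration.dsl.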
Without that back-pressure, we just send to the JMS queue without honoring any demand, and your JMS consumer on the other side can't keep up. On top of that, the internal requests go to the same Netty server, so each of them has to acquire an event loop slot again.
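The event-loop point can be pictured with a plain java.util.concurrent model. This is not Netty or Spring code: the class name, the pool size of 4, and the 300 ms timeout are assumptions chosen for illustration. Each outer task stands in for a request that blocks its thread while waiting for an inner call that needs a thread from the same pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventLoopStarvationSketch {

    /**
     * Runs `requests` concurrent outer calls on a pool of `loopThreads` threads.
     * Each outer call blocks its thread while waiting for an inner call that
     * must run on the same pool. Returns how many outer calls timed out.
     */
    public static int countTimeouts(int loopThreads, int requests, long timeoutMillis)
            throws Exception {
        if (requests > loopThreads) {
            throw new IllegalArgumentException("this sketch needs requests <= loopThreads");
        }
        ExecutorService loop = Executors.newFixedThreadPool(loopThreads);
        CountDownLatch allStarted = new CountDownLatch(requests);
        CountDownLatch allDecided = new CountDownLatch(requests);
        List<Future<Boolean>> outcomes = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                outcomes.add(loop.submit(() -> {
                    // wait until every outer call occupies a pool thread
                    allStarted.countDown();
                    allStarted.await();
                    // the inner call needs a free thread from the same pool
                    Future<String> inner = loop.submit(() -> "UP");
                    boolean timedOut = false;
                    try {
                        inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        inner.cancel(false);
                        timedOut = true;
                    }
                    // hold the thread until all outcomes are known, so the count is stable
                    allDecided.countDown();
                    allDecided.await();
                    return timedOut;
                }));
            }
            int timeouts = 0;
            for (Future<Boolean> outcome : outcomes) {
                if (outcome.get()) {
                    timeouts++;
                }
            }
            return timeouts;
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // two blocked threads leave two free for the inner calls
        System.out.println("2 requests on 4 threads: " + countTimeouts(4, 2, 300) + " timeouts");
        // all four threads are blocked, so no inner call can run before the timeout
        System.out.println("4 requests on 4 threads: " + countTimeouts(4, 4, 300) + " timeouts");
    }
}
```

With fewer concurrent requests than pool threads, the inner calls find a free thread and complete. Once every thread is held by a blocked request, each inner call sits in the queue until the timeout fires.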
If you are interested, I wrote a unit test like this to see what is going on:
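import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {

    @Autowired
    private TestRestTemplate template;

    @Test
    void testSpringIntegrationWebFlux() {
        // 100 threads so that all requests are in flight at the same time
        var executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(10);
        executor.afterPropertiesSet();
        var numberOfExecutions = new AtomicInteger();
        for (var i = 0; i < 100; i++) {
            executor.execute(() -> {
                var responseEntity = this.template.getForEntity("/example/webflux", String.class);
                // count only the requests that got a successful reply
                if (responseEntity.getStatusCode().is2xxSuccessful()) {
                    numberOfExecutions.getAndIncrement();
                }
            });
        }
        // waits up to 10 seconds for the submitted tasks to finish
        executor.shutdown();
        assertThat(numberOfExecutions.get()).isEqualTo(100);
    }
}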