I am trying to learn Spring WebFlux. I wrote the following code to test the performance of reactive programming. Here is the controller of the first service:
@RestController
public class PlayerController {
@Autowired
private PlayerRepository playerRepository;
@GetMapping("/players/{id}")
public Mono<Player> getPlayerById(@PathVariable int id) {
return playerRepository.findById(id);
}
}
Model class:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Player {
@Id
private Integer id;
private String name;
private Integer age;
}
Repository:
@Repository
public interface PlayerRepository extends ReactiveCrudRepository<Player, Integer> {}
The following client calls the service above, makes a database call of its own, and then checks whether both returned the same data. The repository and the model are the same for the client.
@RestController
public class PlayerController {
@Autowired
private PlayerRepository playerRepository;
@Autowired
private WebClient webClient;
@GetMapping("/players/{id}")
public Mono<Response> getPlayerById(@PathVariable int id) {
LocalDateTime start = LocalDateTime.now();
Mono<Player> playerExternal = webClient.get().uri("/players/{id}", id).retrieve().bodyToMono(Player.class);
Mono<Player> playerDB = playerRepository.findById(2);
Mono<Response> result = playerExternal.zipWith(playerDB, (ext, db) -> {
Response response = new Response();
// Compare the Integer ids by value; == compares object references
response.setFlag(ext.getId().equals(db.getId()));
LocalDateTime end = LocalDateTime.now();
response.setTimeTaken(start.until(end, ChronoUnit.MILLIS) + " ms");
return response;
});
return result;
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>reactive-spring</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>reactive-spring</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The client runs on port 8080 and the service it calls runs on port 9090. I am using Spring Boot 2.5.0. I am also using JMeter to simulate 10000 concurrent requests. When I execute the JMeter test plan, I get the following exception:
org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141) ~[spring-webflux-5.3.7.jar:5.3.7]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Request to GET http://localhost:9090/players/1 [DefaultWebClient]
|_ checkpoint ⇢ Handler com.example.demo.controller.PlayerController#getPlayerById(int) [DispatcherHandler]
|_ checkpoint ⇢ HTTP GET "/players/1" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141) ~[spring-webflux-5.3.7.jar:5.3.7]
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204) ~[reactor-core-3.4.6.jar:3.4.6]
Please let me know if I need to provide the full stack trace. Am I doing anything I am not supposed to? I can share the JMeter test results as well. Many requests fail, and most of them take a long time to respond (between 1 and 18 seconds).
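By default, WebClient runs with a connection pool. The default pool settings are 500 max connections and at most 1000 pending requests. Once all connections are busy and 1000 requests are already waiting for one, every further request fails immediately with the PoolAcquirePendingLimitException you see. You are using JMeter to simulate 10000 requests, but you do not specify how the load is distributed. You may need to increase the maximum number of pending requests. Have a look at this documentation and this documentation.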
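To make the arithmetic behind those limits concrete, here is a small sketch in plain Java. It is not Reactor Netty's implementation, only a simplified model under a loud assumption: all requests arrive in one burst and no connection is released in the meantime. The class name PoolModel and its methods are hypothetical and exist only for this illustration.

```java
/** Simplified model of a fixed-size connection pool with a bounded pending queue. */
class PoolModel {
    enum Outcome { ACQUIRED, PENDING, REJECTED }

    private final int maxConnections;
    private final int pendingAcquireMaxCount;
    private int active;   // connections currently in use
    private int pending;  // requests waiting for a connection

    PoolModel(int maxConnections, int pendingAcquireMaxCount) {
        this.maxConnections = maxConnections;
        this.pendingAcquireMaxCount = pendingAcquireMaxCount;
    }

    /** Decide what happens to one incoming request. */
    synchronized Outcome acquire() {
        if (active < maxConnections) {
            active++;
            return Outcome.ACQUIRED;
        }
        if (pending < pendingAcquireMaxCount) {
            pending++;
            return Outcome.PENDING;
        }
        // Corresponds to "Pending acquire queue has reached its maximum size"
        return Outcome.REJECTED;
    }

    /** Fire a burst of requests with no releases; returns {acquired, pending, rejected}. */
    static int[] burst(int requests, int maxConnections, int pendingAcquireMaxCount) {
        PoolModel pool = new PoolModel(maxConnections, pendingAcquireMaxCount);
        int[] counts = new int[Outcome.values().length];
        for (int i = 0; i < requests; i++) {
            counts[pool.acquire().ordinal()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] r = burst(10_000, 500, 1_000);
        // prints: acquired=500 pending=1000 rejected=8500
        System.out.println("acquired=" + r[0] + " pending=" + r[1] + " rejected=" + r[2]);
    }
}
```

In a real run, connections are released as responses come back, so fewer requests are rejected than in this worst case. The model still shows why a burst much larger than 1500 requests can overflow the default limits.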
If you want to configure the WebClient, then you need:
@Bean
public ReactorResourceFactory resourceFactory() {
ConnectionProvider provider =
ConnectionProvider.builder("test")
.maxConnections(500)
// Set custom max pending requests
.pendingAcquireMaxCount(...)
.build();
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.setUseGlobalResources(false);
factory.setConnectionProvider(provider);
return factory;
}
@Bean
public WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> {
// Further customizations...
return client; // the mapper must return the (possibly customized) HttpClient
};
ClientHttpConnector connector =
new ReactorClientHttpConnector(resourceFactory(), mapper);
return WebClient.builder().clientConnector(connector).build();
}