Search code examples
nettyquarkusvert.xquarkus-rest-client

Is Quarkus rest client parallel connections limited to 256


I have a client to request remote server

Multi.createFrom()
      .items(
          userInfoList.stream())
      .onItem()
      .transformToUniAndMerge(
           userInfo -> {
              System.out.println( personInfo.toString() );
              restClientService.aRESTClientService( userInfo );
           }
      )

rest client :

@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("/xxx")
@RegisterRestClient
public interface RestClientService {
    @GET
    @Path("/xxxx")
    Uni<ResultDto<String>> aRESTClientService(UserInfo userInfo);
}

am I doing something wrong ? or is there something that can be configured


Solution

  • I don't know if this will help, but I had a similar issue. I had similar code:

    @Scheduled(every = "1s")
    void processRequests() {
        
        // assume pendingRequests is an unlimited incoming stream
        Multi.createBy().repeating().supplier(pendingRequests::poll)
        ... // rest client calling each request
    }
    

    The throughput of this application was exactly 256, and I thought it was due to some limitation in the client; but no, it was because the stream was overflowing after 256 polls. You might be able to increase the throughput by having multiple Multi streams consuming from userInfoList.stream(). Also, use a counter to see how many items you were able to consume before overflowing; you might find out that overflowing is the issue.

    UPDATE

    A came across an article that explained the magical 256 number (https://pandepra.medium.com/project-reactors-flatmap-and-backpressure-fba97472d625). I Also did some testing to understand how flatMap works. So flatMap's request number is 256, so you are stuck with processing at most 256 items. Also, after n items have been processed (sent to downstream subscribers), n items will be requested again, always up to 256 (flatMap holds an internal queue with max of 256 items). In my first description I was overflowing since poll() was pushing more than 256 items. If you want to increase this number you can do:

    // increase the concurrency value in `merge` (default is 256)
    Multi...
    .onItem().transformToUni(n -> Uni.createFrom()...).merge(500)
    
    // OR have multiple streams consuming 
    Multi.createBy().merging().streams(List.of(
            // each stream here has a flatMap
            multi1(),
            multi2()
        ))
    

    Not sure if there is any major difference with my suggestions above though. Finally, log() is you friend. I have been playing around with log() to understand what each operator is doing in between.