Search code examples
rsocketspring-boot-rsocket

Why RSocket connection retry is using multiple (different) threads every time


I have the below program that connects to a Spring boot rsocket server running on localhost:7999. I have configured the connector Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5)) As you can see the the RSocketRequester is Mono so it should hold a single connection. When the connection fails and the Retry begins, I see that every retry is made from a different thread i.e. as below parallel-1---parallel-8. May I know the reason behind this ?

12:08:24.463550|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:30.470593|parallel-2|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:36.475666|parallel-3|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:42.494801|parallel-4|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #4 (4 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:48.499084|parallel-5|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #5 (5 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:54.503385|parallel-6|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #6 (6 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:00.509830|parallel-7|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #7 (7 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:06.545815|parallel-8|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #8 (8 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:12.553582|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #9 (9 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}

My Program is as below:

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(e -> e.add(new Jackson2CborEncoder()))
    .decoders(e -> e.add(new Jackson2CborDecoder()))
    .build();

Mono<RSocketRequester> requester = Mono.just(RSocketRequester.builder()
     .rsocketConnector(connector -> {
           connector.reconnect(
                     Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5))
                     .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}", e)))
             .acceptor(RSocketMessageHandler.responder(strategies,this))
             .payloadDecoder(PayloadDecoder.ZERO_COPY);
            })
      .dataMimeType(MediaType.APPLICATION_CBOR)
      .setupRoute("test")
      .setupData("test-123")
      .rsocketStrategies(strategies)
      .tcp("localhost",7999));


Solution

  • Thanks @Yuri @bruto @OlegDokuka and for your suggestions and answers. I have changed my program as below to enforce retry to run on single thread.

    connector.reconnect(
            Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5))
            .scheduler(Schedulers.single()) // <---- This enforces retry to run on a single thread
            .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}", e)))
            .acceptor(RSocketMessageHandler.responder(strategies,this))
            .payloadDecoder(PayloadDecoder.ZERO_COPY);
          })