Search code examples
restkotlinspring-webfluxreactive

WebFlux call with WebClient returning IllegalStateException Only one connection receive subscriber allowed


I'm using Webflux and WebClient to talk to another API

suspend fun doSomething() : myResponse {
    return webClient
      .get()
      .uri {
        it.path("/some/path")
          .queryParam("activeOnly", true)
          .build()
      }
      .retrieve()
      .onStatus(
        { httpStatus -> HttpStatus.NOT_FOUND == httpStatus },
        { throw CustomException("my message") }
      )
      .bodyToMono(responseType)
      .timeout(Duration.ofSeconds(timeout))
      .onErrorResume { ex ->
        when (ex) {
          is CustomException-> Mono.fromCallable { emptyList<MyResponse>() }
          else -> Mono.error(ex)
        }
      }
      .doOnError { ex ->
        handleTimeoutException(
          exception = ex,
          endPoint = "some endpoint"
        )
      }
}

suspend fun doSomethingElse() : anotherResponse { .. } // calls a different endpoint

Both functions share the same WebClient and are called in another class -

val foo = myClientClass.doSomething().awaitFirst()
val bar = myClientClass.doSomethingElse().awaitFirst()

I am not explictly using .subscribe()anywhere

I'm seeing this in my logs -

IllegalStateException Only one connection receive subscriber allowed: Only one connection receive subscriber allowed.', hint: 'null' |
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext

I don't believe I'm resubscribing to the response body ... correct me if I'm wrong. Is there anything I'm doing wrong in my get call that would cause this?


Solution

  • The Only one connection receive subscriber allowed is the key here.
    In my code, if I had:

    .onStatus
    
    .bodyToMono
    
    .onErrorResume
    

    I found that meant that more subscribers were trying to receive the response, thus causing the problem.
    Try to simplify everything by switching to kotlin coroutines, you will see the error go away:

    suspend fun doSomething() : myResponse {
        return webClient
          .get()
          .uri {
            it.path("/some/path")
              .queryParam("activeOnly", true)
              .build()
          }
          .awaitExchange { clientResponse ->
            when (clientResponse.statusCode()) {
                NOT_FOUND -> emptyList<MyResponse>()
                OK -> clientResponse.awaitBody<List<MyResponse>>()
                else -> throw clientResponse.createExceptionAndAwait()
            }
        }
    

    Timeout is not part of your actual problem here, so I did not include it in the answer, but if you want to handle it, you can use a simple flow.

    To make this work, check you have the follow dependencies: kotlinx-coroutines-core and kotlinx-coroutines-reactor.