When using the ServiceBusSessionReceiverAsyncClient to receive a single message from a Service Bus queue, an IllegalStateException is thrown. The message mentions trying to add credits to an already closed link.
I'm using take(1) and next() to transform the Flux into a single result Mono. The documentation says that using take(1) on the stream will close the stream after the first result, which is what I'm looking to do.
My receiver code:
    private <T extends IWocTransaction> Mono<Optional<T>> responseAsync(String transactionId, Class<T> clazz) {
        var asyncClient = sbClientBuilder.connectionString(sbConnectionString)
            .sessionReceiver()
            .queueName("my-callback-queue")
            .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
            .buildAsyncClient();
        var msgStream = Flux.usingWhen(asyncClient.acceptSession(transactionId),
            receiver -> receiver.receiveMessages(),
            receiver -> Mono.fromRunnable(receiver::close)
        );
        return Mono.from(msgStream
                .timeout(timeout)
                .take(1)
                .next()
            ).map(message -> {
                var json = message.getBody().toString();
                try {
                    var val = objectMapper.readValue(json, clazz);
                    return val != null ? Optional.of(val) : Optional.<T>empty();
                } catch (Exception e) {
                    log.error("Error deserializing response from string {}", json, e);
                    return Optional.<T>empty();
                }
            })
            .doOnError(t -> {
                if (t instanceof TimeoutException) {
                    log.error("Timeout error waiting on API callback {}", kv("ApiTimeout", timeout.toString()), t);
                } else {
                    log.error("Error waiting for async callback", t);
                }
            }).onErrorReturn(Optional.empty());
    }
This code works fine but I'm getting this exception on every run:
13:46:27.122 [io-executor-thread-1] INFO c.a.m.s.ServiceBusClientBuilder - {"az.sdk.message":"Closing a dependent client.","numberOfOpenClients":1}
13:46:27.127 [receiver-0-1] INFO c.a.m.s.ServiceBusSessionReceiver - {"az.sdk.message":"There is no lock token.","sessionId":"adfadsr","messageId":"fb70e81e4d304b8fb34092440243554a"}
13:46:27.138 [receiver-0-1] INFO c.a.m.s.ServiceBusReceiverAsyncClient - Removing receiver links.
13:46:27.167 [receiver-0-1] ERROR c.a.c.a.i.ReactorReceiver - {"az.sdk.message":"Cannot add credits to closed link: adfadsr","exception":"Cannot add credits to closed link: adfadsr","connectionId":"MF_57a511_1680201985206","entityPath":"woc-callback-queue","linkName":"adfadsr"}
13:46:27.175 [receiver-0-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
**reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr**
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:227)
at com.azure.messaging.servicebus.ServiceBusSessionReceiver.lambda$new$2(ServiceBusSessionReceiver.java:92)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:447)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
13:46:27.194 [reactor-executor-1] INFO c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_57a511_1680201985206","errorCondition":null,"errorDescription":null,"linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.198 [reactor-executor-1] INFO c.a.c.a.i.ReactorSession - {"az.sdk.message":"Complete. Removing receive link.","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.199 [reactor-executor-1] INFO c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
How can I prevent the IllegalStateException from being thrown or at least handle it?
The IllegalStateException "Cannot add credits to closed link" should not be thrown to the application; it should only be logged.
It sometimes happens because several threads run concurrently inside the client. One is a non-blocking IO_thread (handling message frames and sending credit through flow frames), and the second is the Worker_thread that delivers messages to the application. A third, the application's handler_thread, is the one on which the app's responseAsync is invoked.
What happens is that when responseAsync closes the client from [any]_thread, the IO_thread may still be doing work in the background when it receives the closure request. The error appears in the log when the IO_thread is in the middle of sending a flow frame while other parts of the client are being shut down. This log entry can be ignored.
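The ordering described above can be reproduced in isolation. The following is only a minimal sketch using plain JDK threads: the Link class, the thread names, and runRace are hypothetical stand-ins and are not the SDK's ReactorReceiver or its real threading model. A latch forces the unlucky ordering in which the close lands just before the credit request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ClosedLinkRaceSketch {

    /** Hypothetical stand-in for a receive link; not an Azure SDK type. */
    static final class Link {
        private final String name;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        Link(String name) {
            this.name = name;
        }

        void close() {
            closed.set(true);
        }

        void addCredits(int credits) {
            if (closed.get()) {
                throw new IllegalStateException("Cannot add credits to closed link: " + name);
            }
            // A real link would send a flow frame here.
        }
    }

    /** Closes the link on one thread, then requests credits on another; returns the error message seen, or null. */
    static String runRace() {
        Link link = new Link("demo-session");
        CountDownLatch closedSignal = new CountDownLatch(1);
        AtomicReference<String> observed = new AtomicReference<>();

        Thread handlerThread = new Thread(() -> {
            link.close();
            closedSignal.countDown();
        }, "handler-thread");

        Thread ioThread = new Thread(() -> {
            try {
                closedSignal.await(); // force the ordering: close first, credits second
                link.addCredits(1);
            } catch (IllegalStateException e) {
                observed.set(e.getMessage()); // log-and-ignore, as the answer suggests
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "io-thread");

        ioThread.start();
        handlerThread.start();
        try {
            handlerThread.join();
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the sketch threads", e);
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("Logged and ignored: " + runRace());
    }
}
```

In the sketch the late credit request fails only because the link was closed a moment earlier. The message had already been delivered, which is why the entry is harmless.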
It looks like the application is designed to create and dispose of a client for every request. That means the application opens and closes a TCP connection to Service Bus on every request, which can be expensive.
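One way to avoid the per-request cost is to build the client once and share it. The sketch below shows only the general pattern with JDK types: FakeClient, SharedClient, and clientsBuiltFor are hypothetical names, and it assumes the real client is safe to share across requests, which this sketch does not verify. In the question's code, the rough equivalent would presumably be to build the session receiver client once outside responseAsync and call acceptSession(transactionId) on it per request.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SharedClientSketch {

    /** Hypothetical stand-in for a connection-backed client; not an Azure SDK type. */
    static final class FakeClient {
        String receive(String sessionId) {
            return "response-for-" + sessionId;
        }
    }

    /** Creates the client on first use and returns the same instance afterwards. */
    static final class SharedClient<T> {
        private final Supplier<T> factory;
        private T instance;

        SharedClient(Supplier<T> factory) {
            this.factory = factory;
        }

        synchronized T get() {
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }
    }

    /** Serves the given number of requests and reports how many clients were built. */
    static int clientsBuiltFor(int requests) {
        AtomicInteger built = new AtomicInteger();
        SharedClient<FakeClient> shared = new SharedClient<>(() -> {
            built.incrementAndGet(); // stands in for opening a connection
            return new FakeClient();
        });
        for (int i = 0; i < requests; i++) {
            shared.get().receive("session-" + i);
        }
        return built.get();
    }

    public static void main(String[] args) {
        System.out.println(clientsBuiltFor(100)); // prints 1
    }
}
```

With this arrangement, only the per-session receiver is opened and closed for each request, while the underlying client and its connection stay alive for the lifetime of the application.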