Tags: azureservicebus, azure-servicebus-queues, azure-servicebus-subscriptions

Azure Service Bus - ServiceBusProcessorClient (Java) - tryTimeout and processing concurrent sessions


I am using ServiceBusProcessorClient and AmqpRetryOptions to connect to Azure Service Bus (ASB), and I am facing two issues.

  1. The consumer waits 30 seconds before processing the next message from ASB. Is this due to the tryTimeout setting? I set it to 30 seconds because, once we get a message from ASB, we call an API that can sometimes take 30 seconds to respond.

  2. With max concurrent sessions and max concurrent calls both set to 4, the processor takes 4 messages from ASB. If one of them fails, I call serviceBusProcessorClient.close() in the exception block, which stops the consumer. That puts all 4 messages back in ASB rather than just the failed one. Is there a solution?

The configuration code is below; some values are shown as placeholders for simplicity.

ampqRetryOptions.setDelay(Duration.ofSeconds(Integer.parseInt(serviceBusConfig.getAmpqDelay())));
ampqRetryOptions.setMaxRetries(5);
ampqRetryOptions.setMaxDelay(<1>); // placeholder: the unit was not given
ampqRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
ampqRetryOptions.setTryTimeout(Duration.ofSeconds(30));

serviceBusProcessorClient = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .retryOptions(ampqRetryOptions)
        .sessionProcessor()
        .maxConcurrentSessions(4)
        .maxConcurrentCalls(4)
        .queueName(<queueName>)
        .maxAutoLockRenewDuration(Duration.ofSeconds(50))
        .processMessage(onMessage())
        .disableAutoComplete()
        .processError(onError)
        .buildProcessorClient();


Solution

    1. With your current configuration, the session idle timeout is 30 seconds because it falls back to retryOptions.getTryTimeout(). You can override this by setting sessionIdleTimeout(Duration) when constructing the ServiceBusProcessorClient. Once the processor acquires a session, it continuously pumps messages from that session. If no message arrives within the timeout (sessionIdleTimeout if set, otherwise tryTimeout), the processor releases the session and fetches a new one.

    2. Calling ServiceBusProcessorClient.close() closes all current sessions and stops the processor entirely, which is why all four messages are returned. The right fix depends on why processing failed and what your fallback for those messages is. Instead of closing the processor, settle each message individually: you can "Complete", "Abandon", or "Dead-letter" it. Abandoning returns the message to the queue for redelivery (in case another consumer can process it) until the maximum delivery count is reached. Dead-lettering moves the message to the dead-letter queue. Every queue has a dead-letter sub-queue; the queue-level dead-lettering option only controls whether expired messages are moved there automatically. For more information on these settlement types, see Settling receive operations.
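
For point 1, the override might look like the following configuration sketch. The class name, the build method, its parameters, and the 5-second idle timeout are illustrative assumptions, not values from the question. sessionIdleTimeout(Duration) is only present in more recent releases of azure-messaging-servicebus, so check the version you are using.

```java
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import java.time.Duration;
import java.util.function.Consumer;

public final class SessionProcessorConfig {
    // Hypothetical factory method; all parameter names are illustrative.
    static ServiceBusProcessorClient build(
            String connectionString,
            String queueName,
            Consumer<ServiceBusReceivedMessageContext> onMessage,
            Consumer<ServiceBusErrorContext> onError) {

        // tryTimeout stays at 30 seconds, as in the question.
        AmqpRetryOptions retryOptions = new AmqpRetryOptions()
                .setTryTimeout(Duration.ofSeconds(30));

        return new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .retryOptions(retryOptions)
                .sessionProcessor()
                .queueName(queueName)
                .maxConcurrentSessions(4)
                // Assumed value: release an idle session after 5 seconds
                // instead of falling back to the 30-second tryTimeout.
                .sessionIdleTimeout(Duration.ofSeconds(5))
                .disableAutoComplete()
                .processMessage(onMessage)
                .processError(onError)
                .buildProcessorClient();
    }
}
```

With this in place, the wait before moving on to another session should be governed by the shorter idle timeout while tryTimeout keeps its value.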
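
For point 2, here is a minimal sketch of settling only the failed message instead of closing the processor. To keep it runnable without the SDK, the Settler interface is a hypothetical stand-in for the complete(), abandon(), and deadLetter() methods on ServiceBusReceivedMessageContext. PermanentFailureException is likewise a made-up marker for errors that a retry cannot fix, and how you classify failures is an assumption that depends on your API.

```java
import java.util.ArrayList;
import java.util.List;

public final class SettlementSketch {
    /** Hypothetical stand-in for the settle methods on ServiceBusReceivedMessageContext. */
    interface Settler {
        void complete();
        void abandon();
        void deadLetter();
    }

    /** Hypothetical exception type marking failures that a retry cannot fix. */
    static final class PermanentFailureException extends RuntimeException {
        PermanentFailureException(String message) { super(message); }
    }

    /**
     * Runs the work for one message and settles only that message.
     * The processor is never closed, so other in-flight messages are unaffected.
     */
    static void handle(Runnable work, Settler settler) {
        try {
            work.run();
        } catch (PermanentFailureException e) {
            settler.deadLetter(); // retrying cannot help: move to the dead-letter queue
            return;
        } catch (RuntimeException e) {
            settler.abandon();    // possibly transient: release for redelivery
            return;
        }
        settler.complete();       // success: remove the message from the queue
    }

    /** Test double that records which settle call was made. */
    static final class RecordingSettler implements Settler {
        final List<String> calls = new ArrayList<>();
        @Override public void complete() { calls.add("complete"); }
        @Override public void abandon() { calls.add("abandon"); }
        @Override public void deadLetter() { calls.add("deadLetter"); }
    }

    public static void main(String[] args) {
        RecordingSettler ok = new RecordingSettler();
        handle(() -> { }, ok);
        System.out.println(ok.calls); // prints [complete]

        RecordingSettler transientFailure = new RecordingSettler();
        handle(() -> { throw new IllegalStateException("API timed out"); }, transientFailure);
        System.out.println(transientFailure.calls); // prints [abandon]

        RecordingSettler permanentFailure = new RecordingSettler();
        handle(() -> { throw new PermanentFailureException("malformed payload"); }, permanentFailure);
        System.out.println(permanentFailure.calls); // prints [deadLetter]
    }
}
```

In a real processMessage handler, the three Settler calls would map to context.complete(), context.abandon(), and context.deadLetter(). Because the question's configuration uses disableAutoComplete(), the handler has to make one of these calls explicitly for every message.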