Tags: rabbitmq, spring-cloud-stream, spring-rabbit, dead-letter, poison-queue

How to ignore maxAttempts & send message to DLQ?


I am using RabbitMQ, and the consumer-api uses spring-cloud-stream. Currently maxAttempts is 3, so if the consumer fails to process a message, the message is queued again, up to 3 times. If the message also fails on the third attempt, it is sent to DLX->DLQ.

[Image: Existing RabbitMQ exchange-queue design]

Now I want to skip the retries when a SpecificException occurs on the listener side. How can I skip the retries and send the message directly to DLX->DLQ for SpecificException?

Below is the configuration file.

spring:
  application:
    name: consumer-api
  cloud:
    stream:
      overrideCloudConnectors: true
      bindings:
        test_channel:
          destination: destinationName
          contentType: application/json
          group: groupName
          consumer:
            maxAttempts: 3
      rabbit:
        bindings:
          test_channel:
            consumer:
              durableSubscription: true
              exchangeType: direct
              bindingRoutingKey: do.test
              autoBindDlq: true
              deadLetterExchange: destinationName.dlx
              deadLetterQueueName: destinationName.dlx.groupName.dlq
              deadLetterRoutingKey: do.test

Below is the Listener code.

@Slf4j
@Component
public class groupNameListener {

    @StreamListener(TEST_CHANNEL)
    public void doTest(Message<DoTestMessage> doTestMessage) {
        log.info("doTest message received: {}", doTestMessage);

        try {
            if ("someCondition".equals(doTestMessage.getPayload().getMessage())) {
                throw new SpecificException();
            } else {
                log.info("do normal work");
                throw new Exception(); //if something fails
            }
        } catch (SpecificException specificException) {
            log.error("Don't requeue but send to DLQ.... how  ????");
        } catch (Exception ex) {
            log.error("Error in processing do test message");
            throw new AmqpRejectAndDontRequeueException("Reject and Don't requeue exception");
            //this will requeue the message maximum 3 times. If maxAttempts has reached then will be send to DLX->DLQ
        }
    }
}

Could someone please help me? Please let me know if I'm making any mistakes.


Solution

  • The upcoming Spring Cloud Stream 2.1 release adds the ability to specify which exceptions are, or are not, retryable.

    2.1.0.M3 (milestone) is available now.

    Exceptions that are not retryable will go directly to the DLQ.
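
As a rough sketch of what this could look like with the configuration from the question, the consumer binding can list SpecificException as not retryable. The property name retryableExceptions is taken from my reading of the 2.1 reference documentation, and the package com.example is an assumption, since the question does not show where SpecificException is declared. Only the keys relevant to retry are shown; the rabbit binding section from the question stays as it is.

```yaml
spring:
  cloud:
    stream:
      bindings:
        test_channel:
          consumer:
            # Other exceptions are still retried up to maxAttempts
            maxAttempts: 3
            # Map of fully qualified exception class name -> retryable or not.
            # com.example is a hypothetical package; use the real one.
            retryableExceptions:
              com.example.SpecificException: false
```

For this to take effect, the listener presumably has to let SpecificException propagate instead of catching and logging it. An exception that is swallowed inside doTest never reaches the binder, so the message would be acknowledged rather than routed to the DLQ.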