Search code examples
spring-cloud-stream

Spring cloud stream rabbit binder : Error sending batch messages to DLQ


I am trying to configure a spring-cloud-stream application with a rabbit binder

spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
spring version : 2.5.5

Here under is my configuration :

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          listen-in-0:
            consumer:
              queueNameGroupOnly: true
              enable-batching: true
              batch-size: 100
              receive-timeout: 500
              transacted: true
          listen-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
              batchingEnabled: false
              enable-batching: false
      bindings:
        listen-in-0:
          destination: test.request
          group: test.request
          consumer:
            batch-mode: true
            requiredGroups: test.request
            maxAttempts: 1
        listen-out-0:
          destination: test.response
          group:  test.response
          producer:
            requiredGroups:  test.response

My java code for consumer :

@Bean
    public Function<Message<List<Request>>, List<Message<Response>>> listen() {
      ...
    }
}

When no error occurs all works fine. But when i simulate an exception, i got the here under exception :

2021-10-27 17:08:31,997 [rs-worker] [test.request-1] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler$DefaultExceptionStrategy []    - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@7891e2d2(byte[18])' MessageProperties [headers={tenant=tenant1, request.id=75c328f6-42a9-4682-952e-46d088a5c09e}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test.request, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-S0u7Mj4EwX13Kzj_CgzGVg, consumerQueue=test.request])
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer []    - Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1028)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1017)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1767)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1660)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1558)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
    ... 7 common frames omitted
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.request.errors'; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:254)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:211)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1500(AmqpInboundChannelAdapter.java:69)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$BatchListener.onMessageBatch(AmqpInboundChannelAdapter.java:481)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1652)
    ... 11 common frames omitted
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
    at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder$2.handleMessage(RabbitMessageChannelBinder.java:663)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    ... 21 common frames omitted

This error is raised when rabbit binder tries to send back messages to DLQ.

Indeed, the ErrorMessage payload contains a List<Message<?>> or spring cloud stream rabbit expect a Message<List<?>>.

Is there a workaround or do i make an error in my configuration or usage of the library ?

Thanks.

EDIT 1 :

Ok so i cannot use the DLQ for batch message listener with spring-cloud-stream-rabbit-binder.

I have to handle it myself, so how can i ensure transaction between all functions composing my listene :

But to handle exception myself i have to handle execptions in all the function composing my listener. I have a listener composed of multi functions with this configuration :

spring:
  cloud:
    function:
      definition: heartbeat;listen|process|send

To handle transaction in heartbeat function it's ok, but how can i ensure transaction between all functions composing listen|process|send ?

Actually i have the same issue when i use StreamBridge in the process function to send message. If the send function fails i dont want the messages sent in the 'process` function to be commited.

Here is my java code for process function :


@Configuration
public class WorkerProcessor {

    @Bean
    public Function<List<Request>, List<Request>> process(Service service) {
        return (requests) -> service.run(requests);
    }
}
@Component
@Transactional
public class Service {
    StreamBridge bridge;
    IWorker worker;
    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }
    @Transactional
    public List<Request> run(List<Request> requests) {
        requests.forEach(request -> {
            worker.process(request, bridge);
        });
        return requests;
    }
}

Solution

  • Dead letter re-publishing in the binder is not currently supported for batch listeners. Set republishToDlq to false.

    It's hard to say whether it should be supported (i.e. the entire batch would be sent, even if some records are processed successfully), but it should not throw a ClassCastException.

    https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/348

    With a batch listener, it's best to handle exceptions in the listener itself.