I am trying to configure a spring-cloud-stream application with the Rabbit binder:
spring cloud stream: 3.1.4
spring cloud stream rabbit binder: 3.1.4
spring version: 2.5.5
Here is my configuration:
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          listen-in-0:
            consumer:
              queueNameGroupOnly: true
              enable-batching: true
              batch-size: 100
              receive-timeout: 500
              transacted: true
          listen-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
              batchingEnabled: false
              enable-batching: false
      bindings:
        listen-in-0:
          destination: test.request
          group: test.request
          consumer:
            batch-mode: true
            requiredGroups: test.request
            maxAttempts: 1
        listen-out-0:
          destination: test.response
          group: test.response
          producer:
            requiredGroups: test.response
My Java code for the consumer:
    @Bean
    public Function<Message<List<Request>>, List<Message<Response>>> listen() {
        ...
    }
}
When no error occurs, everything works fine. But when I simulate an exception, I get the following exception:
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] WARN o.s.a.r.l.ConditionalRejectingErrorHandler$DefaultExceptionStrategy [] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@7891e2d2(byte[18])' MessageProperties [headers={tenant=tenant1, request.id=75c328f6-42a9-4682-952e-46d088a5c09e}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test.request, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-S0u7Mj4EwX13Kzj_CgzGVg, consumerQueue=test.request])
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer [] - Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1028)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1017)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1767)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1660)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1558)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
... 7 common frames omitted
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.request.errors'; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:254)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:211)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1500(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$BatchListener.onMessageBatch(AmqpInboundChannelAdapter.java:481)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1652)
... 11 common frames omitted
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder$2.handleMessage(RabbitMessageChannelBinder.java:663)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
... 21 common frames omitted
This error is raised when the Rabbit binder tries to send the failed messages to the DLQ.
Indeed, the ErrorMessage payload contains a List<Message<?>>, whereas the Spring Cloud Stream Rabbit binder expects a Message<List<?>>.
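To illustrate the mismatch described above, here is a minimal plain-Java sketch. It does not use the binder: AmqpMessage is a hypothetical stand-in for org.springframework.amqp.core.Message, and describe is a hypothetical handler that, like the failing code path in the stack trace, assumes the failed payload is a single message.

```java
import java.util.ArrayList;
import java.util.List;

public class PayloadMismatch {

    /** Hypothetical stand-in for org.springframework.amqp.core.Message. */
    public static final class AmqpMessage {
        public final byte[] body;

        public AmqpMessage(byte[] body) {
            this.body = body;
        }
    }

    /** Mimics a handler that assumes the failed payload is one message. */
    public static String describe(Object failedPayload) {
        try {
            // This cast succeeds for a single message and fails for a batch (a List).
            AmqpMessage single = (AmqpMessage) failedPayload;
            return "single message of " + single.body.length + " bytes";
        } catch (ClassCastException e) {
            return "not a single message: " + failedPayload.getClass().getName();
        }
    }

    public static void main(String[] args) {
        List<AmqpMessage> batch = new ArrayList<>();
        batch.add(new AmqpMessage(new byte[18]));

        System.out.println(describe(batch.get(0))); // prints "single message of 18 bytes"
        System.out.println(describe(batch));        // prints "not a single message: java.util.ArrayList"
    }
}
```

With a batch listener the failed payload is the whole list, which is the second case.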
Is there a workaround, or have I made an error in my configuration or usage of the library?
Thanks.
EDIT 1:
OK, so I cannot use the DLQ for a batch message listener with spring-cloud-stream-rabbit-binder.
I have to handle it myself. But to do that, I have to handle exceptions in every function composing my listener. My listener is composed of multiple functions, with this configuration:
spring:
  cloud:
    function:
      definition: heartbeat;listen|process|send
Handling the transaction in the heartbeat function is fine, but how can I ensure a single transaction across all the functions composing listen|process|send?
Actually, I have the same issue when I use StreamBridge in the process function to send messages. If the send function fails, I don't want the messages sent in the process function to be committed.
Here is my Java code for the process function:
@Configuration
public class WorkerProcessor {

    @Bean
    public Function<List<Request>, List<Request>> process(Service service) {
        return (requests) -> service.run(requests);
    }
}

@Component
@Transactional
public class Service {

    StreamBridge bridge;
    IWorker worker;

    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }

    @Transactional
    public List<Request> run(List<Request> requests) {
        requests.forEach(request -> {
            worker.process(request, bridge);
        });
        return requests;
    }
}
Dead letter re-publishing in the binder is not currently supported for batch listeners. Set republishToDlq to false.
It's hard to say whether it should be supported (i.e. the entire batch would be sent, even if some records are processed successfully), but it should not throw a ClassCastException.
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/348
With a batch listener, it's best to handle exceptions in the listener itself.
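One way to read that advice, as a minimal sketch in plain Java rather than a definitive implementation: wrap the per-item logic so each failure is caught inside the listener and the failed inputs are set aside. The names BatchErrorHandling, BatchResult and tolerant are hypothetical and not part of Spring Cloud Stream; what to do with the failed items (log them, publish them to a dead-letter destination yourself, and so on) is left to the application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BatchErrorHandling {

    /** Outcome of a batch: successful results plus the inputs that failed. */
    public static final class BatchResult<I, O> {
        public final List<O> succeeded = new ArrayList<>();
        public final List<I> failed = new ArrayList<>();
    }

    /**
     * Wraps a per-item handler so that one failing item does not make the
     * whole batch function throw.
     */
    public static <I, O> Function<List<I>, BatchResult<I, O>> tolerant(Function<I, O> handler) {
        return batch -> {
            BatchResult<I, O> result = new BatchResult<>();
            for (I item : batch) {
                try {
                    result.succeeded.add(handler.apply(item));
                } catch (RuntimeException e) {
                    // Assumption: failed items are collected here so the
                    // application can decide what to do with them afterwards.
                    result.failed.add(item);
                }
            }
            return result;
        };
    }

    public static void main(String[] args) {
        // Integer.parseInt stands in for the real per-request processing.
        Function<List<String>, BatchResult<String, Integer>> listen =
                BatchErrorHandling.<String, Integer>tolerant(s -> Integer.parseInt(s));

        BatchResult<String, Integer> result = listen.apply(Arrays.asList("1", "oops", "3"));
        System.out.println(result.succeeded); // prints [1, 3]
        System.out.println(result.failed);    // prints [oops]
    }
}
```

In a real listener, the same wrapper shape could sit inside the function bean so that no exception reaches the binder's error channel. Whether partial success is acceptable depends on the transactional requirements of the application.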