Search code examples
javaapache-camel

Why does Camel expect to deserialize IN message POJO while I want to change response POJO; no destination set, only JMSReplyTo present


I have a Camel route which listens to an ActiveMQ queue, do some processing, and sends the result to a JMSReplyTo header queue as response.

In previous version of the app, the destination needs to be explicitly set and it was sending the message with a ProducerTemplate.sendBody(); but I read in Camel doc that JMSReplyTo will be respected. So I want to try set no destination; however, seems that Camel expects the body to be the incoming message POJO.

How can I send the modified body?

https://camel.apache.org/components/3.19.x/eips/requestReply-eip.html has an illustration, where the request and reply have different color. I suppose they are different message, which makes sense; who wants the original message?

Camel-EIP-request-reply-illustration

In the last step of my route I did: exchange.getIn().setBody(myString);; also tried exchange.getMessage().setBody(myString); but I got error:

Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: java.lang.String to the required type: com.example.IncomingMessage with value {"failedEndpointsAndCauses":{},"message":"Message does not contain some ID field","status":"ERROR"}

which is exactly what I want to put into the response. IncomingMessage is the POJO coming in in the first place from ActiveMQ. Seems that Camel expects my outgoing response POJO to be the same type as incoming message.

I cannot getOut().setBody(), as this is deprecated. I don't know why(maybe I know, that it's better just to getIn() and setBody(), because we need to copy everything from IN to OUT; but in the annotation of @Deprecated there are no explanations. )

Similar question: How exactly is JMSReplyTo handled by Apache Camel? When does camel implicitly utilises the destination? which suggests set disableReplyTo. But why Camel is not following the EIP of Request and Reply? I want a different body as response.

Complete stacktrace(I am trying to return the POJO to toD() and marshall() but similar error, just now it's not string but POJO not matching the expected type):

2022-12-20 14:09:49,293 DEBUG [org.apa.cam.com.jms.DefaultJmsMessageListenerContainer] (Camel (camel-1) thread #6 - JmsConsumer[my.queue]) Initiating transaction rollback on application exception: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
    at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
    at org.apache.camel.support.builder.ExpressionBuilder$33.evaluate(ExpressionBuilder.java:1030)
    at org.apache.camel.support.ExpressionAdapter.evaluate(ExpressionAdapter.java:45)
    at org.apache.camel.component.bean.MethodInfo$ParameterExpression.evaluateParameterBinding(MethodInfo.java:738)
    at org.apache.camel.component.bean.MethodInfo$ParameterExpression.evaluateParameterExpressions(MethodInfo.java:624)
    at org.apache.camel.component.bean.MethodInfo$ParameterExpression.evaluate(MethodInfo.java:592)
    at org.apache.camel.component.bean.MethodInfo.initializeArguments(MethodInfo.java:263)
    at org.apache.camel.component.bean.MethodInfo.createMethodInvocation(MethodInfo.java:271)
    at org.apache.camel.component.bean.BeanInfo.createInvocation(BeanInfo.java:277)
    at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:126)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:81)
    at org.apache.camel.processor.errorhandler.NoErrorHandler.process(NoErrorHandler.java:47)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
    at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:109)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:132)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: com.example.IncomingMessage but has value: OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={}) of type: com.example.OutboundMessage on: Message[ID:foo-40805-1671538168705-4:1:1:4:1]. Caused by: No type converter available to convert from type: com.example.OutboundMessage to the required type: com.example.IncomingMessage with value OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={}). Exchange[]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.example.OutboundMessage to the required type: com.example.IncomingMessage with value OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={})]
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:125)
    at org.apache.camel.support.builder.ExpressionBuilder$33.evaluate(ExpressionBuilder.java:1028)
    ... 30 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.example.OutboundMessage to the required type: com.example.IncomingMessage with value OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={})
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:275)
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123)
    ... 31 more
2022-12-20 14:09:49,298 DEBUG [org.apa.cam.com.jms.DefaultJmsMessageListenerContainer] (Camel (camel-1) thread #6 - JmsConsumer[my.queue]) Rolling back transaction because of listener exception thrown: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
2022-12-20 14:09:49,299 WARN  [org.apa.cam.com.jms.EndpointMessageListener] (Camel (camel-1) thread #6 - JmsConsumer[my.queue]) Execution of JMS message listener failed. Caused by: [org.apache.camel.CamelExecutionException - Exception occurred during execution on the exchange: Exchange[]]: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
    at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
    at org.apache.camel.support.builder.ExpressionBuilder$33.evaluate(ExpressionBuilder.java:1030)
    at org.apache.camel.support.ExpressionAdapter.evaluate(ExpressionAdapter.java:45)
    at org.apache.camel.component.bean.MethodInfo$ParameterExpression.evaluateParameterBinding(MethodInfo.java:738)
    at org.apache.camel.component.bean.MethodInfo$ParameterExpression.evaluateParameterExpressions(MethodInfo.java:624)
    at org.apache.camel.component.bean.MethodInfo$ParameterExpression.evaluate(MethodInfo.java:592)
    at org.apache.camel.component.bean.MethodInfo.initializeArguments(MethodInfo.java:263)
    at org.apache.camel.component.bean.MethodInfo.createMethodInvocation(MethodInfo.java:271)
    at org.apache.camel.component.bean.BeanInfo.createInvocation(BeanInfo.java:277)
    at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:126)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:81)
    at org.apache.camel.processor.errorhandler.NoErrorHandler.process(NoErrorHandler.java:47)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
    at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:109)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:132)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: com.example.IncomingMessage but has value: OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={}) of type: com.example.OutboundMessage on: Message[ID:foo-40805-1671538168705-4:1:1:4:1]. Caused by: No type converter available to convert from type: com.example.OutboundMessage to the required type: com.example.IncomingMessage with value OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={}). Exchange[]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.example.OutboundMessage to the required type: com.example.IncomingMessage with value OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={})]
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:125)
    at org.apache.camel.support.builder.ExpressionBuilder$33.evaluate(ExpressionBuilder.java:1028)
    ... 30 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.example.OutboundMessage to the required type: com.example.IncomingMessage with value OutboundMessage(jobResourceId=null, status=something, message=Message does not contain some ID, failedEndpointsAndCauses={})
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:275)
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123)
    ... 31 more
2022-12-20 14:09:49,873 SEVERE [org.ecl.yas.int.Unmarshaller] (awaitility-thread) Unexpected char 111 at (line no=1, column no=2, offset=1), expecting 'u'

Solution

  • OK I fixed it myself. There are 2 things:

    1. I missed one part in the middle where some validation fails and I do an early return and send, and that bean should set the IN body to be the same POJO to return, the OutboundMessage class, which is not done. So I have to do exchange.getIn().setBody(xxx) there, so that when going to the reply route, the message type is the same as the normal, validation OK path. I also have to .marshal().json(JsonLibrary.Jsonb, OutboundMessage.class) before sending the reply to make sure the message body is a valid string message.

    2. But then, a similar error happens where it is like Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: byte[] to the required type: com.example.IncomingMessage with value [Bxxxxxxx. I notice that it is now a byte array, meaning that it is already marshalled correctly, but Camel continues to try to process it even after sending. So, after trying all possible, I add .stop() after sending, and this error is gone. Notice that .stop() also ensures the callback(sending reply to reply queue in JMSReplyTo header) is executed, so you will always want to call stop() in request/reply mode.

    So, make sure you:

    • return the same POJO at the end of each logic path/branch, especially after a short circuit/premature return one
    • stop the route at the end, to stop Camel to do further conversion. In my case, this error is only shown in logs, and ActiveMQ side already received the message, i.e. does not affect the business flow, so it's kind of hard to detect on client side. But it should be stopped.