Search code examples
spring-integrationibm-mqspring-jms

IntegrationFlows having filter on Error Channel


I have the below code which accepts only the message of type of 'shipment'. Basically the MQ topic accepts two types of messages 'order' and 'shipment'.

Below logic works fine as long as I receive 'shipment' messages but it will throw an error through errorChannel if the topic receives an invalid 'order' payload since I have placed the filter after .errorChannel().

My question is, is there anyway I can place .filter() before the .errorChannel()? I do not care for other message types apart from 'shipment' type.

Thanks in advance.

@Bean
public IntegrationFlow jmsInboundFlow() throws Exception {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(messageListenerContainer)
        .errorChannel(inboundErrorChannel))
        .filter(Message.class, m -> m.getHeaders().get("message_type").equals("shipment"))
        .channel(routerChannel)
        .get();
}

EDIT: The errorChannel() works fine if the payload is null or invalid, but the problem here is it is throwing on other message type as well which I am not interested.

Error message I got on 'order' message type:

FailedMessage: GenericMessage [payload=<order></order>, headers={...}

org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [bean 'jmsInboundFlow.filter#0' for component 'jmsInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0';

org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'jmsInboundFlow.filter#0' for component 'jmsInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/win/managedinventorysubscriber/sicomponent/MsgDrivenChannelAdapter.class]'; from source: 'bean method jmsInboundFlow']; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=<order></order>, headers={JMS_IBM_Character_Set=IBM037, JMS_IBM_MsgType=8, sales_order_guid=421028b0-a09d-41e3-878c-f6ce0163e18d, sales_order_action_code=delete, jms_destination=topic:///WISE/ORDERSHIPMENT, sales_order_company_nbr=00028, JMSXUserID=qmqmuser    , JMS_IBM_Encoding=273, message_type=order, priority=0, jms_timestamp=1643727671960, JMSXAppID=T1WSERVICEQA                , JMS_IBM_PutApplType=26, JMS_IBM_Format=MQSTR   , jms_redelivered=false, JMS_IBM_PutDate=20220201, JMSXDeliveryCount=1, sales_order_nbr=007265, jms_correlationId=ID:414d5120543157534552564943455141a290cb6009df8425, JMS_IBM_PutTime=15011196, id=2ea81c40-ce4e-990c-aba7-961c4b464304, jms_messageId=ID:414d5120543157534552564943455141a290cb6000838d25, timestamp=1643727671913}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:428)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:509)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:345)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:744)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:704)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1173)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1165)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1062)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
    at com.win.managedinventorysubscriber.sicomponent.MsgDrivenChannelAdapter.lambda$jmsInboundFlow$0(MsgDrivenChannelAdapter.java:33)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:97)
    at org.springframework.integration.filter.AbstractMessageProcessingSelector.accept(AbstractMessageProcessingSelector.java:65)
    at org.springframework.integration.filter.MessageFilter.doHandleRequestMessage(MessageFilter.java:171)
    at org.springframework.integration.handler.AbstractReplyProducingPostProcessingMessageHandler.handleRequestMessage(AbstractReplyProducingPostProcessingMessageHandler.java:50)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)

Solution

  • The filter by itself is enough if you just want to ignore.

    public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHandler
            implements DiscardingMessageHandler, ManageableLifecycle {
    
        private final MessageSelector selector;
    
        private boolean throwExceptionOnRejection;
    
        private MessageChannel discardChannel;
    
        private String discardChannelName;
    

    The selector is your function to decide on the incoming message.

    The throwExceptionOnRejection is false by default, so no exception is thrown is message does not satisfy the filter.

    The discardChannel is null by default, so no any rejection is sent when message does not satisfy the filter.

    Therefore the filter in your configuration is not a root cause of that exception. Please, share more info about an exception in your question.

    See its docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#filter