Search code examples
springspring-bootjmsspring-jmsspring-io

validate raw message against schema for method annotated with jmslistener


I have a need to apply some pre-checks and common steps on the all the jms listeners like validating the raw message against a schema (JSON schema). Example -

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(Order order) { ... }
}

Now, before the spring converts the Message from the queue to Order, I need to do the following -

  1. log the original message with headers into a custom logger.
  2. validate the json message (text message) against a json schema (lets assume I have only one schema here for the simplicity)
  3. If schema validation fail, log the error and throw exception
  4. If schema validation passes, continue to control to spring to do the conversion and continue with the process order method.

Does the spring JMS architecture provides any way to inject the above need? I know AOP crosses the mind, but I am not sure will it work with @JmsListener.


Solution

  • A rather simple technique would be to set autoStartup to false on the listener container factory.

    Then, use the JmsListenerEndpointRegistry bean to get the listener container.

    Then getMessageListener(), wrap it in an AOP proxy and setMessageListener().

    Then start the container.

    There might be a more elegant way, but I think you'd have to get into the guts of the listener creation code, which is quite involved.

    EDIT

    Example with Spring Boot:

    @SpringBootApplication
    public class So49682934Application {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        public static void main(String[] args) {
            SpringApplication.run(So49682934Application.class, args);
        }
    
        @JmsListener(id = "listener1", destination = "so49682934")
        public void listen(Foo foo) {
            logger.info(foo.toString());
        }
    
        @Bean
        public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
            return args -> {
                DefaultMessageListenerContainer container =
                        (DefaultMessageListenerContainer) registry.getListenerContainer("listener1");
                Object listener = container.getMessageListener();
                ProxyFactory pf = new ProxyFactory(listener);
                NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
                advisor.addMethodName("onMessage");
                pf.addAdvisor(advisor);
                container.setMessageListener(pf.getProxy());
                registry.start();
                Thread.sleep(5_000);
                Foo foo = new Foo("baz");
                template.convertAndSend("so49682934", foo);
            };
        }
    
        @Bean
        public MessageConverter converter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("typeId");
            return converter;
        }
    
        public static class MyJmsInterceptor implements MethodInterceptor {
    
            private final Logger logger = LoggerFactory.getLogger(getClass());
    
            @Override
            public Object invoke(MethodInvocation invocation) throws Throwable {
                Message message = (Message) invocation.getArguments()[0];
                logger.info(message.toString());
                // validate
                return invocation.proceed();
            }
    
        }
    
        public static class Foo {
    
            private String bar;
    
            public Foo() {
                super();
            }
    
            public Foo(String bar) {
                this.bar = bar;
            }
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
            @Override
            public String toString() {
                return "Foo [bar=" + this.bar + "]";
            }
    
        }
    
    }
    

    and

    spring.jms.listener.auto-startup=false
    

    and

    m2018-04-06 11:42:04.859 INFO 59745 --- [enerContainer-1] e.So49682934Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-60138-1523029319662-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-60138-1523029319662-4:2:1:1, destination = queue://so49682934, transactionId = null, expiration = 0, timestamp = 1523029324849, arrival = 0, brokerInTime = 1523029324849, brokerOutTime = 1523029324853, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So49682934Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}

    2018-04-06 11:42:04.882 INFO 59745 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$$e29327b8 : Foo [bar=baz]

    EDIT2

    Here's how to do it via infrastructure...

    @SpringBootApplication
    @EnableJms
    public class So496829341Application {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        public static void main(String[] args) {
            SpringApplication.run(So496829341Application.class, args);
        }
    
        @JmsListener(id = "listen1", destination="so496829341")
        public void listen(Foo foo) {
            logger.info(foo.toString());
        }
    
        @Bean
        public ApplicationRunner runner(JmsTemplate template) {
            return args -> {
                Thread.sleep(5_000);
                template.convertAndSend("so496829341", new Foo("baz"));
            };
        }
    
        @Bean
        public MessageConverter converter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("typeId");
            return converter;
        }
    
        @Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
        public static JmsListenerAnnotationBeanPostProcessor bpp() {
            return new JmsListenerAnnotationBeanPostProcessor() {
    
                @Override
                protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
                    return new MethodJmsListenerEndpoint() {
    
                        @Override
                        protected MessagingMessageListenerAdapter createMessageListener(
                                MessageListenerContainer container) {
                            MessagingMessageListenerAdapter listener = super.createMessageListener(container);
                            ProxyFactory pf = new ProxyFactory(listener);
                            pf.setProxyTargetClass(true);
                            NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
                            advisor.addMethodName("onMessage");
                            pf.addAdvisor(advisor);
                            return (MessagingMessageListenerAdapter) pf.getProxy();
                        }
    
                    };
                }
    
            };
        }
    
        public static class MyJmsInterceptor implements MethodInterceptor {
    
            private final Logger logger = LoggerFactory.getLogger(getClass());
    
            @Override
            public Object invoke(MethodInvocation invocation) throws Throwable {
                Message message = (Message) invocation.getArguments()[0];
                logger.info(message.toString());
                // validate
                return invocation.proceed();
            }
    
        }
    
        public static class Foo {
    
            private String bar;
    
            public Foo() {
                super();
            }
    
            public Foo(String bar) {
                this.bar = bar;
            }
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
            @Override
            public String toString() {
                return "Foo [bar=" + this.bar + "]";
            }
    
        }
    
    }
    

    Note: the BPP must be static and @EnableJms is required since the presence of this BPP disables boot's.

    2018-04-06 13:44:41.607 INFO 82669 --- [enerContainer-1] .So496829341Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-63685-1523036676402-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-63685-1523036676402-4:2:1:1, destination = queue://so496829341, transactionId = null, expiration = 0, timestamp = 1523036681598, arrival = 0, brokerInTime = 1523036681598, brokerOutTime = 1523036681602, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So496829341Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}

    2018-04-06 13:44:41.634 INFO 82669 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$$9ff4b13f : Foo [bar=baz]

    EDIT3

    Avoiding AOP...

    @SpringBootApplication
    @EnableJms
    public class So496829341Application {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        public static void main(String[] args) {
            SpringApplication.run(So496829341Application.class, args);
        }
    
        @JmsListener(id = "listen1", destination="so496829341")
        public void listen(Foo foo) {
            logger.info(foo.toString());
        }
    
        @Bean
        public ApplicationRunner runner(JmsTemplate template) {
            return args -> {
                Thread.sleep(5_000);
                template.convertAndSend("so496829341", new Foo("baz"));
            };
        }
    
        @Bean
        public MessageConverter converter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("typeId");
            return converter;
        }
    
        @Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
        public static JmsListenerAnnotationBeanPostProcessor bpp() {
            return new JmsListenerAnnotationBeanPostProcessor() {
    
                @Override
                protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
                    return new MethodJmsListenerEndpoint() {
    
                        @Override
                        protected MessagingMessageListenerAdapter createMessageListener(
                                MessageListenerContainer container) {
                            final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
                            return new MessagingMessageListenerAdapter() {
    
                                @Override
                                public void onMessage(Message jmsMessage, Session session) throws JMSException {
                                    logger.info(jmsMessage.toString());
                                    // validate
                                    listener.onMessage(jmsMessage, session);
                                }
    
                            };
                        }
    
                    };
                }
    
            };
        }
    
        public static class Foo {
    
            private String bar;
    
            public Foo() {
                super();
            }
    
            public Foo(String bar) {
                this.bar = bar;
            }
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
            @Override
            public String toString() {
                return "Foo [bar=" + this.bar + "]";
            }
    
        }
    
    }
    

    EDIT4

    To access other annotations on the listener method, it can be done, but reflection is needed to get a reference to the Method...

    @JmsListener(id = "listen1", destination="so496829341")
    @Schema("foo.bar")
    public void listen(Foo foo) {
        logger.info(foo.toString());
    }
    
    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface Schema {
    
        String value();
    
    }
    
    @Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    public static JmsListenerAnnotationBeanPostProcessor bpp() {
        return new JmsListenerAnnotationBeanPostProcessor() {
    
            @Override
            protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
                return new MethodJmsListenerEndpoint() {
    
                    @Override
                    protected MessagingMessageListenerAdapter createMessageListener(
                            MessageListenerContainer container) {
                        final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
                        InvocableHandlerMethod handlerMethod =
                                (InvocableHandlerMethod) new DirectFieldAccessor(listener)
                                        .getPropertyValue("handlerMethod");
                        final Schema schema = AnnotationUtils.getAnnotation(handlerMethod.getMethod(), Schema.class);
                        return new MessagingMessageListenerAdapter() {
    
                            @Override
                            public void onMessage(Message jmsMessage, Session session) throws JMSException {
                                logger.info(jmsMessage.toString());
                                logger.info(schema.value());
                                // validate
                                listener.onMessage(jmsMessage, session);
                            }
    
                        };
                    }
    
                };
            }
    
        };
    }