Search code examples
springspring-bootspring-jmsspring-messaging

Custom MessageConverter with Spring JmsMessagingTemplate is not working as I expected


I'm trying to attach a custom message converter that implements org.springframework.jms.support.converter.MessageConverter, to a JmsMessagingTemplate.

I've read somewhere that we can attach the message converter to a MessagingMessageConverter by calling setPayloadConverter, and then attach that messaging message converter to the JmsMessagingTemplate via setJmsMessageConverter. After that, I call convertAndSend, but I notice that it doesn't convert the payload.

When I debugged the code, I notice that setting Jms Message Converter doesn't set the converter instance variable in the JmsMessagingTemplate. So when the convertAndSend method calls doConvert and tries to getConverter, it is getting the default simple message converter and not my custom one.

My question is, can I use an implementation of org.springframework.jms.support.converter.MessageConverter with a JmsMessagingTemplate? Or do I need to use an implementation of org.springframework.messaging.converter.MessageConverter?

I'm using Spring Boot 1.4.1.RELEASE, and Spring 4.3.3.RELEASE. The code is below.

Configuration

@Configuration
@EnableJms
public class MessagingEncryptionPocConfig {
    /**
     * Listener ActiveMQ Connection Factory
     */
    @Bean(name="listenerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory listenerActiveMqConnectionFactory() {
        return new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
    }

    /**
     * Producer ActiveMQ Connection Factory
     */
    @Bean(name="producerActiveMqConnectionFactory")
    public ActiveMQConnectionFactory producerActiveMqConnectionFactory() {
        return new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
    }   

    /**
     * Caching Connection Factory
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory(@Qualifier("producerActiveMqConnectionFactory") ActiveMQConnectionFactory activeMqConnectionFactory) {
        return new CachingConnectionFactory(activeMqConnectionFactory);
    }

    /**
     * JMS Listener Container Factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(@Qualifier("listenerActiveMqConnectionFactory") ActiveMQConnectionFactory connectionFactory, MessagingMessageConverter messageConverter) {
        DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
        defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
        defaultJmsListenerContainerFactory.setMessageConverter(messageConverter);
        return defaultJmsListenerContainerFactory;
    }

    /**
     * Jms Queue Template
     */
    @Bean(name="queueTemplate")
    public JmsMessagingTemplate queueTemplate(CachingConnectionFactory cachingConnectionFactory, MessageConverter messagingMessageConverter) {
        JmsMessagingTemplate queueTemplate = new JmsMessagingTemplate(cachingConnectionFactory);
        queueTemplate.setJmsMessageConverter(messagingMessageConverter);
        return queueTemplate;
    }

    @Bean
    public MessageConverter encryptionDecryptionMessagingConverter(Jaxb2Marshaller jaxb2Marshaller) {
        MessageConverter encryptionDecryptionMessagingConverter = new EncryptionDecryptionMessagingConverter(jaxb2Marshaller);
        MessagingMessageConverter messageConverter = new MessagingMessageConverter();
        messageConverter.setPayloadConverter(encryptionDecryptionMessagingConverter);
        return messageConverter;
    }

    /**
     * Jaxb marshaller
     */
    @Bean(name="producerJaxb2Marshaller")
    public Jaxb2Marshaller jaxb2Marshaller() {
        Jaxb2Marshaller jaxb2Marshaller = new Jaxb2Marshaller();
        jaxb2Marshaller.setPackagesToScan("com.schema");
        return jaxb2Marshaller;
    }
}

MessageProducer Class

@Component 
public class MessageProducer {

    private static final Logger LOG = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    @Qualifier("queueTemplate")
    private JmsMessagingTemplate queueTemplate;

    public void publishMsg(Transaction trx, Map<String,Object> jmsHeaders, MessagePostProcessor postProcessor) {
        LOG.info("Sending Message. Payload={} Headers={}",trx,jmsHeaders);
        queueTemplate.convertAndSend("queue.source", trx, jmsHeaders, postProcessor);
    }
}

Unit Test

@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class WebsMessagingEncryptionPocApplicationTests {
    @Autowired
    private MessageProducer producer;

    @Autowired
    private MessageListener messageListener;    

    /**
     * Ensure that a message is sent, and received.
     */
    @Test
    public void testProducer() throws Exception{
        //ARRANGE
        CountDownLatch latch = new CountDownLatch(1);
        messageListener.setCountDownLatch(latch);
        Transaction trx = new Transaction();
        trx.setCustomerAccountID(new BigInteger("111111"));
        Map<String,Object> jmsHeaders = new HashMap<String,Object>();
        jmsHeaders.put("tid", "1234563423");
        MessagePostProcessor encryptPostProcessor = new EncryptMessagePostProcessor();
        //ACT
        producer.publishMsg(trx, jmsHeaders, encryptPostProcessor);
        latch.await();
        //ASSERT - assertion done in the consumer       
    }
}

Solution

  • The converter field is used to convert your input params to a spring-messaging Message<?>.

    The JMS converter is used later (in MessagingMessageCreator) to then create a JMS Message from the messaging Message<?>.