spring-integration, spring-amqp, spring-integration-amqp

Why does an AmqpChannelFactoryBean with Jackson2JsonMessageConverter not store the type?


I'm trying to use Spring Integration with RabbitMQ, using RabbitMQ-backed Spring Integration channels. (This seems to be almost undocumented for some reason; is it new?)

To do this, it seems I can use AmqpChannelFactoryBean to create a channel. To set up message conversion, I use a Jackson2JsonMessageConverter.

When I use a GenericMessage with a POJO payload, it refuses to deserialize it into a Java object, basically because it doesn't know the type. I would have expected the type to be automagically put on the header, but the header contains only __TypeId__=org.springframework.messaging.support.GenericMessage.

In Spring Boot, my configuration class looks like this:

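import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.config.AmqpChannelFactoryBean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@Configuration
public class IntegrationConfiguration {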

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
        MessageConverter messageConverter) {

        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName("myActivateOut");
        factoryBean.setPubSub(false);
        factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
        factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
        factoryBean.setMessageConverter(messageConverter);
        return factoryBean;
    }

    @Bean
    @ServiceActivator(inputChannel = "myActivateOutChannel", autoStartup = "true")
    public MessageHandler mqttOutbound() {

        return m -> System.out.println(m);
    }

}

Sending is done like this:

private final MessageChannel myActivateOutChannel;

@Autowired
public MySender(MessageChannel myActivateOutChannel) {
    this.myActivateOutChannel = myActivateOutChannel;
}

@Override
public void run(ApplicationArguments args) throws Exception {
    MyPojo pojo = new MyPojo();
    Message<MyPojo> msg = new GenericMessage<>(pojo);

    myActivateOutChannel.send(msg);
}

If I set my own ClassMapper, things work as they should. But I would need a separate MessageConverter for each payload type if I set things up like that. E.g.:

    converter.setClassMapper(new ClassMapper() {

        @Override
        public void fromClass(Class<?> clazz, MessageProperties properties) {
            // intentionally empty: no type information is written on send
        }

        @Override
        public Class<?> toClass(MessageProperties properties) {
            // always deserialize to MyPojo, regardless of the headers
            return MyPojo.class;
        }

    });

Am I using this wrong? Am I missing some configuration? Any other suggestions?

Thanks!! :)

Note: Looking at this some more, I'm guessing the "Spring Integration way" would be to add a Spring Integration JSON transformer on each side, which also means adding two additional direct channels per RabbitMQ queue? This feels wrong to me, since I then have triple the channels (6 for in/out!), but maybe that's how the framework is supposed to be used: couple all the simple steps with direct channels? (Do I keep the persistence that the RabbitMQ channels offer in that case? Or do I need some transaction mechanism if I want that? Or is it inherent in how direct channels work?)

I've also noticed that there is both a Spring Integration MessageConverter and a Spring AMQP MessageConverter; the latter is the one I've used. Would the other one work the way I want it to? A quick glance at the code suggests it doesn't store the object type in the message header.


Solution

  • Prior to version 4.3, AMQP-backed channels only supported serializable payloads; the workaround was to use channel adapters instead (which support mapping).

    INT-3975 introduced a new property, extractPayload, which causes the message headers to be mapped to RabbitMQ headers and the message body to be just the payload instead of a serialized GenericMessage.

    Setting extractPayload to true should solve your problem.
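
As a minimal sketch of that change, assuming Spring Integration 4.3 or later and reusing the bean and queue names from the question, the channel factory bean might be configured as follows. The only addition relative to the question's configuration is the setExtractPayload(true) call; everything else is carried over unchanged and may need adjusting for your setup.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.config.AmqpChannelFactoryBean;

@Configuration
public class IntegrationConfiguration {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
            MessageConverter messageConverter) {

        // true = message-driven (subscribable) channel, as in the question
        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName("myActivateOut");
        factoryBean.setPubSub(false);
        factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
        factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
        factoryBean.setMessageConverter(messageConverter);

        // Hand only the payload to the converter and map the message headers
        // to AMQP headers, instead of serializing the whole GenericMessage.
        factoryBean.setExtractPayload(true);
        return factoryBean;
    }
}
```

With this in place, the converter is given the POJO itself rather than the wrapping GenericMessage, so the __TypeId__ header should carry the payload's class name and the custom ClassMapper should no longer be needed.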