I'm trying to use Spring Integration with RabbitMQ, using RabbitMQ-backed Spring Integration channels. (This seems to be almost undocumented for some reason; is it new?)
To do this, it seems I can use AmqpChannelFactoryBean to create a channel. To set up message conversion, I use a Jackson2JsonMessageConverter.
When I use a GenericMessage with a POJO payload, it refuses to deserialize it back into a Java object, basically because it doesn't know the type. I would have expected the payload type to be automagically put on the header, but the only type header is __TypeId__=org.springframework.messaging.support.GenericMessage.
In Spring Boot my configuration class looks like this:
    @Configuration
    public class IntegrationConfiguration {

        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        @Bean
        public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
                MessageConverter messageConverter) {
            AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
            factoryBean.setConnectionFactory(connectionFactory);
            factoryBean.setQueueName("myActivateOut");
            factoryBean.setPubSub(false);
            factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
            factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
            factoryBean.setMessageConverter(messageConverter);
            return factoryBean;
        }

        @Bean
        @ServiceActivator(inputChannel = "myActivateOutChannel", autoStartup = "true")
        public MessageHandler mqttOutbound() {
            return m -> System.out.println(m);
        }
    }
Sending is done like this:
    private final MessageChannel myActivateOutChannel;

    @Autowired
    public MySender(MessageChannel myActivateOutChannel) {
        this.myActivateOutChannel = myActivateOutChannel;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MyPojo pojo = new MyPojo();
        Message<MyPojo> msg = new GenericMessage<>(pojo);
        myActivateOutChannel.send(msg);
    }
If I set my own ClassMapper, things do work as they should. But I would need a separate MessageConverter for every payload type if I set things up like that. E.g.
    converter.setClassMapper(new ClassMapper() {
        @Override
        public void fromClass(Class<?> clazz, MessageProperties properties) {
        }

        @Override
        public Class<?> toClass(MessageProperties properties) {
            return MyPojo.class;
        }
    });
Am I using this wrong? Am I missing some configuration? Any other suggestions?
Thanks!! :)
Note: Looking at this some more, I'm guessing the "Spring Integration" way would be to add a Spring Integration JSON transformer on each side, which also means adding two additional direct channels per RabbitMQ queue? This feels wrong to me, since I'd then have triple the channels (6 for in/out!), but maybe that's how the framework is supposed to be used? Couple all the simple steps with direct channels? (Do I keep the persistence that the RabbitMQ channels offer in that case? Or do I need some transaction mechanism if I want that? Or is it inherent in how direct channels work?)
I've also noticed now that there is both a Spring Integration MessageConverter and a Spring AMQP MessageConverter, the latter being the one I've used. Would the other one work the way I want it to? A quick glance at the code suggests it doesn't store the object type in the message header?
Prior to version 4.3, AMQP-backed channels only supported serializable payloads; the workaround was to use channel adapters instead (which support mapping).
INT-3975 introduced a new property, extractPayload, which causes the message headers to be mapped to RabbitMQ headers, with the message body being just the payload instead of a serialized GenericMessage.
Setting extractPayload to true should solve your problem.
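As a rough sketch of what that could look like, here is the channel definition from the question with the one extra setter call. This assumes Spring Integration 4.3 or later and that AmqpChannelFactoryBean exposes the property as setExtractPayload(boolean); everything else is copied from the question's configuration and has not been run against a broker.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.config.AmqpChannelFactoryBean;

@Configuration
public class IntegrationConfiguration {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
            MessageConverter messageConverter) {
        // true = message-driven channel, as in the question
        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName("myActivateOut");
        factoryBean.setPubSub(false);
        factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
        factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
        factoryBean.setMessageConverter(messageConverter);
        // Assumption: requires Spring Integration 4.3+. Only the payload is handed
        // to the converter; the message headers are mapped to AMQP headers.
        factoryBean.setExtractPayload(true);
        return factoryBean;
    }
}
```

With the payload extracted, the Jackson2JsonMessageConverter is handed the MyPojo instance rather than the whole GenericMessage, so the __TypeId__ header it writes should name the payload class and the custom ClassMapper should no longer be needed.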