I'm running into trouble with GZip/GUnzip message processing when using the AsyncRabbitTemplate.
Things work fine with a synchronous template set up like so:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jsonConverter());
    rabbitTemplate.setReplyTimeout(config.getRabbitSendAndReceiveReplyTimeout());
    rabbitTemplate.setReceiveTimeout(config.getRabbitSendAndReceiveReceiveTimeout());
    rabbitTemplate.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    rabbitTemplate.setBeforePublishPostProcessors(new GZipPostProcessor(true));
    return rabbitTemplate;
}
However, when I set up an async template like this:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public AsyncRabbitTemplate rabbitTemplateAsync(final ConnectionFactory connectionFactory) {
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory));
    // need to manually start the reply listener container for some reason
    asyncRabbitTemplate.start();
    return asyncRabbitTemplate;
}
the reply message isn't unzipped properly, and I get this error message:
Caused by: java.io.UnsupportedEncodingException: gzip:UTF-8
    at java.lang.StringCoding.decode(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:235) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:199) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate.onMessage(AsyncRabbitTemplate.java:576) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
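My reading of the trace, which I have not confirmed against the library source: the zipped reply reaches the JSON converter with its content encoding still set to gzip:UTF-8, and the converter then passes that value to the String constructor as a charset name. The plain-JDK sketch below reproduces only that last step. The class and method names are hypothetical, and I am assuming the "gzip:" prefix is what the GZip post-processor adds on the sending side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; it uses only the JDK, no Spring AMQP types.
public class GzipEncodingDemo {

    // Compress a payload, standing in for what a GZip post-processor does to the body.
    public static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    // Decompress a payload, standing in for what a GUnzip post-processor does to the body.
    public static byte[] gunzip(byte[] zipped) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(zipped))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        return out.toByteArray();
    }

    // Returns true if the JDK rejects the given content encoding as a charset name.
    public static boolean decodeFails(byte[] body, String contentEncoding) {
        try {
            new String(body, contentEncoding);
            return false;
        } catch (UnsupportedEncodingException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] json = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] zipped = gzip(json);

        // Skipping the unzip step: the encoding header is not a valid charset.
        System.out.println("decode with gzip:UTF-8 fails: " + decodeFails(zipped, "gzip:UTF-8"));

        // With the unzip step: the body is plain JSON again and UTF-8 decodes it.
        System.out.println("after gunzip: " + new String(gunzip(zipped), StandardCharsets.UTF_8));
    }
}
```

If that reading is right, the unzip step simply never runs on the async reply path, which would explain why the same RabbitTemplate works when used synchronously.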
I've tried giving the AsyncRabbitTemplate a configured DirectReplyToMessageListenerContainer, but it doesn't help:
final DirectReplyToMessageListenerContainer directReplyToMessageListenerContainer =
        new DirectReplyToMessageListenerContainer(connectionFactory);
directReplyToMessageListenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
        directReplyToMessageListenerContainer);
This just results in this error:
[ERROR] 2019-03-06 12:18:05.192 [AMQP Connection 172.17.3.6:5672] CachingConnectionFactory.log - Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - fast reply consumer does not exist, class-id=60, method-id=40)
Note that I was able to get things working by taking a branch of the spring-rabbit project and adding this constructor to AsyncRabbitTemplate:
public IndigoAsyncRabbitTemplate(final RabbitTemplate template,
        final DirectReplyToMessageListenerContainer directReplyToContainer) {
    Assert.notNull(template, "'template' cannot be null");
    this.template = template;
    container = null;
    replyAddress = null;
    this.directReplyToContainer = directReplyToContainer;
    directReplyToContainer.setMessageListener(this);
}
So, is this going to take an enhancement to the spring-rabbit library to get working? Or is there a way to get GUnzip working on the reply listener without jumping through too many hoops?
Right, this has to go into the framework as an improvement. AsyncRabbitTemplate currently does not take the afterReceivePostProcessors into account. We can reconfigure the internal DirectReplyToMessageListenerContainer to use the afterReceivePostProcessors from the provided RabbitTemplate.
Meanwhile, you can stick with injecting a regular SimpleMessageListenerContainer.
Or you can try injecting an external DirectReplyToMessageListenerContainer, though.
See this ctor:
/**
 * Construct an instance using the provided arguments. The first queue the container
 * is configured to listen to will be used as the reply queue. Replies will be
 * routed using the default exchange with that queue name as the routing key.
 * @param template a {@link RabbitTemplate}
 * @param container a {@link AbstractMessageListenerContainer}.
 */
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
    this(template, container, null);
}
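A rough sketch of the SimpleMessageListenerContainer variant with that constructor might look like the configuration fragment below. It is untested. The class name, bean names, and the reply queue name "async.replies" are made up for illustration, and it assumes the queue is declared on the broker (for example by a RabbitAdmin picking up the Queue bean) and that a fixed, shared reply queue is acceptable in your setup.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AsyncReplyConfig {

    // Hypothetical reply queue name; pick whatever fits your naming scheme.
    static final String REPLY_QUEUE = "async.replies";

    @Bean
    public Queue asyncReplyQueue() {
        return new Queue(REPLY_QUEUE);
    }

    @Bean
    public AsyncRabbitTemplate gzipAsyncRabbitTemplate(ConnectionFactory connectionFactory,
            RabbitTemplate rabbitTemplate) {
        // The first queue the container listens to becomes the reply queue (see the Javadoc above).
        SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer(connectionFactory);
        replyContainer.setQueueNames(REPLY_QUEUE);
        // Unzip replies in the container, before they are handed to the AsyncRabbitTemplate listener.
        replyContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }
}
```

The idea is that the container, not the template, applies the GUnzip step on the reply side, while the RabbitTemplate's beforePublishPostProcessors still zip the outgoing request.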
The issue on the matter: https://github.com/spring-projects/spring-amqp/issues/920