The configuration class(part):
public static RabbitQueueConfig clubProNotAvailableConfig =
new RabbitQueueConfig("club-pro-not-available", "club-pro-not-available", "club-pro-not-available-status", "3-3");
@Bean
public SimpleMessageListenerContainer listenerContainer5(ClubProNotAvailableListener listener, ConnectionFactory connectionFactory) {
return initListenerContainer(listener, clubProNotAvailableConfig, connectionFactory);
}
private SimpleMessageListenerContainer initListenerContainer(
ChannelAwareMessageListener listener,
RabbitQueueConfig config,
ConnectionFactory connectionFactory
) {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setQueueNames(config.getQueue());
listenerContainer.setMessageListener(listener);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setConcurrency(config.getThreadPoolSize());
listenerContainer.setPrefetchCount(1);
return listenerContainer;
}
Method of sending a message:
try {
success = clientRepository.updateAnketa(privatePersonProfile.getProfileId(), clubProAnketa, null);
} catch (ClubProNotAvailableException e) {
ClubProNotAvailableRabbit clubProNotAvailableRabbit = new ClubProNotAvailableRabbit();
clubProNotAvailableRabbit.setRequestContextRabbit(RequestContextRabbit.createContext(requestContextService.getContext()));
clubProNotAvailableRabbit.setCountRetry(0L);
clubProNotAvailableRabbit.setProfileId(privatePersonProfile.getProfileId());
clubProNotAvailableRabbit.setNameMethod(ChangeMethod.CHANGE_ANKETA);
clubProNotAvailableRabbit.setChangeAnketaData(anketa);
rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 10000);
return message;
}
});
throw new ClubProNotAvailableException();
}
Configuration in the broker:
Queue configuration:
configuration of the exchanger:
I've read the documentation, tried a couple of options, but I can't apply it to my code.
What am I doing wrong? I will be very grateful for your help.
It looks like you don't have the delayed exchange plugin; you have also declared the exchange as a simple fanout
; this is what the exchange should look like this:
Also, to set the delay when sending, you should use:
template.convertAndSend(exchangeName, queue.getName(), "foo", message -> {
message.getMessageProperties().setDelay(1000);
return message;
});