Search code examples
javaspringrabbitmqspring-amqp

I can't organize delayed sending of messages with spring amp quot


The configuration class(part):

 public static RabbitQueueConfig clubProNotAvailableConfig =
  new RabbitQueueConfig("club-pro-not-available", "club-pro-not-available", "club-pro-not-available-status", "3-3");

    @Bean
    public SimpleMessageListenerContainer listenerContainer5(ClubProNotAvailableListener listener, ConnectionFactory connectionFactory) {
        return initListenerContainer(listener, clubProNotAvailableConfig, connectionFactory);
    }


    private SimpleMessageListenerContainer initListenerContainer(
            ChannelAwareMessageListener listener,
            RabbitQueueConfig config,
            ConnectionFactory connectionFactory
    ) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueueNames(config.getQueue());
        listenerContainer.setMessageListener(listener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConcurrency(config.getThreadPoolSize());
        listenerContainer.setPrefetchCount(1);
        return listenerContainer;
    }

Method of sending a message:

   try {
        success = clientRepository.updateAnketa(privatePersonProfile.getProfileId(), clubProAnketa, null);
        } catch (ClubProNotAvailableException e) {


            ClubProNotAvailableRabbit clubProNotAvailableRabbit = new ClubProNotAvailableRabbit();
            clubProNotAvailableRabbit.setRequestContextRabbit(RequestContextRabbit.createContext(requestContextService.getContext()));
            clubProNotAvailableRabbit.setCountRetry(0L);
            clubProNotAvailableRabbit.setProfileId(privatePersonProfile.getProfileId());
            clubProNotAvailableRabbit.setNameMethod(ChangeMethod.CHANGE_ANKETA);
            clubProNotAvailableRabbit.setChangeAnketaData(anketa);

   rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setHeader("x-delay", 10000);
                    return message;
                }
            });

            throw new ClubProNotAvailableException();
        }

Configuration in the broker:

Queue configuration:

configuration of the exchanger:

I've read the documentation, tried a couple of options, but I can't apply it to my code.

What am I doing wrong? I will be very grateful for your help.


Solution

  • It looks like you don't have the delayed exchange plugin; you have also declared the exchange as a simple fanout; this is what the exchange should look like this:

    enter image description here

    Also, to set the delay when sending, you should use:

            template.convertAndSend(exchangeName, queue.getName(), "foo", message -> {
                message.getMessageProperties().setDelay(1000);
                return message;
            });