Tags: spring, spring-boot, rabbitmq, spring-rabbit, rabbitmq-exchange

No fixed order of messages delivered to the RabbitMQ server in a multithreaded environment


Please have a look at my code first.

This is my test class, which creates 2000 threads, each of which sends a message.

public class MessageSenderMultipleThreadMock {
    @Autowired
    MessageList message;
    @Autowired
    MessageSender sender;

    public boolean process() throws InterruptedException {

        for (int i = 0; i < 2000; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {

                    String routingkey = "operation"
                            + UUID.randomUUID().toString();
                    String queueName = UUID.randomUUID().toString();

                    message.setSender(Thread.currentThread().getName());
                    try {
                        sender.sendMessage(routingkey, queueName,
                                "this is message");
                    } catch (InvalidMessagingParameters e) {
                        e.printStackTrace();
                    }

                }
            }).start();
            Thread.sleep(1000);

        }
        Thread.currentThread();
        Thread.sleep(10000);
        return true;
    }
}

Message Sender

This is my main message sender class:

@Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MessageList message;
    String queueName = "";
    String routingKey = "";
    @Autowired
    private QueueCreationService service;
    private boolean messageSentFlag;
    String returnedMessage = "";
    private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());

    public boolean sendMessage(String routingKey, String queueName,
            String messageToBeSent) throws InvalidMessagingParameters {
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);

        else {
            this.routingKey = routingKey;
            this.queueName = queueName;
        }
        service.processBinding(queueName, routingKey);
        message.addMessages(messageToBeSent);
        return execute();
    }

    /*
     * overloaded sendMessage method will use requestMap . RequestMap includes
     * queueName and routingKey that controller provides.
     */
    public boolean sendMessage(Map<String, String> requestMap)
            throws MessagingConnectionFailsException,
            InvalidMessagingParameters {
        this.queueName = requestMap.get("queue");
        this.routingKey = requestMap.get("routingkey");
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);
        service.processBinding(queueName, routingKey);
        preparingMessagingTemplate();
        return execute();
    }

    private boolean execute() {
        for (int i = 0; i < 5 && !messageSentFlag; i++) {
            executeMessageSending();
        }
        return messageSentFlag;
    }

    private String convertMessageToJson(MessageList message) {
        ObjectWriter ow = new ObjectMapper().writer()
                .withDefaultPrettyPrinter();
        String json = "";
        try {
            json = ow.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return json;
    }

    private void executeMessageSending() {
        rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
                convertMessageToJson(message), new CorrelationData(UUID
                        .randomUUID().toString()));

    }

    private void preparingMessagingTemplate() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode,
                    String replyText, String exchange, String routingKey) {
                returnedMessage = replyText;
            }
        });
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack,
                    String cause) {
                System.out.println("*" + ack);

                if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
                    messageSentFlag = ack;
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+  has been successfully delivered");
                } else {
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+ has not been delivered");

                }
            }
        });
    }
}

My configuration class, which is used by the messaging code:

@Configuration
@ComponentScan("com.alpharaid.orange.*")
@PropertySource("classpath:application.properties")
public class MessageConfiguration {

    String content = "";
    @Value("${rabbitmq_host}")
    String host = "";
    String port = "";
    @Value("${rabbitmq_username}")
    String userName = "";
    @Value("${rabbitmq_password}")
    String password = "";
    String queueName = "";
    InputStream input = null;

    @Autowired
    public MessageConfiguration() {
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    @Scope("prototype")
    public QueueCreationService service() {
        return new QueueCreationService();
    }

    @Bean
    @Scope("prototype")
    public RabbitAdmin admin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                this.host);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

}

My problems:

  1. As I can see on the server, some threads deliver their messages successfully and others do not.

  2. There is no certainty that the rabbitTemplate listener is invoked:

    rabbitTemplate.setReturnCallback(new ReturnCallback() {

I need the listener to fire every time, because I decide whether to send the message again based on it:

    private boolean execute() {
        for (int i = 0; i < 5 && !messageSentFlag; i++) {
            executeMessageSending();
        }
        return messageSentFlag;
    }

I can see that messages are sometimes delivered 5 times, because messageSentFlag is false and only becomes true in the confirm listener.

  3. Please tell me how to delete queues. I have 8000 of them. I saw a method in RabbitAdmin for deleting a queue, but it needs the queue's name, and my queues have random names (UUIDs).

Please share your thoughts: how can I improve this, or is there a workaround? A multithreaded environment is a must for my application.

Thanks in advance.


Solution

  • RabbitMQ only guarantees message order once the message is in a particular queue.

    There are no guarantees of message order for sending messages to RabbitMQ, unless you put those guarantees in place. This is a difficult, if not impossible, thing to do in many circumstances, especially in a multi-threaded environment like yours.

    If you need to guarantee messages are processed in a certain order, you need to look at building or using a resequencer.

    The general idea is that you need to number your messages at the source - 1, 2, 3, 4, 5, etc. When your consumers pull messages out of the queue, you will look at the message number and see if this is the one that you need right now. If it is not, you will hang on to the message and process it later. Once you have the message # that you are currently looking for, you will process all of the messages that you currently have held in sequence.

    Spring should have something like a resequencer available, though I'm not familiar enough with that ecosystem to point you in the right direction.
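
To make the resequencer idea above concrete, here is a minimal in-memory sketch in plain Java. It is not taken from Spring or RabbitMQ: the class name `Resequencer`, its `accept` method, and the choice to silently drop already-processed sequence numbers are all assumptions made for illustration. It also assumes the sender stamps every message with a gap-free sequence number (for example from a shared `AtomicLong`) and that payloads are non-null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * Hypothetical resequencer sketch: holds out-of-order messages and
 * releases them to a downstream handler strictly by sequence number.
 */
final class Resequencer<T> {
    // Messages that arrived early, keyed by their sequence number.
    private final Map<Long, T> held = new HashMap<>();
    private final Consumer<T> downstream;
    // The sequence number we are currently waiting for.
    private long next;

    Resequencer(long firstSequence, Consumer<T> downstream) {
        this.next = firstSequence;
        this.downstream = Objects.requireNonNull(downstream);
    }

    /** Called by the consumer for every message pulled from the queue. */
    synchronized void accept(long sequence, T payload) {
        Objects.requireNonNull(payload);
        if (sequence < next) {
            return; // already processed (assumption: duplicates are dropped)
        }
        held.put(sequence, payload);
        // Release the expected message and every consecutive one held so far.
        T ready;
        while ((ready = held.remove(next)) != null) {
            downstream.accept(ready);
            next++;
        }
    }

    /** Number of messages still waiting for an earlier one to arrive. */
    synchronized int heldCount() {
        return held.size();
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        Resequencer<String> resequencer = new Resequencer<>(1, processed::add);
        resequencer.accept(3, "third");  // held: still waiting for 1
        resequencer.accept(1, "first");  // released immediately
        resequencer.accept(2, "second"); // releases 2, then the held 3
        System.out.println(processed);   // prints [first, second, third]
    }
}
```

A real consumer would also need a policy for messages that never arrive (a timeout or a cap on the buffer size), since this sketch waits indefinitely for a missing sequence number.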