Search code examples
rabbitmqrabbitmqctl

How to read RabbitMQ unacknowledged messages / RabbitMQ cycle


I want to read the payload, or messageId of unacknowledged messages in a RabbitMQ queue. Is this possible?

The reason I want to do so is I trying to use RabbitMQ dead letter feature to build a cycle to for auto-generating message periodically. Briefly, create two queues - work queue and delay queue.

  1. Set TTL of the message in delay queue as the time frequency of need to periodically. Can have different messages with different TTL for different job purpose;
  2. put a message into the delay queue. When the message expires, it gets republished into the work queue. The message can sit in the work queue as long as needed until a consumer is up to consume it.
  3. One consumer picks up the message, and process it. If processing succeeds, the consumer needs acknowledge the work queue, and then write the message back to the delay queue; If processing fails (e.g., the thread crashes), no acknowledgement. Then the message would re-appear in the worker queue automatically. Then another consumer can take up the job. When the message sent back to the delay queue gets expired again, it gets republished, then re-consumed by a consumer ...... A cycle constructed, workload distributed.

I want to make sure there is no missing or duplicate messages in the cycle since I do not want missing job or double doing the job at the same time. However, there is tiny tiny chance duplicate messages can happen. Below show the consumer first write back the message to delay queue, and acknowledge the work queue. If the thread crashes right between below two lines, the message would be in the delay queue, and Rabbit republish the message again into work queue. The end up with duplicate messages in the cycle.

  channel.basicPublish(DELAY_EXCHANGE, "", null, message.getBytes());
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

To prevent above, I want to add a dog watch logic after above two line:

  1. Check the total number of messages in the cycle (total messages in both queues) to see whether it is equal my expected number (I expected the number less 10);

  2. If the number does not matches, I want to figure out which one is missing or which one is duplicate, then deal with it. I do not care about the sequence of those messages, or the frequency has been disturbed since this is a really really edge case to consider. I can easily retrieve those messages which are ready and requeue them. But the problem is how to deal with those unacknowledged messages?

Thank you very much in advance!

Roy


Solution

  • It's not possible to read unacknowledged messages from other context the original messages was consumed and held as un-aked.