Search code examples
rabbitmqnservicebusmasstransitmessage-bus

RabbitMQ+MassTransit: how to cancel queued message from processing?


In some exceptional situations I need somehow to tell consumer on receiving point that some messages shouldn’t be processed. Otherwise two systems will become out-of-sync (we deal with some outdates external systems, and if, for example, connection is dropped we have to discard all queued operations in scope of that connection).

Take a risk and resolve problem messages manually? Compensation actions (that could be tough to support in my case)? Anything else?


Solution

  • There are a few ways:

    • You can set a time-to-live when sending a message: await endpoint.Send(myMessage, c => c.TimeToLive = TimeSpan.FromHours(1));, but this will apply to all messages that are sent (or published) like this. I would consider this, after looking at your requirements. This is technical, but it is a proper messaging pattern.

    • Make TTL and generation timestamp properties of your message itself and let the consumer decide if the message is still worth processing. This is more business and, probably, the most correct way.

    • Combine tech and business - keep the timestamp and TTL in message headers so they don't pollute your message contracts, and filter them out using a custom middleware. In this case, you need to be careful to log such drops so you won't be left wonder why messages disappear now and then.

    • Almost any unreliable integration can be monitored using sagas, with timeouts. For example, we use a saga to integrate with Twilio. Since we have no ability to open a webhook for them, we poll after some interval to check the message status. You can start a saga when you get a message and schedule a message to check if the processing is still waiting. As discussed in comments, you can either use the "human intervention required" way to fix the issue or let the saga decide to drop the message.

    • A similar way could be to use a lookup table, where you put the list of messages that aren't relevant for processing. Such a table would be similar to the list of sagas. It seems that this way would also require scheduling. Both here, and for the saga, I'd recommend using a separate receive endpoint (a queue) for the DropIt message, with only one consumer. It would prevent DropIt messages from getting stuck behind the integration messages that are waiting to be processed (and some should be already dropped)

    • Use RMQ management API to remove messages from the queue. This is the worst method, I won't recommend it.