I need to implement two API to suspend and to reactivate Rebus dequeuing process.
The situation is that the service bus Rebus and another software share the same database and tables to store application data (not the queue), but the latter software (let me call it "Xprocess"), at some specific moment of the day, locks some table for around 3 minutes. Since Xprocess has priority over Rebus messages execution, we agreed to pause Rebus during this interval by a Suspend and Resume api called by the Xprocess.
I'm wandering which is the best way to suspend Rebus messages execution. Take into account that it is not possible to add a specific message that suspends the following messages because when Xprocess starts there could be already some messages in the queue that must be suspended;
My idea is to add a step in the receive pipeline that interrogates a "Pausing" service before calling the next step. Like this:
public class HandlePausingServiceStep : IIncomingStep
{
readonly ILog _log;
readonly IPausingService _pausingService;
public HandlePausingServiceStep(IRebusLoggerFactory rebusLoggerFactory, IPausingService pausingService)
{
_log = rebusLoggerFactory.GetLogger<HandleApplicationExceptionsStep>();
_pausingService = pausingService;
}
public async Task Process(IncomingStepContext context, Func<Task> next)
{
while (_pausingService.Pause)
Thread.Sleep(5000);
await next();
}
}
I would consider the following two options before doing anything else:
1) Simply stop the process that hosts your Rebus endpoint during the time when the database is locked.
If your endpoint is hosted as a Windows Service (which in many cases it probably should), you can simply make a script net stop YourService
/net start YourService
and use Windows' Task Scheduler to do it.
2) Use Rebus' thread worker API to set the number of workers to 0.
You can do something like this:
bus.Advanced.Workers.SerNumberOfWorkers(0);
and then when it is time to do stuff again:
bus.Advanced.Workers.SerNumberOfWorkers(5);
but obviously you should not do this from a Rebus handler (because how will it ever get the message that wakes it up again? :D )
Personally, I would prefer the first option because it is so simple and will be easy for operations personnel to manage.
Update: After realizing that this bus is hosted in multiple instances of a web application, I think I would solve this challenge in a slightly different way...
I think an acceptable solution would be to have a timer in the background of the web application periodically check a configuration flag value in a special table in the database - this could be done every 5-10 s or something.
And then when the flag is signaled, all instances would SetNumberOfWorkers(0)
, thus effectively stopping all message processing.
When the flag was raised again, the timer could add the workers back again.
If it is critical that no message processing occurs while the database is locked, it is probably important to add a bit of a time margin around raising/lowering the flag.