I have the following problem with mass transit on a .Net app.
I have a class that implements the IConsumer<> interface and consumes messages from a RabbitMQ queue. The app is a running as pod on a Kubernetes cluster and it can scale horizontally so we can have multiple instances of the app running at the same time if needed.
I am also injecting the service with:
builder.Services.AddMassTransit()
where I am configuring the Consumer, RabbitMQ Host and the receive endpoint.
When a pod shuts down / restarts for whatever reason and because there is an actually delay from the moment that the pod receives the SIGTERM signal and the moment that it completely shutsdown, I want to be able to:
Here is what I have right now, which doesn't work mainly because the cancellation token from the context is not changing states:
public async Task Consume(ConsumeContext<NotificationMessage> context)
{
try
{
if (context.CancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException();
}
// call method that also receives the Cancellation Token
}
catch (Exception ex)
when (ex is OperationCanceledException || ex is TaskCanceledException)
{
// log it
throw;
}
}
Please let me know if you have any ideea how I can achive this sort of behavior. Thanks!
You can configure the host options in MassTransit so that when the generic host terminates, the bus and its consumers can be canceled as well.
Setting ConsumerStopTimeout
to a value lower than StopTimeout
will signal the consumers that the bus is stopping via the CancellationToken
property on ConsumeContext
.
Also, be sure to set the .NET host stop timeout as well, which is also documented at the above link.