Search code examples
azureservicebusmasstransit

Mass Transit stops consuming after exception occurs


I'm using MassTransit with Azure Service Bus. Everything seems to work fine, except when my consumer throws an exception.

When it occurs, the message is shown as a dead letter in my subscription overview through Azure portal and then, the consumer stops consuming new incoming messages.

My configuration looks like this:

_azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
    cfg.Host(new HostSettings
    {
        ServiceUri = new Uri(pharmacyEndpointSettings.Endpoint.BdcpServiceBus),
        TransportType = ServiceBusTransportType.AmqpTcp,
        TokenCredential = new ClientSecretCredential(
            pharmacyEndpointSettings.Endpoint.BdcpServiceTenantId,
            pharmacyEndpointSettings.Endpoint.BdcpServiceClientId,
            pharmacyEndpointSettings.Endpoint.BdcpServiceClientSecret)
    });
    
    cfg.UseMessageRetry(c => c.Immediate(5));
    cfg.AutoDeleteOnIdle = TimeSpan.FromDays(AutoDeleteSubscriptionOnIdleInDays);
    cfg.DefaultMessageTimeToLive = TimeSpan.FromDays(MessageTimeToLiveInDays);
    cfg.EnableDeadLetteringOnMessageExpiration = true;

    cfg.Message<MyMessage>(m => m.SetEntityName("MyTopicName"));
    cfg.SubscriptionEndpoint<T>(_subscriptionName.Value, se =>
    {
        se.Rule = new CreateRuleOptions("Receiver", new SqlRuleFilter("receiver='all' OR receiver='MySpecificReceiver'"));
        
        se.Consumer<MyMessage>();
        se.ConfigureDeadLetterQueueDeadLetterTransport();
        se.ConfigureDeadLetterQueueErrorTransport();
    });
}

_azureServiceBus.Start();

My consumers look like this

public class MyMessageConsumer : IConsumer<T>
{
   public override Task Consume(ConsumeContext<MyMessage> context)
   {

        if (context.Message.Id == "INVALID")
        {
            return Task.FromException(new Exception("invalid id"));
        }

        return Task.CompletedTask;
   }
}

In the deadletter properties I find an OperationCancelledException

à GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass5_01.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult() à MassTransit.Azure.ServiceBus.Core.Pipeline.SendEndpointContextFactory.d__7.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult() à GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource-Send>d__7.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() à GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() à GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource-Send>d__7.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à MassTransit.Transports.HostConfigurationRetryExtensions.d__0.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à MassTransit.Context.BaseConsumeContext.d__611.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à MassTransit.Context.BaseConsumeContext.<NotifyFaulted>d__561.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à MassTransit.Pipeline.Filters.ConsumerMessageFilter2.<GreenPipes-IFilter<MassTransit-ConsumeContext<TMessage>>-Send>d__4.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à GreenPipes.Filters.TeeFilter1.<>c__DisplayClass5_0.<g__SendAsync|1>d.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à GreenPipes.Filters.OutputPipeFilter2.<SendToOutput>d__9.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() à GreenPipes.Filters.OutputPipeFilter2.d__9.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à MassTransit.Pipeline.Filters.DeserializeFilter.d__4.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à GreenPipes.Filters.RescueFilter2.<GreenPipes-IFilter<TContext>-Send>d__5.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() à GreenPipes.Filters.RescueFilter2.<GreenPipes-IFilter-Send>d__5.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à MassTransit.Pipeline.Filters.DeadLetterFilter.<GreenPipes-IFilter-Send>d__3.MoveNext() --- Fin de la trace de la pile à partir de l'emplacement précédent au niveau duquel l'exception a été levée --- à System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) à System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) à System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult() à MassTransit.Transports.ReceivePipeDispatcher.d__17.MoveNext()

Edit:

Also I realized that the retry count is being increased without any hit inside the Consume method of the consumer.

It also appears that there are Azure.RequestFailedException, but I can't say when it happens since the consuming seem stopped.

Service request failed. Status: 401 (Unauthorized) Content: 401Authorization failed for specified action: Manage,EntityWrite. TrackingId:1d71f7fe-1627-4fe4-86fb-67446207d0a2_G11, SystemTracker:sb-staging-spoke.servicebus.windows.net:MassTransit_ReceiveFault, Timestamp:2022-12-09T19:38:52 Headers: Transfer-Encoding: chunked Strict-Transport-Security: REDACTED Content-Type: application/xml; charset=utf-8 Date: Fri, 09 Dec 2022 19:38:52 GMT Server: Microsoft-HTTPAPI/2.0


Solution

  • It's trying to publish a ReceiveFault, and failing because it can't create the ReceiveFault topic. If you were to look in the logs, you might see why it can't deserialize the message. Ensuring the topic exists would help avoid the fault publish error, which might avoid whatever causes the issue.

    However, version 7.3.1 is no longer supported. So I'd suggest trying to resolve whatever causes the issue. Making sure you have good logging enabled should help identify any other related issues.

    Update

    You can disable fault publishing by setting PublishFaults = false.