Search code examples
distributedmasstransitsagarouting-slip

Why Compensate method doesn't Call when a consumer thrown exception in MassTransit RouterSlip


I've built a router slip inside a saga state machine :

var builder = new RoutingSlipBuilder(NewId.NextGuid());
            var submitOrderUrl = QueueNames.GetActivityUri(nameof(SubmitOrderActivity));
            builder.AddActivity("SubmitOrder", submitOrderUrl, new
            {
                context.Message.OrderId
            });;
            builder.AddActivity("Payment", QueueNames.GetActivityUri(nameof(PaymentActivity)), new {
                context.Message.OrderId,
                context.Message.CustomerId,
                context.Message.Credit
            });
           
            builder.AddActivity("TakeProduct", QueueNames.GetActivityUri(nameof(TakeProductActivity)), new
            {
                context.Message.OrderId,
                Baskets
            });
            builder.AddVariable("OrderId", context.Message.OrderId);
            var routingSlip = builder.Build();
            await context.Execute(routingSlip);

And I have TakeProductActivity activity : public class TakeProductActivity : IActivity<TakeProductArgument, TakeProductLog>: ...

 public async Task<ExecutionResult> Execute(ExecuteContext<TakeProductArgument> context)
        {
            logger.LogInformation($"Take Product Courier called for order {context.Arguments.OrderId}");            
            var uri = QueueNames.GetMessageUri(nameof(TakeProductTransactionMessage));
            var sendEndpoint = await context.GetSendEndpoint(uri);
            await sendEndpoint.Send<TakeProductTransactionMessage>(new
            {
                ProductBaskets = context.Arguments.Baskets                
            });
             
            return context.Completed(new { Baskets = context.Arguments.Baskets, OrderId=context.Arguments.OrderId });
        }

When I use sendEndpoint.Send() method (fire & forget), when an exception occurred in the service, compensate method doesn't activate automatically, But when I use requestClient.GetResponse (request/reply) method to call service, when an exception occurred automatically Compensate method is called. and in PaymentConsumer when an exception is thrown it must be compensated methods for payment called but it doesn't!

///this class has implemented in another micro-service hosted separate process:
public class TakeProductTransactionConsumer : IConsumer<TakeProductTransactionMessage>
....
 public async Task Consume(ConsumeContext<TakeProductTransactionMessage> context)
        {
            if(context.Message.ProductBaskets.Count>0)
             { 
                    throw new Exception("Process Failed!");
             }
            logger.LogInformation($"Take product called ");
         
            Dictionary<int, int> productCounts = new Dictionary<int, int>();
            foreach (var item in context.Message.ProductBaskets)
            {
                productCounts.Add(item.ProductId, item.Count);
            }
            var products = await productService.TakeProducts(productCounts);
            await publishEndpoint.Publish<ProductsUpdatedEvent>(new
            {
                ProductUpdatedEvents = products.Select(p =>new { ProductId = p.Id,p.Price,p.Count}).ToList()
            });
           
            
        }

the problem is that MassTransit couldn't fetch Exception from rabbitMQ and automatically call compensate methods. How should I say to MassTransit to call compensate when the exception is thrown in router slip activities


Solution

  • If your Take Product activity uses Send to fire-and-forget to the take product service, and that service throws an exception, the activity will never know about it because it's already completed. Fire-and-forget is just that, no exceptions within the destination service are observed.

    If you want the take product activity to fail when the take product service throws an exception, you need to use request/response to observe the exception from the service.