Search code examples
.netrabbitmqmasstransitifilter

MassTransit SendFilter cannot stop message sending


I'm using MassTransit with RabbitMQ in a multi-tenant application. I implemented a customized outbox pattern using a IFilter that should just save the message in my DB but I discovered that it also send the message to RabbitMQ.
My understanding is that you should explicitly invoke await next.Send(context); to continue the pipe, but it sends the message in any case.

Here a simplified app that shows the behavior: message is sent to the queue but I expect it is not. How can I avoid that message is sent?

using MassTransit;
namespace Test;
public class Program
{
    public static async Task Main(string[] args)
    {
        var builder = WebApplication.CreateBuilder(args);
        IServiceCollection services = builder.Services;

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                // To intercept sending
                cfg.UseSendFilter(typeof(OutboxPreSendFilter<>), context);

                cfg.Host("localhost", 5672, "/", h =>
                {
                    h.Username("test");
                    h.Password("test");
                });
            });
        });

        var app = builder.Build();

        using var scope = app.Services.CreateScope();
        var sendEndpointProvider = scope.ServiceProvider.GetRequiredService<ISendEndpointProvider>();
        // Send message to RabbitMQ
        await SendAndReceiveFromQueue(sendEndpointProvider);

        app.Run();
    }

    public static async Task SendAndReceiveFromQueue(ISendEndpointProvider sendEndpointProvider)
    {
        var messageToSent = Guid.NewGuid().ToString();
        var sendEndpoint = await sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{nameof(TestMsg)}"));
        // Send message to RabbitMQ
        await sendEndpoint.Send(new TestMsg() { Id = 1, Name = messageToSent, Description = $"{DateTime.Now:f}" });
    }
}

public class OutboxPreSendFilter<T> : IFilter<SendContext<T>> where T : class
{
    public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        // I do stuff in here in my app
 
        // I don't want to send anything, so next line is commented !!!
        // await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("outboxPreSendFilter");
    }
}

public class TestMsg
{
    public int Id { get; set; }
    public string? Name { get; set; }
    public string? Description { get; set; }
}

Solution

  • The send pipeline is not part of the control flow path for send, same with publish.