I have a large project that follows the clean architecture solution layout and some domain-driven design / CQRS approaches. I followed Milan Jovanovic's early clean architecture YouTube videos quite closely to arrive here, if that helps. Prior to integrating MassTransit, I was using his outbox pattern approach, which uses an EF Core interceptor to intercept SavingChangesAsync, extract the domain events raised during that transaction, and save them to an outbox table like so:
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
    InterceptionResult<int> result, CancellationToken cancellationToken = default)
{
    var context = eventData.Context;
    if (context is null)
        return base.SavingChangesAsync(eventData, result, cancellationToken);
    var events = context.ChangeTracker
        .Entries<AggregateRoot>()
        .Select(a => a.Entity)
        .Where(e => e.GetDomainEvents() is not null)
        .SelectMany(e =>
        {
            var domainEvents = e.GetDomainEvents();
            e.ClearDomainEvents();
            return domainEvents;
        });
    var messages = events
        .Where(e => e is not null)
        .Select(e => new OutboxMessage()
        {
            Id = Guid.NewGuid(),
            OccurredOnUtc = DateTime.UtcNow,
            Type = e.GetType().Name,
            Content = JsonConvert.SerializeObject(e, new JsonSerializerSettings
            {
                TypeNameHandling = TypeNameHandling.All
            })
        })
        .ToList();
    context.Set<OutboxMessage>().AddRange(messages);
    return base.SavingChangesAsync(eventData, result, cancellationToken);
}
These entries are later picked up by a Quartz job that runs every X seconds and publishes the events as MediatR notifications. I will not go into greater detail on this approach, simply because it works flawlessly.
I decided to replace this approach with MassTransit / RabbitMQ to make it a true pub/sub and to get rid of the 10-second delay, among other things. Here is how I register MassTransit:
private static void ConfigureMessageQueue(IServiceCollection services, IConfiguration configuration)
{
    services.Configure<MessageBrokerSettings>(configuration.GetSection("MessageBroker"));
    services.AddSingleton(sp =>
        sp.GetRequiredService<IOptions<MessageBrokerSettings>>().Value);
    services.AddMassTransit(busConfigurator =>
    {
        busConfigurator.SetKebabCaseEndpointNameFormatter();
        busConfigurator.AddConsumersFromNamespaceContaining<ISomeApplicationMarker>();
        busConfigurator.UsingRabbitMq((context, configurator) =>
        {
            var settings = context.GetRequiredService<MessageBrokerSettings>();
            configurator.Host(new Uri(settings.Host), h =>
            {
                h.Username(settings.Username);
                h.Password(settings.Password);
            });
            configurator.ConfigureEndpoints(context);
        });
    });
    services.AddTransient<IEventBus, EventBus>();
}
The event bus is simply an abstraction over the IBus interface to decouple my application code from MassTransit / RabbitMQ as much as possible:
internal sealed class EventBus : IEventBus
{
    private readonly IBus _bus;
    private readonly ILogger<EventBus> _logger;
    public EventBus(IBus bus, ILogger<EventBus> logger)
    {
        _bus = bus;
        _logger = logger;
    }
    public async Task PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
        where TMessage : class, IDomainEvent
    {
        _logger.LogDebug("Publishing event {@Event}", message);
        await _bus.Publish(message, cancellationToken);
    }
}
I will not show my consumers for brevity, but they are just IConsumer implementations of the right event type.
I have made a new EF Core interceptor that does almost the same thing, except that it uses the event bus to publish each event to RabbitMQ instead of saving it to the outbox table.
Here's where this gets weird.
If I publish an event directly from a MediatR handler (therefore skipping the new interceptor completely), everything works. The message goes to RabbitMQ and hits my consumer back and does what I want.
If I go through my interceptor instead, execution does enter it at the right time, I see the event bus's log statement saying the event has been published, and I see the message hitting RabbitMQ, but it NEVER triggers the consumer! As far as I can tell, the broker is configured correctly, since everything works when I call Publish directly from a MediatR handler. At startup, MassTransit logs that the endpoints have been configured and wired to the right consumers, and RabbitMQ shows the expected exchanges and queues. Nothing in that setup changes between the two cases.
Sorry this is a lot to process but this one truly makes no sense to me. I am happy to provide more code if it can help. Thanks
EDIT
Here is my interceptor as requested. I have tried overriding both SavingChangesAsync and SavedChangesAsync, with no success:
public sealed class ConvertDomainEventsToMessageBrokerMessagesInterceptor : SaveChangesInterceptor
{
    private readonly IEventBus _eventBus;
    public ConvertDomainEventsToMessageBrokerMessagesInterceptor(IEventBus eventBus) => _eventBus = eventBus;
    /*public async override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default)
    {
        var context = eventData.Context;
        if (context is null)
            return await base.SavingChangesAsync(eventData, result, cancellationToken);
        var events = context.ChangeTracker
            .Entries<AggregateRoot>()
            .Select(a => a.Entity)
            .Where(e => e.GetDomainEvents() is not null)
            .SelectMany(e =>
            {
                var domainEvents = e.GetDomainEvents();
                e.ClearDomainEvents();
                return domainEvents;
            });
        var res = await base.SavingChangesAsync(eventData, result, cancellationToken);
        foreach (var domainEvent in events)
            await _eventBus.PublishAsync(domainEvent, cancellationToken);
        return res;
    }*/
    public override async ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result, CancellationToken cancellationToken = default)
    {
        var context = eventData.Context;
        if (context is null)
            return await base.SavedChangesAsync(eventData, result, cancellationToken);
        var events = context.ChangeTracker
            .Entries<AggregateRoot>()
            .Select(a => a.Entity)
            .Where(e => e.GetDomainEvents() is not null)
            .SelectMany(e =>
            {
                var domainEvents = e.GetDomainEvents();
                e.ClearDomainEvents();
                return domainEvents;
            });
        foreach (var domainEvent in events)
            await _eventBus.PublishAsync(domainEvent, cancellationToken);
        return await base.SavedChangesAsync(eventData, result, cancellationToken);
    }
}
I figured out my issue. There is another difference between publishing directly from the MediatR handlers and publishing from an EF Core interceptor: one publishes concrete types and the other publishes an abstraction!
All my events implement IDomainEvent, and the list in my AggregateRoot class is a list of IDomainEvent. When publishing directly from a handler, I explicitly publish a concrete SomeActionHappenedEvent, which MassTransit/RabbitMQ recognizes.
My interceptor, iterating over that list of IDomainEvent, ends up publishing each message as an IDomainEvent rather than as its concrete type. I tried publishing an explicit SomeActionHappenedEvent from the interceptor and it works just fine.
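To illustrate why the static type matters, here is a minimal sketch that does not use MassTransit at all. `FakeBus` is a hypothetical stand-in whose two overloads only mirror the shape of a generic and an object-based publish method; it simply reports which type each overload would treat as the message type.

```csharp
using System;

public interface IDomainEvent { }

public sealed record SomeActionHappenedEvent(Guid Id) : IDomainEvent;

// Hypothetical stand-in for a bus. This is NOT MassTransit's API; it only
// returns the name of the type each overload would use as the message type.
public static class FakeBus
{
    // Generic overload: T is inferred from the compile-time type of the argument.
    public static string Publish<T>(T message) where T : class => typeof(T).Name;

    // Object overload: the message type is taken from the runtime instance.
    public static string Publish(object message) => message.GetType().Name;
}

public static class Demo
{
    public static void Main()
    {
        // This is what the interceptor sees: a variable typed as the interface.
        IDomainEvent domainEvent = new SomeActionHappenedEvent(Guid.NewGuid());

        // Binds to Publish<IDomainEvent>, so the concrete type is lost.
        Console.WriteLine(FakeBus.Publish(domainEvent));          // IDomainEvent

        // Binds to Publish(object), which inspects the runtime type.
        Console.WriteLine(FakeBus.Publish((object)domainEvent));  // SomeActionHappenedEvent
    }
}
```

A handler that holds a variable typed as SomeActionHappenedEvent never hits this, because the inferred T is already the concrete type.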
My problem ultimately was that I was calling the generic Publish<TMessage>(TMessage message) instead of Publish(object message). The generic overload infers TMessage from the compile-time type, so it was publishing everything as IDomainEvent, a type none of my MassTransit consumers are bound to. Once I switched to the object overload, which uses the message's runtime type, everything started to work.
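One way the change could look in the EventBus wrapper is sketched below. It assumes the MassTransit package is referenced; the `IDomainEvent` and `IEventBus` declarations are reconstructions for illustration (the originals are not shown above), and the logger is omitted for brevity. The cast to object is what steers overload resolution away from the generic Publish<T>.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Assumed shapes; the real interfaces are not shown in the question.
public interface IDomainEvent { }

public interface IEventBus
{
    Task PublishAsync(IDomainEvent message, CancellationToken cancellationToken = default);
}

internal sealed class EventBus : IEventBus
{
    private readonly IBus _bus;

    public EventBus(IBus bus) => _bus = bus;

    public Task PublishAsync(IDomainEvent message, CancellationToken cancellationToken = default)
    {
        // Casting to object selects Publish(object, CancellationToken), so the
        // message is published as its concrete runtime type, not as IDomainEvent.
        return _bus.Publish((object)message, cancellationToken);
    }
}
```

With this signature the interceptor's foreach loop can stay as it is, since it already passes each item as an IDomainEvent.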