I have multiple services that all reference the same message contracts library.
The services publish events through MassTransit, and I am seeing duplicate messages at the consumer level almost constantly.
This is the shared method that all services use to register MassTransit:
services.AddMassTransit(busRegistrationConfigurator => {
    busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
    if (consumers != null) {
        busRegistrationConfigurator.AddConsumers(consumers());
    }
    busRegistrationConfigurator.UsingRabbitMq((context, configuration) => {
        configuration.Host(rabbitMqMessagingConfiguration.Host, rabbitMqMessagingConfiguration.VirtualHost, hostConfigurator => {
            hostConfigurator.Username(rabbitMqMessagingConfiguration.Username);
            hostConfigurator.Password(rabbitMqMessagingConfiguration.Password);
            hostConfigurator.Heartbeat(30);
        });
        configuration.Publish<IIntegrationEvent>(x => {
            x.Exclude = true;
        });
        configuration.Publish<BaseIntegrationEvent>(x => {
            x.Exclude = true;
        });
        configuration.ConfigureEndpoints(context);
    });
});
Some notes:
- consumers is a nullable parameter of type Func<Type[]>?
- The configuration.Publish calls for IIntegrationEvent and BaseIntegrationEvent are needed because all the events share some common initialisation code through the base class.
- The base class exposes an Id property of type Guid, which identifies the event.

In another post I read that duplicate detection can be enabled through configuration. However, I am not using a transactional outbox within MassTransit, so I am not sure whether that post applies to my case.
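The notes only say that every event derives from a common base class carrying a Guid Id. A hypothetical shape for these contracts could look like the sketch below; the CreatedAtUtc property and the OrderPlaced event are assumptions for illustration, not part of the original library. The Id is generated once when the publisher creates the event and has an init accessor so that a deserializer can restore the publisher's value, which is what makes it stable across redeliveries of the same event.

```csharp
using System;

// Hypothetical shape of the shared contracts: the question only states that
// all events derive from a base class exposing an Id property of type Guid.
public interface IIntegrationEvent
{
    // Identifies one logical event; a redelivery carries the same value.
    Guid Id { get; }
}

public abstract class BaseIntegrationEvent : IIntegrationEvent
{
    // Assumed "common initialisation": a fresh Id and a creation timestamp.
    protected BaseIntegrationEvent()
    {
        Id = Guid.NewGuid();
        CreatedAtUtc = DateTime.UtcNow;
    }

    // init accessors let a deserializer overwrite the constructor defaults
    // with the values that were actually published.
    public Guid Id { get; init; }
    public DateTime CreatedAtUtc { get; init; }
}

// Hypothetical concrete event, used for illustration only.
public sealed class OrderPlaced : BaseIntegrationEvent
{
    public int OrderNumber { get; init; }
}
```

Because two distinct events always get different Ids while a redelivered copy keeps the original one, such an Id is a natural candidate for a de-duplication key.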
Do you have any guidance on how to implement duplicate detection either at configuration level (MassTransit/RabbitMQ) or at application level (my code)?
Looking through the MassTransit documentation, I could not find any reference on how to detect and discard duplicate messages. I also did not want to reduce the potential throughput of the messaging system by making it responsible for identifying duplicates. For these reasons, I shifted the responsibility for duplicate detection to the application and used two different approaches to achieve idempotency.
Explicit message de-duplication
To achieve explicit de-duplication, I introduced an interface and a base class for consumers, and implemented the tracking and identification of duplicates inside the base class.
Here is the interface:
public interface IIntegrationEventHandler<in TIntegrationEvent>
    where TIntegrationEvent : IIntegrationEvent {
    public Task Handle(TIntegrationEvent notification, CancellationToken cancellationToken = default);
}
And here is the base class:
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

public abstract class MassTransitIntegrationEventHandler<TNotification> :
    IIntegrationEventHandler<TNotification>,
    IConsumer<TNotification>
    where TNotification : class, IIntegrationEvent {
    private readonly IDuplicateDetectorHandler _duplicateDetector;
    protected MassTransitIntegrationEventHandler(IDuplicateDetectorHandler duplicateDetector) {
        _duplicateDetector = duplicateDetector;
    }
    public async Task Consume(ConsumeContext<TNotification> context) {
        var notification = context.Message;
        // Without a CorrelationId there is no key to de-duplicate on,
        // so handle the message instead of silently dropping it.
        if (context.CorrelationId is null) {
            await Handle(notification, context.CancellationToken);
            return;
        }
        Guid messageKey = context.CorrelationId.Value;
        // Check-then-mark is not atomic: two concurrent deliveries of the same
        // message can both pass this check before either one is marked.
        bool hasBeenConsumed = await _duplicateDetector.IsConsumedAsync(messageKey, context.CancellationToken);
        if (hasBeenConsumed) {
            return; // the message has already been consumed
        }
        await Handle(notification, context.CancellationToken);
        // Mark only after Handle succeeds, so a failed attempt can be retried.
        await _duplicateDetector.MarkConsumed(messageKey, context.CancellationToken);
    }
    public abstract Task Handle(TNotification notification, CancellationToken cancellationToken = default);
}
Every consumer can now inherit from the base class and only needs to implement the Handle method. The actual duplicate detector can be implemented according to your own application requirements, as long as it implements the following interface:
public interface IDuplicateDetectorHandler {
    Task<bool> IsConsumedAsync(Guid messageId, CancellationToken cancellationToken = default);
    Task MarkConsumed(Guid messageId, CancellationToken cancellationToken = default);
}
I implemented mine by using a cache.
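The answer does not say which cache is used. As a minimal sketch, assuming a single service instance, an in-process ConcurrentDictionary with a retention window can play the role of that cache. The class names InMemoryDuplicateDetector and DeduplicatingDispatcher are hypothetical; DispatchAsync mirrors the check, handle, mark sequence of the base class without depending on MassTransit, so the de-duplication logic can be exercised on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public interface IDuplicateDetectorHandler
{
    Task<bool> IsConsumedAsync(Guid messageId, CancellationToken cancellationToken = default);
    Task MarkConsumed(Guid messageId, CancellationToken cancellationToken = default);
}

// Hypothetical in-process detector: remembers consumed ids for a fixed time window.
public sealed class InMemoryDuplicateDetector : IDuplicateDetectorHandler
{
    private readonly ConcurrentDictionary<Guid, DateTime> _consumed = new();
    private readonly TimeSpan _retention;

    public InMemoryDuplicateDetector(TimeSpan retention) => _retention = retention;

    public Task<bool> IsConsumedAsync(Guid messageId, CancellationToken cancellationToken = default)
    {
        // An id counts as consumed only while its entry is younger than the retention window.
        bool consumed = _consumed.TryGetValue(messageId, out var markedAt)
                        && DateTime.UtcNow - markedAt < _retention;
        return Task.FromResult(consumed);
    }

    public Task MarkConsumed(Guid messageId, CancellationToken cancellationToken = default)
    {
        _consumed[messageId] = DateTime.UtcNow;
        PurgeExpired();
        return Task.CompletedTask;
    }

    // Drops expired entries so memory stays bounded. This is O(n) per call;
    // a real cache would typically evict in the background instead.
    private void PurgeExpired()
    {
        var cutoff = DateTime.UtcNow - _retention;
        foreach (var entry in _consumed)
        {
            if (entry.Value < cutoff)
            {
                _consumed.TryRemove(entry.Key, out _);
            }
        }
    }
}

// Mirrors the check/handle/mark flow of the base consumer class.
public static class DeduplicatingDispatcher
{
    // Returns true if the handler ran, false if the message was skipped as a duplicate.
    public static async Task<bool> DispatchAsync(IDuplicateDetectorHandler detector, Guid messageId, Func<Task> handle)
    {
        if (await detector.IsConsumedAsync(messageId))
        {
            return false;
        }
        await handle();
        await detector.MarkConsumed(messageId);
        return true;
    }
}
```

Dispatching the same id twice runs the handler once. If several instances of a service consume from the same queue, a redelivered message can land on a different instance, so the cache would need to be shared (for example a distributed cache, or a database table with a unique key on the id). The retention window also bounds protection: a duplicate that arrives after its entry expires is processed again.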
Changed message semantics to support idempotency
As described in this post, I also changed the message semantics so that resending a message does not affect the system. Wherever possible, I modified the message content to represent the desired final state rather than an incremental change. For instance, instead of sending a message like "Add $100 to account 123", I redefined it as "Set the balance of account 123 to $110".
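To make the difference concrete, here is a minimal sketch with hypothetical message types AddToBalance and SetBalance and an in-memory AccountStore (none of these come from the original code). It assumes account 123 starts at $10, which is what makes $110 the correct final balance in the example above.

```csharp
using System.Collections.Generic;

// Hypothetical messages contrasting the two semantics.
public sealed record AddToBalance(int AccountId, decimal Amount);  // incremental change
public sealed record SetBalance(int AccountId, decimal Balance);   // desired final state

public sealed class AccountStore
{
    private readonly Dictionary<int, decimal> _balances = new();

    // Unknown accounts are treated as having a zero balance.
    public decimal BalanceOf(int accountId) =>
        _balances.TryGetValue(accountId, out var balance) ? balance : 0m;

    // Not idempotent: every redelivery adds the amount again.
    public void Apply(AddToBalance message) =>
        _balances[message.AccountId] = BalanceOf(message.AccountId) + message.Amount;

    // Idempotent: applying the same message twice leaves the same balance.
    public void Apply(SetBalance message) =>
        _balances[message.AccountId] = message.Balance;
}
```

Delivering AddToBalance(123, 100) twice leaves a balance of $210, while delivering SetBalance(123, 110) twice still leaves $110. One trade-off to keep in mind: a state-based message is only safe to replay if an older SetBalance cannot arrive after a newer one; when ordering is not guaranteed, a version number or timestamp on the message is a common way to reject stale updates.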
I am still testing this change, but so far everything is working fine. Credit for the solution goes to this great post on Medium: Guaranteed Delivery and Idempotent Receivers in Messaging System.