MassTransit and RabbitMQ: implement duplicate message detection for consumers


I have multiple services running which all have a reference to the same message contracts library.

Services publish events through MassTransit. I am seeing duplicate messages at the consumer level almost constantly.

This is the shared method that all services use to register MassTransit:

services.AddMassTransit(busRegistrationConfigurator => {
    busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
    if (consumers != null) {
        busRegistrationConfigurator.AddConsumers(consumers());
    }
    busRegistrationConfigurator.UsingRabbitMq((context, configuration) => {
        configuration.Host(rabbitMqMessagingConfiguration.Host, rabbitMqMessagingConfiguration.VirtualHost, hostConfigurator => {
            hostConfigurator.Username(rabbitMqMessagingConfiguration.Username);
            hostConfigurator.Password(rabbitMqMessagingConfiguration.Password);
            hostConfigurator.Heartbeat(30);
        });
        configuration.Publish<IIntegrationEvent>(x => {
            x.Exclude = true;
        });
        configuration.Publish<BaseIntegrationEvent>(x => {
            x.Exclude = true;
        });
        configuration.ConfigureEndpoints(context);
    });
});

Some notes:

  1. consumers is a parameter of type Func<Type[]>?
  2. The two configurations configuration.Publish for IIntegrationEvent and BaseIntegrationEvent are needed because all the events share some common initialisation code through the base class
  3. All the contracts have an Id property of type Guid which identifies the event
  4. Using the latest version of MassTransit with RabbitMQ deployed locally through its customised Docker image, in an ASP.NET Core 9 application.

In another post I have read that it is possible through a configuration. However I am not using a transactional outbox within MassTransit so I am not sure if that post applies to my case.

Do you have any guidance on how to implement duplicate detection either at configuration level (MassTransit/RabbitMQ) or at application level (my code)?


Solution

  • Looking through the MassTransit documentation, I could not find any reference on how to detect duplicate messages. I also did not want to reduce the potential throughput of the messaging system by making it responsible for identifying duplicates. For these reasons, I ended up shifting the responsibility for duplicate detection to the application, and I used two different approaches to achieve idempotency.

    Explicit message de-duplication

    To achieve explicit de-duplication I implemented tracking and identification of duplicates inside a base consumer class, introducing an interface and a base class for consumers.

    Here is the interface

    public interface IIntegrationEventHandler<in TIntegrationEvent> 
        where TIntegrationEvent : IIntegrationEvent {
    
        public Task Handle(TIntegrationEvent notification, CancellationToken cancellationToken = default);
    }
    

    and here is the base class

    public abstract class MassTransitIntegrationEventHandler<TNotification> :
        IIntegrationEventHandler<TNotification>, 
        IConsumer<TNotification>
        where TNotification : class, IIntegrationEvent {
    
        private readonly IDuplicateDetectorHandler _duplicateDetector;
    
        protected MassTransitIntegrationEventHandler(IDuplicateDetectorHandler duplicateDetector) {
            _duplicateDetector = duplicateDetector;
        }
    
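        public async Task Consume(ConsumeContext<TNotification> context) {
            var notification = context.Message;
            if (context.CorrelationId is not Guid correlationId) {
                // No correlation id, so duplicates cannot be detected: handle the message anyway
                // rather than silently dropping it.
                await Handle(notification, context.CancellationToken);
                return;
            }
            bool hasBeenConsumed = await _duplicateDetector
                .IsConsumedAsync(correlationId, context.CancellationToken);
            if (hasBeenConsumed) {
                // The message has already been consumed: skip it.
                return;
            }
            await Handle(notification, context.CancellationToken);
            // Mark only after Handle succeeds, so a failed message can still be retried.
            // Note: check-then-mark is not atomic; two concurrent deliveries can both pass the check.
            await _duplicateDetector.MarkConsumed(correlationId, context.CancellationToken);
        }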
    
        public abstract Task Handle(TNotification notification, CancellationToken cancellationToken = default);
    }
    

    Every consumer can now inherit from the base class and simply implement the Handle method. The actual duplicate detector can be implemented to suit your own application's requirements, as long as it implements the following interface:

    public interface IDuplicateDetectorHandler {
        Task<bool> IsConsumedAsync(Guid messageId, CancellationToken cancellationToken = default);
        Task MarkConsumed(Guid messageId, CancellationToken cancellationToken = default);
    }
    

    I implemented mine by using a cache.
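
    The cache itself is not shown in this answer, so the following is only a minimal sketch of what a cache-backed detector could look like. It assumes a single consumer process and an in-memory ConcurrentDictionary; the class name InMemoryDuplicateDetector and the retention parameter are hypothetical, and the interface is repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Same contract as above, repeated so this sketch is self-contained.
public interface IDuplicateDetectorHandler {
    Task<bool> IsConsumedAsync(Guid messageId, CancellationToken cancellationToken = default);
    Task MarkConsumed(Guid messageId, CancellationToken cancellationToken = default);
}

// Hypothetical in-memory detector; the name and the retention policy are assumptions.
public sealed class InMemoryDuplicateDetector : IDuplicateDetectorHandler {
    // Maps a message id to the time it was marked as consumed.
    private readonly ConcurrentDictionary<Guid, DateTimeOffset> _consumed = new();
    private readonly TimeSpan _retention;

    public InMemoryDuplicateDetector(TimeSpan retention) {
        _retention = retention;
    }

    public Task<bool> IsConsumedAsync(Guid messageId, CancellationToken cancellationToken = default) {
        cancellationToken.ThrowIfCancellationRequested();
        if (_consumed.TryGetValue(messageId, out var consumedAt)) {
            if (DateTimeOffset.UtcNow - consumedAt <= _retention) {
                return Task.FromResult(true);
            }
            // The entry is older than the retention window: forget it.
            _consumed.TryRemove(messageId, out _);
        }
        return Task.FromResult(false);
    }

    public Task MarkConsumed(Guid messageId, CancellationToken cancellationToken = default) {
        cancellationToken.ThrowIfCancellationRequested();
        _consumed[messageId] = DateTimeOffset.UtcNow;
        return Task.CompletedTask;
    }
}
```

    Expired entries are only evicted when they are looked up again, so a long-running service would probably want a periodic sweep as well. With several instances of the same service consuming from one queue, the state would have to live in a shared store rather than in process memory.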

    Changed Message semantics to support idempotency

    As described in this post, I have also changed message semantics in a way that resending the message does not affect the system. Wherever possible I modified message content to represent the desired final state rather than an incremental change. For instance, instead of sending a message like Add $100 to account 123 I redefined it to be Set the balance of account 123 to $110.
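
    To make the difference concrete, here is a small sketch of the two message styles applied to an in-memory store. The contract names AddToBalance and SetBalance and the AccountStore class are hypothetical and exist only for illustration; they are not the contracts used in my services.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message contracts, for illustration only.
public sealed record AddToBalance(int AccountId, decimal Amount);   // incremental change
public sealed record SetBalance(int AccountId, decimal NewBalance); // desired final state

public sealed class AccountStore {
    private readonly Dictionary<int, decimal> _balances = new();

    // Unknown accounts are treated as having a zero balance.
    public decimal GetBalance(int accountId) =>
        _balances.TryGetValue(accountId, out var balance) ? balance : 0m;

    // Not idempotent: every redelivery adds the amount again.
    public void Apply(AddToBalance message) =>
        _balances[message.AccountId] = GetBalance(message.AccountId) + message.Amount;

    // Idempotent: a redelivery writes the same value again.
    public void Apply(SetBalance message) =>
        _balances[message.AccountId] = message.NewBalance;
}
```

    Starting from a balance of $10, delivering AddToBalance(123, 100) twice leaves the account at $210, while delivering SetBalance(123, 110) twice leaves it at $110. State-based messages can still overwrite each other if they arrive out of order, so this approach protects against redelivery, not against reordering.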

    I am still running my tests after this change, but everything has been working fine so far. Credit for the solution goes to this great post, Guaranteed Delivery and Idempotent Receivers in Messaging System, found on Medium.