
MassTransit: filtering messages a consumer can handle


I have a generic message interface like this:

public interface IMyMessage
{
     int EventCode {get;}
}

Now I have multiple consumers handling this message:

public class MyConsumer1: IConsumer<IMyMessage>{...}
public class MyConsumer2: IConsumer<IMyMessage>{...}

I want MyConsumer1 to handle only those messages where EventCode==1, and have MyConsumer2 handle all messages where EventCode==2.

I know that I can use an if statement in the Consume method, but I want to know whether there is a better way, such as a routing filter.

My preferred approach would be to create an attribute, e.g. HandlesEventCodeAttribute(1), and apply it to the consumers.

I also use the Autofac container integration with MassTransit.

Please help.

Thanks


Solution

  • Before answering the actual question, I would ask why you'd use the same message type with a property to determine which consumers actually consume the message. There are more efficient options, such as using direct exchanges with RabbitMQ, where the broker routes each message by its routing key so that only the interested consumers receive it.

    That said, you could create your own attribute and a middleware filter that looks at the consumer type, reads the custom attribute, and uses its value to check each message, dropping the message if the consumer is not interested in it.

    A full working sample is shown below:

    First, create the attribute.

    class EventCodeAttribute :
        Attribute
    {
        public int EventCode { get; }
    
        public EventCodeAttribute(int eventCode)
        {
            EventCode = eventCode;
        }
    }
    

    And the message type:

    interface IEventMessage
    {
        int EventCode { get; }
    }
    

    The middleware filter:

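    using System;
    using System.Reflection; // GetCustomAttribute<T>()
    using System.Threading.Tasks;
    using MassTransit; // MassTransit versions before v8 also need: using GreenPipes;

    class EventCodeFilter<TConsumer> :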
        IFilter<ConsumerConsumeContext<TConsumer, IEventMessage>>
        where TConsumer : class
    {
        readonly int _eventCode;
    
        public EventCodeFilter()
        {
            var attribute = typeof(TConsumer).GetCustomAttribute<EventCodeAttribute>();
            if (attribute == null)
                throw new ArgumentException($"{typeof(TConsumer).Name} does not have the required EventCodeAttribute");
    
            _eventCode = attribute.EventCode;
        }
    
        public async Task Send(ConsumerConsumeContext<TConsumer, IEventMessage> context, IPipe<ConsumerConsumeContext<TConsumer, IEventMessage>> next)
        {
            if (context.Message.EventCode.Equals(_eventCode))
            {
                await next.Send(context);
            }
        }
    
        public void Probe(ProbeContext context)
        {
            var scope = context.CreateFilterScope("eventCode");
            scope.Add("code", _eventCode);
        }
    }
    

    The sample consumer:

    [EventCode(27)]
    class EventCodeConsumer :
        IConsumer<IEventMessage>
    {
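        public Task Consume(ConsumeContext<IEventMessage> context)
        {
            // With the filter configured, only messages whose EventCode is 27 reach this point.
            return Task.CompletedTask;
        }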
    }
    

    Finally, configure the consumer to use the filter:

    builder.AddMassTransit(cfg =>
    {
        cfg.AddConsumer<EventCodeConsumer>(x =>
            x.ConsumerMessage<IEventMessage>(p => p.UseFilter(new EventCodeFilter<EventCodeConsumer>())));
    });
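
    For comparison, the direct-exchange alternative mentioned at the start of this answer might look roughly like the sketch below. It is not a drop-in replacement for the filter: it assumes the RabbitMQ transport, uses the plain bus factory rather than the Autofac integration for brevity, and assumes MyConsumer1 and MyConsumer2 from the question have parameterless constructors. The exchange name "my-message" and the queue names are made up for illustration, and the configuration members shown are those of recent MassTransit versions, so they may differ in older releases.

    ```csharp
    using MassTransit;
    using RabbitMQ.Client; // ExchangeType

    var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        // Host settings omitted; configure cfg.Host(...) for your broker.

        // Publish IMyMessage to a direct exchange with a fixed (hypothetical) name.
        cfg.Message<IMyMessage>(x => x.SetEntityName("my-message"));
        cfg.Publish<IMyMessage>(x => x.ExchangeType = ExchangeType.Direct);

        // Use the event code as the routing key of every IMyMessage.
        cfg.Send<IMyMessage>(x =>
            x.UseRoutingKeyFormatter(context => context.Message.EventCode.ToString()));

        // Hypothetical queue that only receives messages with EventCode == 1.
        cfg.ReceiveEndpoint("event-code-1", e =>
        {
            // Skip the default binding so only the explicit one below applies.
            e.ConfigureConsumeTopology = false;
            e.Consumer<MyConsumer1>();
            e.Bind("my-message", b =>
            {
                b.RoutingKey = "1";
                b.ExchangeType = ExchangeType.Direct;
            });
        });

        // Hypothetical queue that only receives messages with EventCode == 2.
        cfg.ReceiveEndpoint("event-code-2", e =>
        {
            e.ConfigureConsumeTopology = false;
            e.Consumer<MyConsumer2>();
            e.Bind("my-message", b =>
            {
                b.RoutingKey = "2";
                b.ExchangeType = ExchangeType.Direct;
            });
        });
    });
    ```

    With this layout the broker does the routing, so a consumer never receives messages for other event codes, whereas the filter approach delivers every message to every consumer's queue and discards the unwanted ones after delivery.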