Search code examples
masstransit

Is there a way for a message send as a class to be consumed per Interfaces, this class implements?


Let's say I have a class C, which implements interfaces I1 and I2

namespace Contract
{
    public class C :I1,I2
    {
    }
    public interface I1 { }
    public interface I2 { }
}

So, somewhere in my producer I'll publish C:

...
var message c = new C();
await _busControl.Publish(c);
...

And in my consumer for I1 I'd like to have:

        cfg.Host("localhost", "/");

        cfg.ConfigureMessageTopology();

        cfg.ReceiveEndpoint("subs", endpoint =>
        {
            endpoint.Durable = false;
            endpoint.Handler<I1>(context =>
            {
               ...
            });
          };

And in my consumer for I2 I'd like to have:

         cfg.Host("localhost", "/");

        cfg.ConfigureMessageTopology();

        cfg.ReceiveEndpoint("subs", endpoint =>
        {
            endpoint.Durable = false;
            endpoint.Handler<I2>(context =>
            {
               ...
            });
          };

Is it possible to configure such a routing with existing code? If not, is it a right approach? What would be a suggested approach if I'd like to implement this?


Solution

  • That works by default using RabbitMQ, and also Azure Service Bus.

    It doesn't work with Amazon SQS or ActiveMQ.

    MassTransit refers to it as polymorphic message routing and you can see how the topology is configured in the documentation.

    To clean up your example, this is all you'd need:

    namespace Contract
    {
        public class C :
            I1,I2
        {
        }
        public interface I1 { }
        public interface I2 { }
    }
    

    Then publish:

    var message c = new C();
    await _busControl.Publish(c);
    

    First consumer:

    class OneConsumer :
        IConsumer<I1>
    {
        public async Task Consume(ConsumeContext<I1> context)
        {
        ...
        }
    }
    
    cfg.ReceiveEndpoint("one", endpoint =>
    {
        endpoint.Durable = false;
        endpoint.Consumer<OneConsumer>();
    };
    

    Second consumer:

    class TwoConsumer :
        IConsumer<I2>
    {
        public async Task Consume(ConsumeContext<I2> context)
        {
        ...
        }
    }
    
    cfg.ReceiveEndpoint("two", endpoint =>
    {
        endpoint.Consumer<TwoConsumer>();
    };