
Publish generic (interface based) message and Consume concrete (concrete class) message in MassTransit


I have a design problem that I have been struggling with, without success. My setup works only in one specific scenario.

I am trying to publish and consume a message in MassTransit. Example (publisher, a simple console app):

IShape message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
bus.Publish(message);

(Consumers - CircleConsumer and SquareConsumer)

 class CircleConsumer : IConsumer<IShape>
    {
        public Task Consume(ConsumeContext<IShape> context)
        {
            var circle = context.Message as Circle;
            return Task.CompletedTask;
        }
    }

    class SquareConsumer : IConsumer<IShape>
    {
        public Task Consume(ConsumeContext<IShape> context)
        {
            var square = context.Message as Square;
            return Task.CompletedTask;
        }
    }

(Consumer configuration in a .NET Core Hosted Service project)

 public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>()
                   .AddScoped<SquareConsumer>()
                    .AddScoped<CircleConsumer>()
                    .AddMassTransit(cfg =>
                     {
                         cfg.AddBus(ConfigureBus);
                         cfg.AddConsumer<SquareConsumer>();
                         cfg.AddConsumer<CircleConsumer>();
                     })
                    .AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
                    .AddSingleton<IHostedService, TestMTConsumerHostedService>();

                    IBusControl ConfigureBus(IServiceProvider provider)
                    {
                        return Bus.Factory.CreateUsingRabbitMq(cfg =>
                        {
                            var host = cfg.Host(hostContext.Configuration["RabbmitMQ:Server:Host"], hostContext.Configuration["RabbmitMQ:Server:VirtualHost"], h =>
                            {
                                h.Username(hostContext.Configuration["RabbmitMQ:Auth:Username"]);
                                h.Password(hostContext.Configuration["RabbmitMQ:Auth:Password"]);
                            });

                            cfg.ReceiveEndpoint("CircleQueue", ep =>
                            {
                                ep.PrefetchCount = 16;
                                ep.UseMessageRetry(r => r.Interval(2, 100));
                                ep.Consumer<CircleConsumer>(provider);
                            });

                            cfg.ReceiveEndpoint("SquareQueue", ep =>
                            {
                                ep.PrefetchCount = 16;
                                ep.UseMessageRetry(r => r.Interval(2, 100));
                                ep.Consumer<SquareConsumer>(provider);
                            });
                        });
                    }

                });

My requirement is that the publisher publishes the message without knowing the concrete classes, and that only one of the consumers receives the message, based on the message type.

But it looks like both consumers receive the message, and the cast doesn't work either. Desired: when the publisher sends a Square object, only SquareConsumer should be called. In my case, both SquareConsumer and CircleConsumer receive the message.

As a workaround, this works:

  1. Always publish concrete objects.

            bus.Publish(new Square());
    
  2. Declare the consumers with concrete types.

           class CircleConsumer : IConsumer<Circle>
            {
                public Task Consume(ConsumeContext<Circle> context)
                {
                    var circle = context.Message;
                    return Task.CompletedTask;
                }
            }
    
            class SquareConsumer : IConsumer<Square>
            {
                public Task Consume(ConsumeContext<Square> context)
                {
                    var square = context.Message;
                    return Task.CompletedTask;
                }
            }
    

But, it would be great if I could do it generically.

Any suggestions?


Solution

  • If you change your code like this:

    object message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
    bus.Publish(message);
    

    and consumers

    class CircleConsumer : IConsumer<Circle>
    {
        public Task Consume(ConsumeContext<Circle> context)
        {
            // do circle stuff
            return Task.CompletedTask;
        }
    }
    
    class SquareConsumer : IConsumer<Square>
    {
        public Task Consume(ConsumeContext<Square> context)
        {
            // do square stuff
            return Task.CompletedTask;
        }
    }
    

    it will work as expected.

    Here I elaborate on the changes:

    1. Calling Publish with a variable of a specific static type selects the Publish<T>(T message) overload, which uses T as the message type; with an IShape variable, T is IShape. When the variable is declared as object, the call binds to the Publish(object message) overload instead. In that case, MassTransit inspects the runtime type of the message and publishes it as every message type that the object implements.
    2. You don't need to consume the shared interface type if your aim is to consume the concrete message types. You just need to create consumers for those specific types. As long as you publish as described in the previous point, the message will go to both the IShape and Circle exchanges (for example).
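
The overload difference in point 1 can be reproduced without MassTransit at all. The sketch below uses a hypothetical FakeBus class (not part of MassTransit, and not how MassTransit is implemented) whose two Publish overloads only report which type they would treat as the message type. It assumes the same IShape, Circle and Square names as the question.

```csharp
using System;

public interface IShape { }
public class Circle : IShape { }
public class Square : IShape { }

// Hypothetical stand-in for a bus, for illustration only.
public static class FakeBus
{
    // Generic overload: T is inferred at compile time from the
    // static (declared) type of the argument.
    public static string Publish<T>(T message) where T : class
        => typeof(T).Name;

    // Object overload: the type is read from the instance at run time.
    public static string Publish(object message)
        => message.GetType().Name;
}

public static class Demo
{
    public static void Main()
    {
        IShape shape = new Circle();

        // Declared as IShape, so the generic overload is chosen with T = IShape.
        Console.WriteLine(FakeBus.Publish(shape));          // IShape

        // Declared as object, so the non-generic overload sees the runtime type.
        Console.WriteLine(FakeBus.Publish((object)shape));  // Circle
    }
}
```

This is why declaring the variable as object in the publisher is enough: the publisher still never names Circle or Square, but the concrete type is no longer hidden behind the compile-time type IShape.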