Search code examples
c#reflectiondependency-injectionmasstransitconstructor-injection

How to invert/inject dependency - MassTransit Consumer


I'm working on a project and everything is working, but I have one tightly coupled dependency that I don't understand how to invert/inject.

The problem is in my Consumer class, which will receive a command message to start a process, which is part of a global message queue services project, e.g. MyCompany.MQ.Services, yet has a tightly coupled dependency on the process the command message tells it to start, e.g.:

public Task Consume(ConsumeContext<MyMessageInterface> context)
{
    logger = new LoggerConfiguration()
        .WriteTo.Console()
        .CreateLogger();

    try
    {
        TightCoupleProcess tcp = new TightCoupleProcess(context);

        logger.Information("{Blah}, {Blah}, {Blah}", context.Message.exampleVar1, context.Message.exampleVar2, context.Message.exampleVar3);

        tcp.StartProcess();

        return Task.CompletedTask;

    }
    catch (Exception ex)
    {
        return Task.FromException(ex);
    }
}

Task Consume is a part of MassTransit and I cannot modify the signature of Consume as that is actually the implementation of the IConsumer interface of MassTransit.

I guess what I want is a way to invert/inject that dependency so that my global MQ.services project doesn't depend on the project calling it. I think I have some misapprehension about inversion/injection, but I'm not sure how to articulate my shortcomings. Perhaps what I want isn't possible. I know I can't modify the interface implementation, but I'd be cool if something like the following worked, but since Consume is an implementation of a MassTransit interface, I don't think I can inject an anonymous function from my calling class:

public Task Consume(ConsumeContext<MyMessageInterface> context, AnonFunc() func)
{ 
    try
    {
        func(context)

        logger.Information("{Blah}, {Blah}, {Blah}", context.Message.exampleVar1, context.Message.exampleVar2, context.Message.exampleVar3);

        return Task.CompletedTask;

    }
    catch (Exception ex)
    {
        return Task.FromException(ex);
    }
}

I've managed to get around other dependencies such as message type definitions by using reflection and putting this logic in the MQ.Services project, which allows me to keep all of TightCoupleProcess process related code outside of the MQ.serives project, e.g.:

    public void PublishMessage(object msg)
    {
        MethodInfo method = this.GetType().GetMethod("InvokePublish");
        MethodInfo generic = method.MakeGenericMethod(msg.GetType());
        generic.Invoke(this, new object[] { msg });
    }

    public void InvokePublish<T>(object msg)
    {
        Task.Run(async () =>
        {
            await busControl.Publish(msg);
        }).Wait();


    }

But I can't apply a similar strategy for the Consumer because of the constraints I've already mentioned as well as, I'm sure, a healthy dose of ignorance.

If this is possible will someone please point me in the right direction?

More Info:

Project: App.SubscriberConsole -> references App.Services.Subscriber

Project: App.Services.Subscriber -> references MyCompany.MQ.Services.Consumer, et al

Project MyCompany.MQ.Services.Consumer -> references MassTransit -> implements MassTransit.IConsumer


Solution

  • I am not sure why you think about injecting into the Consume method. The method signature comes from the interface and you cannot change it.

    You should be injecting in the consumer class constructor. Thinking about injecting a factory delegate is the right one.

    public class MyMessageConsumer : IConsumer<MyMessage>
    {
        private readonly Func<IConsumeContext<MyMessage>, TightCoupleProcess> factory;
    
        public MyMessageConsumer(Func<IConsumeContext<MyMessage>, TightCoupleProcess> factory)
        {
            _factory = factory;
        }
    
        public Task Consume(ConsumeContext<MyMessage> context)
        {
            var tcp = _factory(context);
            tcp.StartProcess();
    
            return Task.CompletedTask;
        }
    }
    

    Then you configure it like this:

    Func<IConsumeContext<MyMessage>, TightCoupleProcess> factory = c => new TightCoupleProcess(c);
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
    
        cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
        {
            e.Consumer<MyMessageConsumer>(() => new MyMessageConsumer(factory));
        });
    });
    

    You can find more overloads for consumer configuration method on the endpoint in the documentation.

    One additional thing. You have serious issue with Serilog. You create logger configuration for each message, which you consume. This is not right. You should create logger configuration once in your application entry point.

    Then, you either inject your logger, or use the global Log object, or use MassTransit.SerilogIntegration package and use MassTransit logging in your consumers.