I am successfully using MassTransit in a silly sample application where I publish a message (an event) from a Publisher console application and receive it in two different consumers, which are also console applications, using RabbitMQ.
This is the whole sample project git repo: https://gitlab.com/sunnyatticsoftware/DiegoDrivenDesign/DiDrDe.MessageBus
I want to have a project wrapping MassTransit functionality so that my Publisher and Consumers projects know nothing about MassTransit. The dependencies should go in this direction:
Notice how DiDrDe.MessageBus does not know anything about DiDrDe.Model because it's a generic project that should be valid for any message type.
To achieve this I am implementing the adapter pattern so that my custom interfaces IEventDtoBus (to publish events) and IEventDtoHandler<TEventDto> (to consume events) are all my Publisher and Consumers know. The MassTransit wrapper project (called DiDrDe.MessageBus) implements the adapters with an EventDtoBusAdapter composed of an IEventDtoBus, and an EventDtoHandlerAdapter<TEventDto> as my only generic IConsumer<TEventDto>, composed of an IEventDtoHandler<TEventDto>.
The problem I have is with the way MassTransit requires the consumers to be registered because my consumer is a generic one and its type should not be known by the MassTransit wrapper at compile time.
I need to find a way to register the EventDtoHandlerAdapter<TEventDto> as a consumer for each TEventDto type I pass at runtime (as a collection of types, for example). Please see my repository for all the details.
MassTransit has an overload that accepts a Type (good, just what I want), but it also requires a second argument, Func<Type, object> consumerFactory, and I don't know how to implement it.
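To make the shape of such a factory concrete, here is a minimal sketch that uses only the base class library. It is not MassTransit's or Autofac's code: AdapterFactory.Create and the resolveHandler delegate are hypothetical names standing in for a container call, and the adapter below deliberately does not implement MassTransit's IConsumer<TEventDto> so that the sample compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IEventDto { }

public interface IEventDtoHandler<in TEventDto> where TEventDto : IEventDto
{
    Task HandleAsync(TEventDto eventDto);
}

// Stand-in for the real adapter. In the wrapper project it would also
// implement MassTransit's IConsumer<TEventDto>; omitted here (assumption).
public class EventDtoHandlerAdapter<TEventDto> where TEventDto : class, IEventDto
{
    public IEventDtoHandler<TEventDto> Handler { get; }

    public EventDtoHandlerAdapter(IEventDtoHandler<TEventDto> handler) => Handler = handler;
}

public static class AdapterFactory
{
    // Hypothetical helper: builds a delegate with the Func<Type, object> shape.
    // resolveHandler stands in for something like context.Resolve(handlerType).
    public static Func<Type, object> Create(Func<Type, object> resolveHandler)
    {
        return adapterType =>
        {
            // adapterType is a closed EventDtoHandlerAdapter<TEventDto>,
            // so its single generic argument is the event type.
            var eventDtoType = adapterType.GetGenericArguments()[0];
            var handlerType = typeof(IEventDtoHandler<>).MakeGenericType(eventDtoType);
            var handler = resolveHandler(handlerType);
            return Activator.CreateInstance(adapterType, handler)
                ?? throw new InvalidOperationException($"Could not create {adapterType}");
        };
    }
}

// Sample event and handler that only the application would know about.
public class ThingHappened : IEventDto { }

public class ThingHappenedHandler : IEventDtoHandler<ThingHappened>
{
    public Task HandleAsync(ThingHappened eventDto) => Task.CompletedTask;
}

public static class Program
{
    public static void Main()
    {
        // A dictionary plays the role of the IoC container in this sketch.
        var handlers = new Dictionary<Type, object>
        {
            [typeof(IEventDtoHandler<ThingHappened>)] = new ThingHappenedHandler()
        };
        var factory = AdapterFactory.Create(handlerType => handlers[handlerType]);

        // Only a runtime Type is needed to close the generic adapter.
        var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
        var adapter = factory(adapterType);
        Console.WriteLine(adapter.GetType() == adapterType);
    }
}
```

The point of the sketch is that the code building the adapter never names ThingHappened; only the Main method, which plays the role of the application, does.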
UPDATE 1: The problem is that I cannot register this generic consumer like:
consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();
because I get a compilation error
Error CS0310: 'EventDtoHandlerAdapter' must be a non-abstract type with a public parameterless constructor in order to use it as parameter 'TConsumer' in the generic type or method 'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action<IConsumerConfigurator>)' (project DiDrDe.MessageBus, file C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs)
UPDATE 2: I have tried several things and I have updated the project on my repo. These are my attempts in the MassTransit wrapper project. Notice that I can get everything working if I add a dependency on each message (event) I want to handle. But I don't want that. I don't want this project to know ANYTHING about the messages it can handle. If only I could register consumers knowing only a message type...
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    // THIS WORKS
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));

    // DOES NOT WORK
    //var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
    //var eventDtoHandler = context.Resolve(typeEventDtoHandler);
    //consumer.Consumer(eventDtoHandler);

    // DOES NOT WORK
    //consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);

    // DOES NOT WORK
    //var consumerGenericType = typeof(IConsumer<>);
    //var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
    //consumer.Consumer(consumerThingHappenedType, null);
});
UPDATE 3: Following Igor's advice, I tried the following:
var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
consumer.Consumer(adapterType, (Type x) => context.Resolve(x));
but I get a runtime error saying that
The requested service 'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]' has not been registered. To avoid this exception, either register a component to provide the service, check for service registration using IsRegistered(), or use the ResolveOptional() method to resolve an optional dependency.
I have even tried to register EventDtoHandlerAdapter<> separately as an IConsumer in case that was the issue, but no luck.
builder
    .RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
    .As(typeof(IConsumer<>))
    .SingleInstance();
Also with:
builder
    .RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
    .AsSelf();
and it tells me that
System.ObjectDisposedException: 'This resolve operation has already ended. When registering components using lambdas, the IComponentContext 'c' parameter to the lambda cannot be stored. Instead, either resolve IComponentContext again from 'c', or resolve a Func<> based factory to create subsequent components from
Just to clarify, the only consumer I need to register is my EventDtoHandlerAdapter<TEventDto>. It is generic, so essentially there will be one registration per TEventDto that I support. The thing is that I don't know the event types in advance, so I need to operate with Type objects.
UPDATE 4: A new attempt as suggested by Igor. This time with the "proxy". I have updated my repo with the last attempt for all details. I have my consumer and marker interface:
public interface IEventDtoHandler
{
}

public interface IEventDtoHandler<in TEventDto>
    : IEventDtoHandler
    where TEventDto : IEventDto
{
    Task HandleAsync(TEventDto eventDto);
}
and I have my own implementation of a consumer that knows nothing about MassTransit:
public class ThingHappenedHandler
    : IEventDtoHandler<ThingHappened>
{
    public Task HandleAsync(ThingHappened eventDto)
    {
        Console.WriteLine($"Received {eventDto.Name} " +
            $"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
        return Task.CompletedTask;
    }
}
Now my "wrapper consumer" is what I call the adapter because it knows about MassTransit (it implements MassTransit's IConsumer).
public class EventDtoHandlerAdapter<TConsumer, TEventDto>
    : IConsumer<TEventDto>
    where TConsumer : IEventDtoHandler<TEventDto>
    where TEventDto : class, IEventDto
{
    private readonly TConsumer _consumer;

    public EventDtoHandlerAdapter(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public async Task Consume(ConsumeContext<TEventDto> context)
    {
        await _consumer.HandleAsync(context.Message);
    }
}
Now the last step is to register my "wrapper consumer" with MassTransit. But since it's generic I don't know how to do it. This is the issue.
I can have all my consumer types scanned and registered in Autofac as suggested with:
var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
    AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(x => x.GetTypes())
        .Where(x => interfaceType.IsAssignableFrom(x)
            && !x.IsInterface
            && !x.IsAbstract)
        .ToList();
So now I have all the consumer types (all the implementations of IEventDtoHandler, including my ThingHappenedHandler). What now? How do I register them?
Something like the following does not work:
foreach (var consumerType in consumerTypes)
{
    consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}
But I guess it's normal that it doesn't work, because what I want to register is my EventDtoHandlerAdapter, which is the real IConsumer.
So, I think I didn't understand your suggestion. Sorry!
What I need is something like this:
//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));
But without using the ThingHappened model, because the model should not be known. This is where I remain stuck.
UPDATE 5: New attempt as suggested by Chris Patterson (his solution was merged into master on my repo), but the problem remains.
To clarify, DiDrDe.MessageBus must be agnostic of any publisher, consumer and model. It should only depend on MassTransit and DiDrDe.Contracts, and Chris' solution has a line like:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});
That has a direct dependency on the ThingHappened model. This is not allowed, and it's actually not much different from the solution I already had, which was:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    // THIS works, but it uses ThingHappened explicitly and I don't want that dependency
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});
Sorry if this was not clear enough, but DiDrDe.MessageBus will eventually be a NuGet package shared between many different consumer and publisher projects, and it should not have any dependency on any specific message/model.
UPDATE 6: The question has been resolved. Thanks a lot to Igor and Chris for their time and help. I have pushed the solution to master on my repo.
PS: Unfortunately when I have two handlers in the same consumer handling the same Event, this solution has its limitations because it seems only one of the handlers is being executed (twice). I would expect both handlers to be executed or only one, but only once (not twice). But this is another subject already :)
Take a look at custom consumer convention here: https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional
https://github.com/MassTransit/MassTransit/blob/develop/tests/MassTransit.Tests/Conventional/ConventionConsumer_Specs.cs
Create your own IMyConsumerInterface and IMyMessageInterface and plug them into the code from that test.
Register it with ConsumerConvention.Register<CustomConsumerConvention>(); before creating the bus. It should work.
Additionally you can create your own wrapper around the consumer context and have that passed together with the message.
LoadFrom (MassTransit.AutofacIntegration) didn't work for me with the custom convention, so I had to register the consumers manually:
foreach (var consumer in consumerTypes)
    cfg.Consumer(consumer, (Type x) => _container.Resolve(x));
Alternatively, if you want to use the "proxy" approach, do something like:
public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
    where TMessage : class, IMyMessageInterface
    where TConsumer : IMyConsumerInterface<TMessage>
{
    private readonly TConsumer _consumer;

    public WrapperConsumer(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public Task Consume(ConsumeContext<TMessage> context)
    {
        return _consumer.Consume(context.Message);
    }
}
...
// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));
// marker interface
public interface IMyConsumerInterface
{
}

public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface
{
    Task Consume(T message);
}
...
builder.RegisterAssemblyTypes(ThisAssembly)
    .AssignableTo<IMyConsumerInterface>()
    .AsSelf()
    .As<IMyConsumerInterface>();
...
var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
    .Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
    .ToList();

builder.Register(context =>
{
    // Resolve a fresh IComponentContext so it can be used safely inside the lambdas below
    var ctx = context.Resolve<IComponentContext>();
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
        {
            // adapterTypes is not defined in this snippet; presumably it holds the
            // closed WrapperConsumer<,> types derived from consumerTypes
            foreach (var adapterType in adapterTypes)
                consumer.Consumer(adapterType, (Type type) => ctx.Resolve(adapterType));
        });
    });
    return busControl;
})
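The snippet above iterates over adapterTypes without showing where they come from. One way they could be derived from consumerTypes is sketched below, using reflection only. AdapterTypes.Build is a hypothetical helper, and the WrapperConsumer here omits MassTransit's IConsumer<TMessage> so that the sample is self-contained; treat it as an illustration under those assumptions rather than the code from the repository.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface IMyMessageInterface { }

public interface IMyConsumerInterface { }

public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface
{
    Task Consume(T message);
}

// Stand-in for the WrapperConsumer above; the real one also implements
// MassTransit's IConsumer<TMessage>, which is left out here (assumption).
public class WrapperConsumer<TConsumer, TMessage>
    where TMessage : class, IMyMessageInterface
    where TConsumer : IMyConsumerInterface<TMessage>
{
    public TConsumer Inner { get; }

    public WrapperConsumer(TConsumer consumer) => Inner = consumer;
}

public static class AdapterTypes
{
    // Hypothetical helper: for every consumer type, close WrapperConsumer<,>
    // once per IMyConsumerInterface<TMessage> the consumer implements.
    public static List<Type> Build(IEnumerable<Type> consumerTypes)
    {
        return consumerTypes
            .SelectMany(consumerType => consumerType.GetInterfaces()
                .Where(i => i.IsGenericType
                    && i.GetGenericTypeDefinition() == typeof(IMyConsumerInterface<>))
                .Select(i => typeof(WrapperConsumer<,>)
                    .MakeGenericType(consumerType, i.GetGenericArguments()[0])))
            .ToList();
    }
}

// Sample message and consumer that only the application would know about.
public class MyMessage : IMyMessageInterface { }

public class MyConsumer : IMyConsumerInterface<MyMessage>
{
    public Task Consume(MyMessage message) => Task.CompletedTask;
}

public static class Program
{
    public static void Main()
    {
        var adapterTypes = AdapterTypes.Build(new[] { typeof(MyConsumer) });
        foreach (var adapterType in adapterTypes)
            Console.WriteLine(adapterType);
    }
}
```

The resulting closed types would presumably also have to be registered with the container before ctx.Resolve(adapterType) can succeed, as the "has not been registered" error in UPDATE 3 of the question suggests.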