I have a base class IntegrationEvent
and all other business events inherited from that.
public abstract class IntegrationEvent
{
public Guid Id { get; private set; }
public DateTimeOffset OccuredOn { get; private set; }
protected IntegrationEvent()
{
this.Id = Guid.NewGuid();
this.OccuredOn = DateTimeOffset.Now;
}
}
public sealed class StudentRegisteredIntegrationEvent : IntegrationEvent
{
public Guid StudentId { get; set; }
public string FullName { get; set; }
public StudentRegisteredIntegrationEvent(Guid studentId, string fullName)
{
StudentId = studentId;
FullName = fullName;
}
}
then I've created a consumer :
public sealed class StudentRegisteredConsumer: IConsumer<StudentRegisteredIntegrationEvent>
{
private readonly ILogger<StudentRegisteredConsumer> _logger;
public StudentRegisteredConsumer(ILogger<StudentRegisteredConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<StudentRegisteredIntegrationEvent> context)
{
_logger.LogWarning("========== Message Received +==========================");
_logger.LogInformation($"Sending notification to {context.Message.FullName}");
_logger.LogWarning("========== Message Received +==========================");
return Task.CompletedTask;
}
}
In the producer side I have a List<IntegrationEvent>
list and I publish
them via IPublishEndpoint
but it does not routed to the correct queue, instead it just creates another exchange, Sample.Abstraction.Domain:IntegrationEvent
. How can I tell MassTransit not to use the base class and instead use the real type class? I also tried with ISendEndpointProvider
but they are routed again to another queue, student-registered-integration-event_skipped
queue, since there is no consumer available for the base
class.
here is the log on the consumer side:
[00:33:21 DBG] Configuring endpoint student-registered-integration-event, Consumer: Sample.University.Notification.Consumers.StudentRegisteredConsumer
[00:33:21 DBG] Declare exchange: name: student-registered-integration-event, type: fanout, durable
[00:33:21 DBG] Declare exchange: name: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, type: fanout, durable
[00:33:21 DBG] Bind exchange: source: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, destination: student-registered-integration-event
[00:33:21 DBG] Declare queue: name: student-registered-integration-event, durable
[00:33:21 DBG] Bind queue: source: student-registered-integration-event, destination: student-registered-integration-event
[00:33:21 DBG] Prefetch Count: 16
[00:33:21 DBG] Consumer Ok: rabbitmq://localhost/wrapperizer/student-registered-integration-event - amq.ctag-nqSrJ0A5UQZCXg3tIr9Hfg
I have no clue how to configure, I also used ConsumerDefinition<StudentRegisteredConsumer>
but to no avail, here is the code:
public sealed class StudentRegisteredConsumerDefinition : ConsumerDefinition<StudentRegisteredConsumer>
{
public StudentRegisteredConsumerDefinition()
{
const string eventName = nameof(StudentRegisteredIntegrationEvent);
var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
this.EndpointName = sanitized;
}
}
on producer side to get the uri for send endpoint:
var eventName = logEvt.IntegrationEvent.GetType().Name;
var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
var uri = new Uri($"exchange:{sanitized}");
var sender = await _sendEndpointProvider.GetSendEndpoint(uri);
await sender.Send(logEvt.IntegrationEvent);
I know the above-code is kinda the default behavior of MT, but without that, I have no correct queues and exchanges. any solution will be appreciated.
First, you can do this entirely with Publish
, there is no need to configure all of the things you've done to try to work around the issue. You can configure your consumers by convention, and let them get configured on their own endpoints. The one part you missed is the way the messages are published.
From your List<IntegrationEvent>
, my suspicion is that you were calling Publish<T>(T message)
where T
was inferred to be IntegrationEvent
. This is why you're only getting messages on that exchange. You need to use the Publish(object message)
overload, so that the appropriate type is determined.
You can simply assign each message in the list to object, and then call Publish
with that object. Or, you can force the use of the overload:
await Task.WhenAll(events.Select(x => bus.Publish((object)x, x.GetType()));
That way, MassTransit will use the object type to call the appropriate generic overload.