I have a normal .Net web API project and for learning event-driven architecture I started a small project. I am new to mass transit and I have written the code using their documentation. I am using bus.Send(command) for commands and bus.Publish(event) for events. When I send a command I receive all the expected values in the command handler but while publishing some of the properties are null. When I publish the message for some reason Id and EntityThatTookAction are null but other properties are populated. I don't understand the reason for this behavior. Am I missing something? I would really appreciate help regarding this
services.AddMassTransit(massTransitConfig =>
{
massTransitConfig.AddConsumer<UserEventConsumer>();
massTransitConfig.AddConsumer<UserCommandHandler>();
massTransitConfig.AddConsumer<TodoListEventConsumer>();
massTransitConfig.AddConsumer<TodoListCommandHandler>();
massTransitConfig.UsingRabbitMq((context, rabbitMqConfig) =>
{
rabbitMqConfig.Host(new Uri(busConfig["Url"]), h =>
{
h.Username(busConfig["Username"]);
h.Password(busConfig["Password"]);
});
rabbitMqConfig.ReceiveEndpoint(nameof(UserEventConsumer), cfg =>
{
cfg.PurgeOnStartup = true;
cfg.PrefetchCount = 10;
cfg.Bind<UserCreatedEvent>();
cfg.Bind<UserUpdatedEvent>();
cfg.Bind<UserDeletedEvent>();
cfg.ConfigureConsumer(context, typeof(UserEventConsumer));
});
rabbitMqConfig.ReceiveEndpoint(nameof(UserCommandHandler), cfg =>
{
cfg.PurgeOnStartup = true;
cfg.PrefetchCount = 10;
cfg.Bind<CreateUserCommand>();
cfg.Bind<UpdateUserCommand>();
cfg.Bind<DeleteUserCommand>();
cfg.ConfigureConsumer(context, typeof(UserCommandHandler));
EndpointConvention.Map<CreateUserCommand>(cfg.InputAddress);
EndpointConvention.Map<UpdateUserCommand>(cfg.InputAddress);
EndpointConvention.Map<DeleteUserCommand>(cfg.InputAddress);
});
rabbitMqConfig.ReceiveEndpoint(nameof(TodoListEventConsumer), cfg =>
{
cfg.PurgeOnStartup = true;
cfg.PrefetchCount = 10;
cfg.ConfigureConsumer(context, typeof(TodoListEventConsumer));
});
rabbitMqConfig.ReceiveEndpoint(nameof(TodoListCommandHandler), cfg =>
{
cfg.PurgeOnStartup = true;
cfg.PrefetchCount = 10;
cfg.ConfigureConsumer(context, typeof(TodoListCommandHandler));
EndpointConvention.Map<CreateTodoListCommand>(cfg.InputAddress);
EndpointConvention.Map<UpdateTodoListCommand>(cfg.InputAddress);
EndpointConvention.Map<DeleteTodoListCommand>(cfg.InputAddress);
});
});
});
public class UserCreatedEvent : IBaseEvent
{
public string Id { get; }
public string Message { get; }
public string EntityThatTookAction { get; }
public DateTimeOffset CreatedOn { get; }
public EntityType EntityType { get; }
public Guid CorrelationId { get; }
public UserCreatedEvent(string entityId, string entity, EntityType entityType, Guid correlationId, DateTimeOffset createdOn)
{
Id = entityId;
EntityThatTookAction = entity;
CreatedOn = createdOn;
EntityType = entityType;
CorrelationId = correlationId;
Message = "User list created";
}
}
public interface IBaseEvent : CorrelatedBy<Guid>
{
public string Id { get; }
public string Message { get;}
public string EntityThatTookAction { get;}
public DateTimeOffset CreatedOn { get;}
public EntityType EntityType { get; }
}
var message = new UserCreatedEvent(entityId, entity, EntityType.User, correlationId, DateTimeOffset.UtcNow);
await _bus.Publish(message);
You didn't post your command contracts that work, but I would look for any differences between the two of them to see what might be happening. It could be that you don't have a { get; private set; }
on your properties, but I can't remember if that's required or not. I think it needs to be settable in order for deserialization to work. If you're using .NET 5, you can specify { get; init; }
instead.
Also, I cleaned up your bus configuration, you really likely don't want to use PurgeOnStartup
, and EndpointConvention
is frowned upon.
services.AddMassTransit(massTransitConfig =>
{
massTransitConfig.AddConsumer<UserEventConsumer>();
massTransitConfig.AddConsumer<UserCommandHandler>();
massTransitConfig.AddConsumer<TodoListEventConsumer>();
massTransitConfig.AddConsumer<TodoListCommandHandler>();
massTransitConfig.UsingRabbitMq((context, rabbitMqConfig) =>
{
rabbitMqConfig.Host(new Uri(busConfig["Url"]), h =>
{
h.Username(busConfig["Username"]);
h.Password(busConfig["Password"]);
});
rabbitMqConfig.ReceiveEndpoint(nameof(UserEventConsumer), cfg =>
{
cfg.PrefetchCount = 10;
cfg.ConfigureConsumer<UserEventConsumer>(context);
});
rabbitMqConfig.ReceiveEndpoint(nameof(UserCommandHandler), cfg =>
{
cfg.PrefetchCount = 10;
cfg.ConfigureConsumer<UserCommandHandler>(context);
});
rabbitMqConfig.ReceiveEndpoint(nameof(TodoListEventConsumer), cfg =>
{
cfg.PrefetchCount = 10;
cfg.ConfigureConsumer<TodoListEventConsumer>(context);
});
rabbitMqConfig.ReceiveEndpoint(nameof(TodoListCommandHandler), cfg =>
{
cfg.PrefetchCount = 10;
cfg.ConfigureConsumer<TodoListCommandHandler>(context);
});
});
});