Search code examples
c#.netrabbitmqmasstransitevent-driven

Some message properties are null while publishing event in mass transit


I have a normal .Net web API project and for learning event-driven architecture I started a small project. I am new to mass transit and I have written the code using their documentation. I am using bus.Send(command) for commands and bus.Publish(event) for events. When I send a command I receive all the expected values in the command handler but while publishing some of the properties are null. When I publish the message for some reason Id and EntityThatTookAction are null but other properties are populated. I don't understand the reason for this behavior. Am I missing something? I would really appreciate help regarding this

Configuration

services.AddMassTransit(massTransitConfig =>
{
    massTransitConfig.AddConsumer<UserEventConsumer>();
    massTransitConfig.AddConsumer<UserCommandHandler>();
    massTransitConfig.AddConsumer<TodoListEventConsumer>();
    massTransitConfig.AddConsumer<TodoListCommandHandler>();

    massTransitConfig.UsingRabbitMq((context, rabbitMqConfig) =>
    {
        rabbitMqConfig.Host(new Uri(busConfig["Url"]), h =>
        {
            h.Username(busConfig["Username"]);
            h.Password(busConfig["Password"]);
        });
        
        rabbitMqConfig.ReceiveEndpoint(nameof(UserEventConsumer), cfg =>
        {
            cfg.PurgeOnStartup = true;
            cfg.PrefetchCount = 10;
            cfg.Bind<UserCreatedEvent>();
            cfg.Bind<UserUpdatedEvent>();
            cfg.Bind<UserDeletedEvent>();
            cfg.ConfigureConsumer(context, typeof(UserEventConsumer));
        });
        
        rabbitMqConfig.ReceiveEndpoint(nameof(UserCommandHandler), cfg =>
        {
            cfg.PurgeOnStartup = true;
            cfg.PrefetchCount = 10;
            cfg.Bind<CreateUserCommand>();
            cfg.Bind<UpdateUserCommand>();
            cfg.Bind<DeleteUserCommand>();
            cfg.ConfigureConsumer(context, typeof(UserCommandHandler));
            EndpointConvention.Map<CreateUserCommand>(cfg.InputAddress);
            EndpointConvention.Map<UpdateUserCommand>(cfg.InputAddress);
            EndpointConvention.Map<DeleteUserCommand>(cfg.InputAddress);
        });
        
        rabbitMqConfig.ReceiveEndpoint(nameof(TodoListEventConsumer), cfg =>
        {
            cfg.PurgeOnStartup = true;
            cfg.PrefetchCount = 10;

            cfg.ConfigureConsumer(context, typeof(TodoListEventConsumer));
        });
        
        rabbitMqConfig.ReceiveEndpoint(nameof(TodoListCommandHandler), cfg =>
        {
            cfg.PurgeOnStartup = true;
            cfg.PrefetchCount = 10;
            
            cfg.ConfigureConsumer(context, typeof(TodoListCommandHandler));
            EndpointConvention.Map<CreateTodoListCommand>(cfg.InputAddress);
            EndpointConvention.Map<UpdateTodoListCommand>(cfg.InputAddress);
            EndpointConvention.Map<DeleteTodoListCommand>(cfg.InputAddress);
        });
    });
});

Event

public class UserCreatedEvent : IBaseEvent
{
    public string Id { get; }
    public string Message { get; }
    public string EntityThatTookAction { get; }
    public DateTimeOffset CreatedOn { get; }
    public EntityType EntityType { get; }
    public Guid CorrelationId { get; }

    public UserCreatedEvent(string entityId, string entity, EntityType entityType, Guid correlationId, DateTimeOffset createdOn)
    {
        Id = entityId;
        EntityThatTookAction = entity;
        CreatedOn = createdOn;
        EntityType = entityType;
        CorrelationId = correlationId;
        Message = "User list created";
    }
}

Interface

public interface IBaseEvent : CorrelatedBy<Guid>
{
    public string Id { get; }
    public string Message { get;}
    public string EntityThatTookAction { get;}
    public DateTimeOffset CreatedOn { get;}
    public EntityType EntityType { get; }
}

Publish

var message = new UserCreatedEvent(entityId, entity, EntityType.User, correlationId, DateTimeOffset.UtcNow);
await _bus.Publish(message);

Solution

  • You didn't post your command contracts that work, but I would look for any differences between the two of them to see what might be happening. It could be that you don't have a { get; private set; } on your properties, but I can't remember if that's required or not. I think it needs to be settable in order for deserialization to work. If you're using .NET 5, you can specify { get; init; } instead.

    Also, I cleaned up your bus configuration, you really likely don't want to use PurgeOnStartup, and EndpointConvention is frowned upon.

    services.AddMassTransit(massTransitConfig =>
    {
        massTransitConfig.AddConsumer<UserEventConsumer>();
        massTransitConfig.AddConsumer<UserCommandHandler>();
        massTransitConfig.AddConsumer<TodoListEventConsumer>();
        massTransitConfig.AddConsumer<TodoListCommandHandler>();
    
        massTransitConfig.UsingRabbitMq((context, rabbitMqConfig) =>
        {
            rabbitMqConfig.Host(new Uri(busConfig["Url"]), h =>
            {
                h.Username(busConfig["Username"]);
                h.Password(busConfig["Password"]);
            });
    
            rabbitMqConfig.ReceiveEndpoint(nameof(UserEventConsumer), cfg =>
            {
                cfg.PrefetchCount = 10;
                cfg.ConfigureConsumer<UserEventConsumer>(context);
            });
    
            rabbitMqConfig.ReceiveEndpoint(nameof(UserCommandHandler), cfg =>
            {
                cfg.PrefetchCount = 10;
                cfg.ConfigureConsumer<UserCommandHandler>(context);
            });
    
            rabbitMqConfig.ReceiveEndpoint(nameof(TodoListEventConsumer), cfg =>
            {
                cfg.PrefetchCount = 10;
                cfg.ConfigureConsumer<TodoListEventConsumer>(context);
            });
    
            rabbitMqConfig.ReceiveEndpoint(nameof(TodoListCommandHandler), cfg =>
            {
                cfg.PrefetchCount = 10;
                cfg.ConfigureConsumer<TodoListCommandHandler>(context);
            });
        });
    });