Tags: c#, azure, azureservicebus, masstransit

Why is MassTransit Custom Deserialization Not Working with Azure Service Bus SubscriptionEndpoint?


I am using MassTransit with Azure Service Bus, and my consumer is not receiving events when the message body uses a custom JSON format. The publisher's event format cannot be changed because it is an external application.

Custom JSON event that does not work:

{
    "planId": "1",
    "description": "Startup Event"
}

MassTransit only processes the event when the message has the envelope structure it expects: the event is wrapped inside a message property, and the event type is specified in a messageType property. Event that does work:

{
  "messageType": [
    "urn:message.Enterprise.TM.Core.Events.ServiceBus:EventPlanCreated"
  ],
  "message": {
    "planId": "1",
    "description": "Startup Event"
  }
}

However, the custom JSON format event from the publisher does not follow this structure, so the consumer does not receive the events. I’ve attempted to implement a custom deserializer to handle the custom format, but it doesn’t seem to be applied.

How can I configure MassTransit to properly deserialize and consume messages with a custom JSON body?

Despite calling ClearSerialization and UseRawJsonSerializer on the endpoint, the consumer still does not deserialize the incoming JSON message.

I followed the MassTransit documentation on raw JSON, but it does not work with Azure Service Bus: https://masstransit.io/documentation/configuration/serialization#raw-json

Here is my consumer implementation:

public class ServiceBusEventConsumer : IConsumer<EventPlanCreated>
{
    private readonly IServiceBusEventProcessorFactory _serviceBusEventProcessorFactory;

    public ServiceBusEventConsumer(IServiceBusEventProcessorFactory serviceBusEventProcessorFactory)
    {
        _serviceBusEventProcessorFactory = serviceBusEventProcessorFactory;
    }

    public Task Consume(ConsumeContext<EventPlanCreated> context)
    {
        // var strategy = _serviceBusEventProcessorFactory.GetServiceBusEventHandlerStrategy(context.Message.ServiceBusEventType);
        // await strategy.HandleEventAsync(context.Message);
        return Task.CompletedTask;
    }
}

public class EventPlanCreated
{
    public string EventId { get; set; }
    public string Description { get; set; }
}

Here is the configuration in Program.cs:

builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<ServiceBusEventConsumer>();

    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host(builder.Configuration["ServiceBusSettings:ConnectionString"]);

        // Configure Subscription Endpoint
        cfg.SubscriptionEndpoint<EventPlanCreated>("tmsuscription", e =>
        {
            e.ConfigureConsumeTopology = false;
            e.ConfigureConsumer<ServiceBusEventConsumer>(context);

            // Use custom deserializer
            e.ClearSerialization();
            e.UseRawJsonSerializer();
        });
    });
});

 

Solution

  • By default, MassTransit expects every message to be wrapped in its envelope structure (with messageType and message properties), while your external publisher sends raw JSON objects (planId and description directly in the body). That is why it fails to deserialize your custom JSON format.

    Use UseRawJsonSerializer(RawSerializerOptions.AddTransportHeaders | RawSerializerOptions.CopyHeaders) so that the transport headers are passed along while the raw JSON is processed.
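
To make the difference between the two body shapes concrete, here is a minimal sketch that uses only System.Text.Json and no MassTransit types. It checks whether a body has the envelope shape described above. EnvelopeProbe and LooksLikeEnvelope are hypothetical names made up for illustration; this is not how MassTransit itself detects envelopes.

```csharp
using System.Text.Json;

// Hypothetical helper for illustration only; not part of MassTransit.
public static class EnvelopeProbe
{
    // Returns true when the JSON body has a top-level "messageType" array
    // and a top-level "message" object, i.e. the wrapped format.
    public static bool LooksLikeEnvelope(string json)
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        JsonElement root = doc.RootElement;

        return root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("messageType", out JsonElement types)
            && types.ValueKind == JsonValueKind.Array
            && root.TryGetProperty("message", out JsonElement message)
            && message.ValueKind == JsonValueKind.Object;
    }
}
```

Run against the two bodies from the question, the raw publisher body fails this check and the wrapped body passes it, which is the mismatch the raw JSON serializer is meant to work around.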

    Updated MassTransit Configuration in Program.cs

    builder.Services.AddMassTransit(x =>
    {
        x.AddConsumer<ServiceBusEventConsumer>();
    
        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host(builder.Configuration["ServiceBusSettings:ConnectionString"]);
            cfg.SubscriptionEndpoint<EventPlanCreated>("tmsubscript", e =>
            {
                e.ConfigureConsumeTopology = false;
                e.ClearSerialization();
                e.UseRawJsonSerializer(RawSerializerOptions.AddTransportHeaders | RawSerializerOptions.CopyHeaders);
                e.DefaultContentType = new System.Net.Mime.ContentType("application/json");
                e.ConfigureConsumer<ServiceBusEventConsumer>(context);
            });
        });
    });
    
    

    Updated Consumer Implementation:

    public class ServiceBusEventConsumer : IConsumer<EventPlanCreated>
    {
        private readonly ILogger<ServiceBusEventConsumer> _logger;
    
        public ServiceBusEventConsumer(ILogger<ServiceBusEventConsumer> logger)
        {
            _logger = logger;
        }
    
        public Task Consume(ConsumeContext<EventPlanCreated> context)
        {
            try
            {
                _logger.LogInformation("Received message: EventId = {EventId}, Description = {Description}",
                                       context.Message.EventId, context.Message.Description);

                // Re-serialize the deserialized message for debugging
                // (this is the mapped object, not the original transport body)
                string rawMessage = System.Text.Json.JsonSerializer.Serialize(context.Message, new System.Text.Json.JsonSerializerOptions { WriteIndented = true });
                _logger.LogDebug("Deserialized message:\n{RawMessage}", rawMessage);
            }
            catch (Exception ex)
            {
                // Note: because the exception is swallowed, the message still counts as consumed
                _logger.LogError(ex, "Error processing message");
            }

            return Task.CompletedTask;
        }
    }
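
One more thing worth checking once the raw JSON reaches the consumer: the external body uses planId, while EventPlanCreated declares EventId, so that property may arrive empty. Below is a minimal sketch of an explicit mapping with plain System.Text.Json. It assumes EventId is meant to carry planId (the question does not say so) and that the attribute is honored by the serializer options MassTransit applies, which is not verified here. EventPlanCreatedMapped and RawBodyDemo are hypothetical names.

```csharp
#nullable enable
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical variant of EventPlanCreated; assumes EventId should carry "planId".
public class EventPlanCreatedMapped
{
    [JsonPropertyName("planId")]
    public string? EventId { get; set; }

    [JsonPropertyName("description")]
    public string? Description { get; set; }
}

// Hypothetical helper that deserializes a raw body outside of MassTransit.
public static class RawBodyDemo
{
    public static EventPlanCreatedMapped? Parse(string json) =>
        JsonSerializer.Deserialize<EventPlanCreatedMapped>(json);
}
```

Parsing the publisher's body with RawBodyDemo.Parse is a quick way to confirm the class shape matches the JSON before involving the bus at all.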
    

    Output: [image: Azure Service Bus]