I am using MassTransit with Azure Service Bus, and my consumer is not receiving events when the message body uses a custom JSON format. The publisher's event format cannot be changed because it is an external application.
Custom JSON event that doesn't work:
    {
        "planId": "1",
        "description": "Startup Event"
    }
MassTransit only processes the event when the message has the envelope structure it expects: the event is wrapped inside a message property, and the event type is specified in a messageType property. Event that does work:
    {
        "messageType": [
            "urn:message.Enterprise.TM.Core.Events.ServiceBus:EventPlanCreated"
        ],
        "message": {
            "planId": "1",
            "description": "Startup Event"
        }
    }
However, the custom JSON format event from the publisher does not follow this structure, so the consumer does not receive the events. I’ve attempted to implement a custom deserializer to handle the custom format, but it doesn’t seem to be applied.
How can I configure MassTransit to properly deserialize and consume messages with a custom JSON body?
Despite calling ClearSerialization() and UseRawJsonSerializer() on the endpoint, the consumer still does not deserialize the incoming JSON message.
I followed the MassTransit documentation, but it doesn't work with Azure Service Bus: https://masstransit.io/documentation/configuration/serialization#raw-json
Here is my consumer implementation:
    public class ServiceBusEventConsumer : IConsumer<EventPlanCreated>
    {
        private readonly IServiceBusEventProcessorFactory _serviceBusEventProcessorFactory;

        public ServiceBusEventConsumer(IServiceBusEventProcessorFactory serviceBusEventProcessorFactory)
        {
            _serviceBusEventProcessorFactory = serviceBusEventProcessorFactory;
        }

        public Task Consume(ConsumeContext<EventPlanCreated> context)
        {
            // var strategy = _serviceBusEventProcessorFactory.GetServiceBusEventHandlerStrategy(context.Message.ServiceBusEventType);
            // await strategy.HandleEventAsync(context.Message);
            return Task.CompletedTask;
        }
    }

    public class EventPlanCreated
    {
        public string EventId { get; set; }
        public string Description { get; set; }
    }
Here is the configuration in Program.cs:
    builder.Services.AddMassTransit(x =>
    {
        x.AddConsumer<ServiceBusEventConsumer>();

        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host(builder.Configuration["ServiceBusSettings:ConnectionString"]);

            // Configure Subscription Endpoint
            cfg.SubscriptionEndpoint<EventPlanCreated>("tmsuscription", e =>
            {
                e.ConfigureConsumeTopology = false;
                e.ConfigureConsumer<ServiceBusEventConsumer>(context);

                // Use custom deserializer
                e.ClearSerialization();
                e.UseRawJsonSerializer();
            });
        });
    });
Because MassTransit enforces this envelope structure by default, it fails to deserialize your custom JSON format.
The issue is that MassTransit expects messages to follow a wrapped format (with messageType and message properties), while your external publisher is sending raw JSON objects (planId and description directly in the body).
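One related thing worth checking: the raw body carries planId, while the EventPlanCreated class in the question exposes EventId, so that property may stay null even once the raw serializer is picked up. Below is a minimal sketch, assuming that planId is what EventId is meant to hold (that mapping is my assumption, not something the question states). It uses plain System.Text.Json outside of MassTransit, and RawBodyCheck is a hypothetical helper name used only for this illustration.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Contract from the question with explicit JSON names added.
// Assumption: the external "planId" field is the value EventId should carry.
public class EventPlanCreated
{
    [JsonPropertyName("planId")]
    public string EventId { get; set; }

    [JsonPropertyName("description")]
    public string Description { get; set; }
}

// Hypothetical helper, only here to try the mapping without a broker.
public static class RawBodyCheck
{
    // Deserializes a raw (non-envelope) body with default System.Text.Json options.
    public static EventPlanCreated Parse(string body) =>
        JsonSerializer.Deserialize<EventPlanCreated>(body);

    public static void Main()
    {
        const string body = "{ \"planId\": \"1\", \"description\": \"Startup Event\" }";
        EventPlanCreated evt = Parse(body);

        // prints "EventId = 1, Description = Startup Event"
        Console.WriteLine($"EventId = {evt.EventId}, Description = {evt.Description}");
    }
}
```

Keeping the EventId property name and mapping it with an attribute means the consumer code does not have to change; renaming the property to PlanId would be an alternative if you prefer the contract to mirror the publisher's field names.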
Call UseRawJsonSerializer(RawSerializerOptions.AddTransportHeaders | RawSerializerOptions.CopyHeaders) so that the transport headers are passed along while the raw JSON is processed.
Updated MassTransit configuration in Program.cs:
    builder.Services.AddMassTransit(x =>
    {
        x.AddConsumer<ServiceBusEventConsumer>();

        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host(builder.Configuration["ServiceBusSettings:ConnectionString"]);

            // The two-argument overload needs the message type; replace the
            // subscription name with your own.
            cfg.SubscriptionEndpoint<EventPlanCreated>("tmsubscript", e =>
            {
                e.ConfigureConsumeTopology = false;

                // Replace the default envelope serializer with raw JSON
                e.ClearSerialization();
                e.UseRawJsonSerializer(RawSerializerOptions.AddTransportHeaders | RawSerializerOptions.CopyHeaders);

                // Treat messages without a content type as JSON
                e.DefaultContentType = new System.Net.Mime.ContentType("application/json");

                e.ConfigureConsumer<ServiceBusEventConsumer>(context);
            });
        });
    });
Updated Consumer Implementation:
    using System.Text.Json;
    using MassTransit;
    using Microsoft.Extensions.Logging;

    public class ServiceBusEventConsumer : IConsumer<EventPlanCreated>
    {
        private readonly ILogger<ServiceBusEventConsumer> _logger;

        public ServiceBusEventConsumer(ILogger<ServiceBusEventConsumer> logger)
        {
            _logger = logger;
        }

        public Task Consume(ConsumeContext<EventPlanCreated> context)
        {
            try
            {
                _logger.LogInformation("Received message: EventId = {EventId}, Description = {Description}",
                    context.Message.EventId, context.Message.Description);

                // Log the deserialized message, re-serialized as indented JSON, for debugging
                string json = JsonSerializer.Serialize(context.Message, new JsonSerializerOptions { WriteIndented = true });
                _logger.LogDebug("Deserialized message:\n{Message}", json);
            }
            catch (Exception ex)
            {
                // Logged and swallowed, so the message still counts as consumed
                _logger.LogError(ex, "Error processing message");
            }

            return Task.CompletedTask;
        }
    }
Output: