in my Applications I use different Interfaces to handle Messages. To avoid multiple Implementations and to reduce the maintenance of the same DTO, I want to use a Custom Attribute to handle the Exchange name of the Masstransit part. Also I want to reduce the dependencies to Masstransit in these common software parts.
So my Question is " How to change the EntityNameAttribute to a custom NameAttribute in Masstransit. (RabbitMq Version 7.3.1) "
I Have tried to Overwrite the EntityNameFormatter for the MessageTopology.
cfg.MessageTopology.SetEntityNameFormatter(new CustomExchangeNameFormatter(cfg.MessageTopology.EntityNameFormatter));
If the Application publish a Message an error occurs.
_asyncApi.PublishMessage<CustomMessage>(new CustomMessage(){Content = "TestMessage"});
... RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=403, text='ACCESS_REFUSED - operation not permitted on the default exchange', classId=40, methodId=10 ...
public class CustomExchangeNameFormatter: IEntityNameFormatter
{
private IEntityNameFormatter _original;
public CustomExchangeNameFormatter(IEntityNameFormatter original)
{
_original = original;
}
//Used to rename the exchanges
public string FormatEntityName<T>()
{
string entityName;
var exchangeNameAttribute = typeof(T).GetCustomAttribute<ExchangeNameAttribute>();
if (exchangeNameAttribute != null)
{
entityName = exchangeNameAttribute.ExchangName;
}
else
{
entityName = _original.FormatEntityName<T>();
}
return entityName;
}
}
Hi fixed the Problem by moving the SetEntityNameFormatter
method call under the line cfg.SetMessageSerializer(..
and cfg.AddMessageDeserializer(..
. I don't know why, but if the method is called before these two lines it throws the described error.
public static IServiceCollection AddRabbitMqApi(this IServiceCollection services, IConfiguration configuration)
{
return services.AddMassTransit(busCfg =>
{
busCfg.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(...);
//Setup the NamespaceSpecificStrings
var (nssMappingForSerialization, nssMappingForDeserialization) = GetNssMappings(assemblies);
//serialization and deserialization
cfg.SetMessageSerializer(() => new CustomJsonMessageSerializer(nssMappingForSerialization));
cfg.AddMessageDeserializer(CustomJsonMessageSerializer.JsonContentType, () => new CustomJsonMessageDeserializer(CustomJsonMessageSerializer.Deserializer, nssMappingForDeserialization));
//entity name formatter
cfg.MessageTopology.SetEntityNameFormatter(new CustomExchangeNameFormatter(cfg.MessageTopology.EntityNameFormatter));
cfg.ConfigureJsonSerializer(settings =>
{
settings.DefaultValueHandling = DefaultValueHandling.Include;
return settings;
});
cfg.Publish<CustomMessage>(x => { x.ExchangeType = ExchangeType.Topic; });
});
})
.AddMassTransitHostedService();
}