I am working on a project where (.NET 6
) I use AWS SQS/SNS for the Queues and Topics. I have an API
application that publishes messages to the queues, and a console application (service) that listens to the queues and consumes the messages.
This is my configuration in the API
which doesn't consume anything, but only publishes messages:
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host("some-region", h =>
{
h.AccessKey(config.AccessKey);
h.SecretKey(config.AccessSecret);
h.Config(new AmazonSQSConfig() { RegionEndpoint = someregion });
});
cfg.ConfigureEndpoints(context);
});
});
This is my configuration in the Console Application (service) with the consumers:
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<CreateFeedbackCommandConsumer>();
x.AddCommands();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host("some-region", h =>
{
h.AccessKey(sqsConfiguration.AccessKey);
h.SecretKey(sqsConfiguration.AccessSecret);
h.Config(new AmazonSQSConfig() { RegionEndpoint = someregion });
});
cfg.ReceiveEndpoint("create-feedback-command", endpoint =>
{
endpoint.ConfigureConsumeTopology = false;
endpoint.ClearSerialization();
endpoint.UseRawJsonSerializer();
endpoint.ConfigureConsumer<CreateFeedbackCommandConsumer>(context);
});
cfg.ConfigureEndpoints(context);
});
});
Everything seems to go well, the messages in SQS/SNS seems to have the correct properties and values, but when my consumer consumes the message, all properties are empty or NULL. This is my first time using MassTransit in combination with SQS, and this problem did not occur when I used Azure Service Bus.
The consumer:
public class CreateFeedbackCommandConsumer : IConsumer<CreateFeedbackCommand>
{
private readonly ILogger<CreateFeedbackCommandConsumer> _logger;
public CreateFeedbackCommandConsumer (ILogger<CreateFeedbackCommandConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<CreateFeedbackCommand> context)
{
try
{
var command = context.Message;
_logger.LogInformation($"Incoming {nameof(CreateFeedbackCommand)}. CorrelationId: '{command.CorrelationId}'. DateTime (UTC): '{DateTime.UtcNow:dd-MMM-yyy hh:mm:ss}'");
if (command.Feedback == null)
{
throw new ArgumentNullException(nameof(command.Feedback));
}
_logger.LogInformation($"Finished {nameof(CreateFeedbackCommand)}. CorrelationId: '{command.CorrelationId}'. DateTime (UTC): '{DateTime.UtcNow:dd-MMM-yyy hh:mm:ss}'");
}
catch (Exception ex)
{
_logger.LogError(ex, $"An error has occurred while processing the {nameof(CreateFeedbackCommand)} command");
throw;
}
}
The command looks like:
public class CreateFeedbackCommand
{
public CreateFeedbackCommand(CreateFeedback feedback)
{
CorrelationId = Guid.NewGuid();
Feedback = feedback;
}
/// <summary>
/// Unique Correlation ID.
/// </summary>
public Guid CorrelationId { get; set; }
/// <summary>
/// Feedback.
/// </summary>
public CreateFeedback Feedback { get; set; }
}
To summarize: publishing the message/command causes no faults/errors, the consumer consumes the message, but the properties of the context.Message
are NULL
or empty. So the Feedback
property for example, is NULL. When I look at the entire message in SQS/SNS, it's all correct. So my guess is, is that the deserialization goes wrong in the consumer. How do I fix this?
Why are you doing this in the consumer?
endpoint.ClearSerialization();
endpoint.UseRawJsonSerializer();
You should use the default JSON serialization built into MassTransit with the message envelope unless you have a very specific reason to change it.
And "because" isn't a reason, seriously. Use it.
Also, since you're aren't doing the same in the producer, you're serializing JSON with a message envelope, but since you've cleared it, you can't deserialize it. Make sure you configure the producer and the consumer the same.
Also, if you're publishing messages from the API, and yet setting ConfigureConsumeTopology = false
(which disables all publish bindings for the consumer), how would you expect to get published messages into the queues?