I'm facing an issue where producer.Produce() works and successfully sends messages to Kafka, but publishEndpoint.Publish() doesn’t seem to produce any messages. I'm using MassTransit with Kafka, and I suspect the problem lies in my configuration or the way MassTransit handles Kafka topics.
Relevant Configuration: Here’s how I’ve configured Kafka in my Startup.cs:
    services.AddMassTransit(x =>
    {
        x.UsingInMemory();
        x.AddRider(rider =>
        {
            rider.AddProducer<OrderSubmittedIntegrationEvent>("order-submitted");
            rider.AddConsumers(Assembly.GetAssembly(typeof(Startup)));
            rider.UsingKafka((context, k) =>
            {
                k.Host(Configuration["Kafka:Host"]);
                k.TopicEndpoint<OrderSubmittedIntegrationEvent>("order-submitted", "consumer-group-A", e =>
                {
                    e.AutoOffsetReset = AutoOffsetReset.Earliest;
                    e.ConfigureConsumer<OrderSubmittedConsumer>(context);
                });
            });
        });
    });
And this is the code I’m using to publish the event:
    public class OrderService : IOrderService
    {
        protected IConfiguration _configuration;
        protected readonly IAsyncRepository<Order, long> _repository;
        private readonly IPublishEndpoint _publishEndpoint;
        private readonly ITopicProducer<OrderSubmittedIntegrationEvent> _producer;

        public OrderService(IAsyncRepository<Order, long> repository,
            IConfiguration configuration,
            IPublishEndpoint publishEndpoint,
            ITopicProducer<OrderSubmittedIntegrationEvent> producer)
        {
            _repository = repository;
            _configuration = configuration;
            _publishEndpoint = publishEndpoint;
            _producer = producer;
        }

        public async Task<Result> SubmitOrder(long id)
        {
            // (Loading `order` and computing `validationResult` are omitted from this excerpt.)
            var integrationEvent = new OrderSubmittedIntegrationEvent(
                order.Id,
                order.OrderDate,
                order.OrderItems.Select(n => new OrderItemSubmittedIntegrationEvent(n.ItemClassId, n.Units)).ToList()
            );
            //await _publishEndpoint.Publish(integrationEvent);
            await _producer.Produce(integrationEvent);
            return validationResult;
        }
    }
Any help or insights would be greatly appreciated!
What Works: When I use producer.Produce() with the Kafka client directly, the message is sent to the topic successfully. Kafka and Zookeeper are both up and running. The Kafka topic exists and is accessible.
What Doesn’t Work: publishEndpoint.Publish() doesn’t send any messages, and no errors or exceptions are thrown. I’ve enabled logging, but there are no visible errors indicating what might be going wrong.
Do I need to explicitly start the MassTransit bus when using Kafka? Does IPublishEndpoint require specific consumer registrations for messages to be published successfully?
Am I missing any specific configuration in the Kafka rider that could cause Publish to not work as expected?
IPublishEndpoint has no connection at all to Kafka; it's used by the bus (and the underlying transport), which is not tied to Kafka. ITopicProducer is the only way to produce messages to Kafka using MassTransit.
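To illustrate the point above, here is a minimal sketch of a class that sends to Kafka through ITopicProducer only. In the configuration from the question the bus is the in-memory one (x.UsingInMemory()), so Publish() would go there rather than to the topic. OrderSubmitted and OrderEventSender are hypothetical names used only for this example, and the sketch assumes a MassTransit version where ITopicProducer<T> lives in the MassTransit namespace (older versions may need a different using) and that the producer was registered with rider.AddProducer<OrderSubmitted>(...) as in the question.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message type standing in for OrderSubmittedIntegrationEvent.
public record OrderSubmitted(long OrderId);

// Hypothetical helper: depends on ITopicProducer<T> only, not on IPublishEndpoint.
public class OrderEventSender
{
    private readonly ITopicProducer<OrderSubmitted> _producer;

    public OrderEventSender(ITopicProducer<OrderSubmitted> producer)
    {
        _producer = producer;
    }

    // Produces the message to the Kafka topic configured for this message type
    // via rider.AddProducer<OrderSubmitted>("...") (assumed to be registered).
    public Task SendAsync(long orderId, CancellationToken cancellationToken = default)
    {
        return _producer.Produce(new OrderSubmitted(orderId), cancellationToken);
    }
}
```

IPublishEndpoint is still useful if some messages should also reach consumers on the bus transport, but it would be a separate call in addition to Produce(), not a replacement for it.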