MassTransit publishes messages very slowly in a synchronous context


I need to publish messages through RabbitMQ from synchronous (legacy) code. If I don't wait for each publish, the messages end up out of order, because MassTransit publishes on different threads:

    public void PostUserQuantitySync(int userId, decimal amount)
    {
        foreach (var item in Enumerable.Range(0, 1000))
        {
            // Fire and forget: the returned task is discarded, so nothing waits for the publish to complete.
            var _ = _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item });
        }
    }

So I used TaskUtil.Await and/or Wait(), but publish performance is poor (about 33 messages per second), whereas the plain RabbitMQ client does much better (at least 200 messages per second) and preserves message ordering:

    public void PostUserQuantitySync(int userId, decimal amount)
    {
        foreach (var item in Enumerable.Range(0, 1000))
        {
            TaskUtil.Await(() => _publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }, c => c.SetAwaitAck(false)));
        }
    }

Is there a performance issue with MassTransit in a synchronous context, or is there something I should tweak in my code?


Solution

  • If you produce a batch of messages in order, you should not wait for each publish individually; that will be very slow. Consider adding the tasks to a list and waiting for them all at once:

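    public void PostUserQuantitySync(int userId, decimal amount)
    {
        List<Task> tasks = new();
        foreach (var item in Enumerable.Range(0, 1000))
        {
            // Start each publish without waiting for it to complete.
            tasks.Add(_publishEndpoint.Publish(new CreateUserTransactionRequest() { Amount = item }));
        }

        // The method is synchronous, so block once on the whole batch instead of using await.
        TaskUtil.Await(() => Task.WhenAll(tasks));
    }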
    

    If you're using RabbitMQ, you can adjust the batch publish settings (MassTransit uses batching in the background to group messages together when sending, which reduces round-trip overhead) via the ConfigureBatchPublish method, as shown:

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", h =>
        {
            h.ConfigureBatchPublish(b => b.Enabled = false);
        });
    
        // ...
    });
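
To get a feel for why waiting on each publish is the bottleneck, here is a minimal, self-contained sketch that does not use MassTransit at all. `FakePublishAsync` is a hypothetical stand-in for `_publishEndpoint.Publish`, and the 5 ms delay is an assumed round-trip cost, not a measured one. It only compares the two waiting patterns discussed above; it says nothing about real broker throughput or delivery order.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for _publishEndpoint.Publish: each call costs one
// simulated broker round trip (assumed 5 ms). This is not MassTransit code.
static Task FakePublishAsync(int item) => Task.Delay(5);

// Pattern from the question: block on every publish before starting the next.
static TimeSpan PublishOneByOne(int count)
{
    var sw = Stopwatch.StartNew();
    foreach (var item in Enumerable.Range(0, count))
    {
        FakePublishAsync(item).GetAwaiter().GetResult();
    }
    return sw.Elapsed;
}

// Pattern from the answer: start every publish in order, then block once.
static TimeSpan PublishThenWaitOnce(int count)
{
    var sw = Stopwatch.StartNew();
    var tasks = new List<Task>(count);
    foreach (var item in Enumerable.Range(0, count))
    {
        tasks.Add(FakePublishAsync(item));
    }
    Task.WhenAll(tasks).GetAwaiter().GetResult();
    return sw.Elapsed;
}

var sequential = PublishOneByOne(100);
var batched = PublishThenWaitOnce(100);

Console.WriteLine($"one by one: {sequential.TotalMilliseconds:F0} ms");
Console.WriteLine($"wait once:  {batched.TotalMilliseconds:F0} ms");
```

In the first pattern the total time grows with the number of messages times the round-trip cost, because each wait has to finish before the next publish starts. In the second, the waits overlap, so the total stays close to a single round trip in this simulation.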