Tags: c#, .net, asp.net-core, asp.net-web-api, rabbitmq

RabbitMQ .NET client times out under load after RabbitMQ upgrade: consumer System.TimeoutException


We have a system that was running RabbitMQ 3.8.5 with RabbitMQ.Client 5.2.0. One way the system uses RabbitMQ is for long polling: when a client sends a long-polling web request to our .NET 6.0 Web API, we create a consumer that waits for a specific command posted by a different executable. Because that command is posted to a queue specific to the client, one consumer is created per client. After a given timeout, the consumer is disposed and the client restarts the long-polling request. With this design we supported 1500+ clients, i.e. 1500+ simultaneous long-polling requests and consumers.

This worked fine until we upgraded to RabbitMQ 3.13.3 with RabbitMQ.Client 6.8.1. Now a timeout exception occurs once about 700 clients start this long-polling request. The same happens when 500 clients stay connected for a long time (more than 1 hour). We also tried EasyNetQ for consumption, since we use that library in other parts of the system, and got the same error and behavior: a timeout, followed by the RabbitMQ server terminating the connection.

Further investigation indicates that heartbeats are missed when the timeout exception occurs; the RabbitMQ server then terminates the connection, which causes a cascade of failures on our side. We have not yet found what causes the timeout, which blocks all RabbitMQ-related operations.
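Missed heartbeats combined with RPC timeouts in a busy process are consistent with thread-pool starvation, which is where the Solution at the end of this post points. A minimal probe, assuming a .NET 6 app with top-level statements, measures how long a trivial work item waits before a pool thread picks it up; logging it periodically next to the RabbitMQ errors would show whether that delay spikes when the timeouts begin. MeasureQueueDelayAsync is a hypothetical helper, not part of any library.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: measures how long a trivial work item waits in the
// thread-pool queue before it actually starts running.
async Task<TimeSpan> MeasureQueueDelayAsync()
{
    Stopwatch sw = Stopwatch.StartNew();
    // Task.Run queues the lambda to the thread pool; Elapsed is read when the lambda runs.
    return await Task.Run(() => sw.Elapsed);
}

TimeSpan delay = await MeasureQueueDelayAsync();

// ThreadCount and PendingWorkItemCount give extra context for the measured delay.
Console.WriteLine(
    $"queue delay: {delay.TotalMilliseconds:F1} ms, " +
    $"pool threads: {ThreadPool.ThreadCount}, pending items: {ThreadPool.PendingWorkItemCount}");
```

Without code changes, the dotnet-counters tool's System.Runtime counters (for example threadpool-queue-length and threadpool-thread-count) expose similar data.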

EDIT: Here are some details that might help. None of the setup changed between the version upgrades.

  • Target workload: 1500+ clients (unchanged)
  • Host: VM running Windows Server 2022 Standard x64, 32 GB RAM, 2.3 GHz CPU
  • Queues: still classic queues (unchanged)

Some details are provided below:

Here is the stack trace from RabbitMQ.Client when the error first occurs:

    at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
    at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
    at RabbitMQ.Client.Impl.ModelBase.ConsumerCount(String queue)

EasyNetQ error

System.TimeoutException: The operation has timed out.
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass51_0.<QueueDeclareAsync>b__0(IModel x)
   at EasyNetQ.Persistent.PersistentChannel.InvokeChannelActionAsync[TResult,TChannelAction](TChannelAction channelAction, CancellationToken cancellationToken)
   at EasyNetQ.RabbitAdvancedBus.QueueDeclareAsync(String name, Action`1 configure, CancellationToken cancellationToken)
   at EasyNetQ.AdvancedBusExtensions.QueueDeclare(IAdvancedBus bus, String name, Boolean durable, Boolean exclusive, Boolean autoDelete, CancellationToken cancellationToken)

Initial Consumer Code

ncChannel.QueueDeclare(queueName, true, false, false, null);
ncChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
ncChannel.QueueBind(queueName, ExchangeName, routingKey);
Channel<string> responseChannel = Channel.CreateBounded<string>(channelOptions);
EventingBasicConsumer consumer = new EventingBasicConsumer(ncChannel);
EventHandler<BasicDeliverEventArgs> nextCommandHandler = async (object sender, BasicDeliverEventArgs msg) =>
{
    try
    {
        string msgBody = null;
        if (msg != null && !msg.Body.IsEmpty)
        {
            // In RabbitMQ.Client 6.x the Body memory is only valid while the handler runs,
            // so decode it here instead of passing msg itself to the channel.
            msgBody = Encoding.UTF8.GetString(msg.Body.Span);
            //log
        }
        else
        {
            //log
        }
        await responseChannel.Writer.WriteAsync(msgBody);
        // One command per long-poll request: the channel accepts no further writes.
        responseChannel.Writer.TryComplete();
    }
    catch (Exception ex)
    {
        //log
    }
};
consumer.Received += nextCommandHandler;
bool autoAck = true;
consumerTag = ncChannel.BasicConsume(queueName, autoAck, consumer);
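The handler completes the bounded channel right after its first write, so each consumer's channel behaves as a one-shot mailbox. This sketch uses only System.Threading.Channels to show what that means; the capacity of 1 is an assumption, since channelOptions is not shown in the post.

```csharp
using System;
using System.Threading.Channels;

// Assumption: capacity 1 for illustration; the post does not show channelOptions.
var options = new BoundedChannelOptions(1) { SingleReader = true };
Channel<string> mailbox = Channel.CreateBounded<string>(options);

// First delivery is accepted, then the channel is completed, as the handler does.
bool firstAccepted = mailbox.Writer.TryWrite("command-1");
mailbox.Writer.TryComplete();

// A later delivery to the same consumer is rejected because the writer is completed.
bool secondAccepted = mailbox.Writer.TryWrite("command-2");

// The reader can still drain the item that was buffered before completion.
bool read = mailbox.Reader.TryRead(out var received);

Console.WriteLine($"{firstAccepted} {secondAccepted} {received}"); // prints "True False command-1"
```

In the handler itself, WriteAsync on a completed channel throws ChannelClosedException into the catch block. With autoAck set to true, the broker has already treated such a second message as acknowledged, so it is dropped rather than redelivered if it reaches the same consumer before that consumer is cancelled.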

We also tried switching to an AsyncEventingBasicConsumer:

// In RabbitMQ.Client 6.x, AsyncEventingBasicConsumer requires the connection to be created
// with ConnectionFactory.DispatchConsumersAsync = true.
Channel<string> responseChannel = Channel.CreateBounded<string>(channelOptions);

ncChannel.QueueDeclare(queueName, true, false, false, null);
ncChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
ncChannel.QueueBind(queueName, ExchangeName, routingKey);
AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(ncChannel);
AsyncEventHandler<BasicDeliverEventArgs> nextCommandHandler = async (object sender, BasicDeliverEventArgs msg) =>
{
    try
    {
        string msgBody = null;
        if (msg != null && !msg.Body.IsEmpty)
        {
            msgBody = Encoding.UTF8.GetString(msg.Body.Span);
            //log
        }
        else
        {
            //log
        }
        // responseChannel is Channel<string>, so write the decoded body, not msg.
        await responseChannel.Writer.WriteAsync(msgBody);
        responseChannel.Writer.TryComplete();
        await Task.Yield();
    }
    catch (Exception ex)
    {
        //log
    }
};
consumer.Received += nextCommandHandler;

bool autoAck = true;
consumerTag = ncChannel.BasicConsume(queueName, autoAck, consumer);
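Neither consumer version shows the side that reads responseChannel. A minimal sketch of what the waiting part of the long-poll request could look like, assuming one command per request and a per-request timeout: WaitForCommandAsync is a hypothetical helper, and the simulated writer stands in for the RabbitMQ consumer handler.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: waits for the single command the consumer handler writes,
// or gives up after `timeout` so the long-poll request can end and the client can retry.
async Task<string?> WaitForCommandAsync(ChannelReader<string> reader, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    try
    {
        return await reader.ReadAsync(cts.Token);
    }
    catch (OperationCanceledException)
    {
        return null; // timed out before any command arrived
    }
    catch (ChannelClosedException)
    {
        return null; // channel was completed without an item
    }
}

// Usage: a simulated writer stands in for the RabbitMQ consumer handler.
Channel<string> channel = Channel.CreateBounded<string>(1);
_ = Task.Run(async () =>
{
    await Task.Delay(50);
    await channel.Writer.WriteAsync("command-1");
    channel.Writer.TryComplete();
});
string? delivered = await WaitForCommandAsync(channel.Reader, TimeSpan.FromSeconds(5));

// No writer at all: the wait ends after the timeout and returns null.
Channel<string> idle = Channel.CreateBounded<string>(1);
string? timedOut = await WaitForCommandAsync(idle.Reader, TimeSpan.FromMilliseconds(100));

Console.WriteLine($"{delivered ?? "<none>"} / {timedOut ?? "<none>"}"); // prints "command-1 / <none>"
```

Returning null on timeout lets the web API end the request normally; the consumer still has to be cancelled and disposed afterwards, as described above.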

EasyNetQ Consumer Code

var queue = _rabbitBus.Advanced.QueueDeclare(queueName, true, false, false);
// Pass the async lambda directly so the task EasyNetQ receives tracks the real handler work.
consumer = _rabbitBus.Advanced.Consume(queue, async (body, properties, info) =>
{
    try
    {
        string msgBody = null;
        if (body.Length > 0)
        {
            msgBody = Encoding.UTF8.GetString(body.ToArray());
            //log
        }
        else
        {
            //log
        }
        await responseChannel.Writer.WriteAsync(msgBody);
        responseChannel.Writer.TryComplete();
        await Task.Yield();
    }
    catch (Exception ex)
    {
        //log
    }
});
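Passing Task.Factory.StartNew(async () => ...) as a consumer callback hands the library a Task<Task> whose outer task completes as soon as the async lambda reaches its first await, not when the handler body finishes, and exceptions from the inner task are not observed by the caller. This BCL-only sketch demonstrates the effect; a TaskCompletionSource stands in for the handler's pending work, such as the WriteAsync call.

```csharp
using System;
using System.Threading.Tasks;

// Gate that keeps the simulated handler work pending until released.
var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// StartNew with an async lambda returns Task<Task>, not Task.
Task<Task> outer = Task.Factory.StartNew(async () =>
{
    await gate.Task; // stands in for the awaited work inside the handler
});

await outer;               // completes as soon as the lambda hits its first await
Task inner = outer.Result; // the task that represents the real handler work
bool innerDoneWhenOuterDone = inner.IsCompleted;

gate.SetResult();
await inner;               // equivalently: await outer.Unwrap();
Console.WriteLine($"{innerDoneWhenOuterDone} {inner.IsCompleted}"); // prints "False True"
```

Passing the async lambda directly, or calling Unwrap() on the StartNew result, makes the task the library awaits finish only when the handler body does, and avoids an extra thread-pool hop per message.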

Solution

  • After discussing this with the RabbitMQ maintainers, it turns out the application now needs to raise the thread-pool minimum with ThreadPool.SetMinThreads() to avoid thread-pool exhaustion during RabbitMQ client operations. Even the maintainers don't know why this is required with the 6.x client library; they suggested that the work-in-progress 7.x version would address the issue.

    You can see the discussion here https://github.com/rabbitmq/rabbitmq-dotnet-client/discussions/1635
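A minimal sketch of the workaround, assuming it runs once at startup before the RabbitMQ connection is opened. The target of 200 worker and 200 I/O threads is a hypothetical value, not one taken from the linked discussion; it would need to be sized against the real number of concurrent consumers. The helper RaiseMinThreads (also hypothetical) only raises the minimums, so it never lowers values the runtime already chose.

```csharp
using System;
using System.Threading;

// Hypothetical targets for illustration only; size them against the real number of
// concurrent consumers and blocking RabbitMQ calls.
const int DesiredMinWorkerThreads = 200;
const int DesiredMinIoThreads = 200;

// Raises the thread-pool minimums without ever lowering what the runtime already uses.
bool RaiseMinThreads(int workerTarget, int ioTarget)
{
    ThreadPool.GetMinThreads(out int currentWorker, out int currentIo);
    // SetMinThreads returns false if a value is negative or above the pool maximum.
    return ThreadPool.SetMinThreads(
        Math.Max(workerTarget, currentWorker),
        Math.Max(ioTarget, currentIo));
}

// Call once at startup, before the RabbitMQ connection and consumers are created.
bool applied = RaiseMinThreads(DesiredMinWorkerThreads, DesiredMinIoThreads);

ThreadPool.GetMinThreads(out int minWorker, out int minIo);
Console.WriteLine($"applied={applied} minWorker={minWorker} minIo={minIo}");
```

Below the minimum, the thread pool creates threads on demand; above it, threads are added gradually, which is how a burst of blocked threads can delay unrelated work. The same minimum can also be configured without code through the ThreadPoolMinThreads MSBuild property or the System.Threading.ThreadPool.MinThreads setting in runtimeconfig.json.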