Tags: .net, multithreading, erlang, rabbitmq, threshold

RabbitMQ, .NET and multithreading


I've been playing with RabbitMQ and its .NET connector for a while. After a period under test, my new system, which uses RabbitMQ as a broker for a network of web applications and APIs, hit production. All the apps got stuck within a matter of minutes. I guess I hit some OS-dependent threshold or saturated the TCP stack (after the block, not even my clients' TCP/IP connections reached the web servers anymore).

I haven't yet found good material on how to deal with intense traffic spread across tens of processes and thousands of threads.

I have a system that spawns 10k+ threads every second; each one has to drop a message through a connection and then terminate. I have just a couple of 'catchers' for all these messages.

Some countermeasures have already been taken. Don't open a new connection for each new thread -> declare a connection factory and reuse a single static connection (operations on connections and channels are said to be thread-safe) -> Solved

Here's the code of the Factory

    using RabbitMQ.Client;

    public static class Factory
    {
        private static IConnection sharedConnection_cl;

        // Lazily creates the single connection shared by every thread.
        public static IConnection SharedConnection_cl
        {
            get
            {
                if (sharedConnection_cl == null)
                {
                    sharedConnection_cl = GetRabbitMqConnection();
                }
                return sharedConnection_cl;
            }
        }

        private static IConnection GetRabbitMqConnection()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "x.y.z.w";
            connectionFactory.UserName = "notTheGuestUser";
            connectionFactory.Password = "abcdef";
            connectionFactory.VirtualHost = "/dedicatedHost";
            return connectionFactory.CreateConnection();
        }
    }
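
For reference, this is roughly what each short-lived worker does with that factory. It's only a sketch: the 'catcherQueue' routing key is a placeholder, not the name used in my real system.

    using System.Text;
    using RabbitMQ.Client;

    public static class Publisher
    {
        // Each worker reuses the shared connection, opens its own channel,
        // publishes a single message and lets Dispose() close the channel.
        public static void DropMessage(string message)
        {
            using (IModel channel = Factory.SharedConnection_cl.CreateModel())
            {
                byte[] body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "catcherQueue", null, body);
            }
        }
    }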

Not to use up all the available Erlang processes. The Erlang process threshold was hit in less than 10 minutes (channels closed over the same connection do not trigger the death of the corresponding Erlang processes on the server) -> Added a threshold on the maximum channel count for any given connection and protected the accesses with a semaphore. Every once in a while the connection gets closed and recreated (the corresponding Erlang processes terminate when the connection is closed) -> Solved

Here's the code that manages the channel threshold

    using System.Threading;
    using RabbitMQ.Client;

    public static class Factory
    {
        private static IConnection sharedConnection_cl;
        private static int channelCount_int { get; set; }
        static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);

        public static IConnection SharedConnection_cl
        {
            get
            {
                if (sharedConnection_cl == null)
                {
                    sharedConnection_cl = GetRabbitMqConnection();
                    channelCount_int = 0;
                }

                connectionAccessSemaphore_sem.WaitOne();
                if (channelCount_int > 10000)
                {
                    // Recycle the connection so the server-side Erlang processes
                    // left behind by the closed channels are released.
                    sharedConnection_cl.Close();
                    sharedConnection_cl = GetRabbitMqConnection();
                    channelCount_int = 0;
                }
                else
                {
                    channelCount_int++;
                }
                connectionAccessSemaphore_sem.Release();
                return sharedConnection_cl;
            }
        }
    }

Now... beyond raising the OS's standard thresholds (which would just prolong the agony of an inevitable block from a few minutes to a couple of hours)...

Is there any good practice for managing connections and channels so as to avoid hitting OS thresholds and building up saturation?
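
To make the question more concrete, one direction I've been sketching (made-up capacity, worker count and queue name; nothing I actually run) is to stop opening a channel per thread and instead push messages into a bounded in-process queue drained by a few long-lived publisher threads, each owning one channel:

    using System.Collections.Concurrent;
    using System.Threading;
    using RabbitMQ.Client;

    public static class QueuedPublisher
    {
        // Bounded in-process buffer; producers block once 100,000 messages are pending.
        private static readonly BlockingCollection<byte[]> pending =
            new BlockingCollection<byte[]>(100000);

        // Called by the short-lived worker threads instead of touching RabbitMQ directly.
        public static void Enqueue(byte[] body)
        {
            pending.Add(body);
        }

        // Started once at application start-up; the worker count is arbitrary.
        public static void StartPublishers(IConnection connection, int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Thread publisher = new Thread(() =>
                {
                    // One long-lived channel per publisher thread; channels are
                    // never shared between threads.
                    using (IModel channel = connection.CreateModel())
                    {
                        foreach (byte[] body in pending.GetConsumingEnumerable())
                        {
                            channel.BasicPublish("", "catcherQueue", null, body);
                        }
                    }
                });
                publisher.IsBackground = true;
                publisher.Start();
            }
        }
    }

Would something along these lines be the recommended way to keep connection and channel churn bounded, or is there a better-established pattern?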

Thanks for any support.


Solution

  • Ok, the solution was already there. Just moving the semaphore up did the trick. I hadn't considered that at system reboot, when all the appPools spin up at once, there was a race on the allocation of the connection instance: with the null check outside the semaphore, several threads could see a null connection at the same time and each allocate its own. -> Solved

    using System.Threading;
    using RabbitMQ.Client;

    public static class Factory
    {
        private static IConnection sharedConnection_cl;
        private static int channelCount_int { get; set; }
        static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);

        public static IConnection SharedConnection_cl
        {
            get
            {
                // Taking the semaphore before the null check serializes the
                // initial allocation of the connection as well.
                connectionAccessSemaphore_sem.WaitOne();

                if (sharedConnection_cl == null)
                {
                    sharedConnection_cl = GetRabbitMqConnection();
                    channelCount_int = 0;
                }

                if (channelCount_int > 10000)
                {
                    sharedConnection_cl.Close();
                    sharedConnection_cl = GetRabbitMqConnection();
                    channelCount_int = 0;
                }
                else
                {
                    channelCount_int++;
                }
                connectionAccessSemaphore_sem.Release();
                return sharedConnection_cl;
            }
        }
    }


    I'm still not sure why the AppPools locking up also locked all the TCP connections on the server.