Search code examples
c#.netocelot

how to manage multi threading and batchReading in Ocelot Load Balancer with custom load balancer


I have a worker that sends Batchrequests to the api which is implemented my custom load balancer in ocelot,in this customLoadBalance , I want to distribute 70% of every batchRequests to HostAndPort1 and 30% to another one. final goal is to send 70% of total requests to FirstApi and 30% sends to SecondApi.The problem is that it does not distribute the way i want, it just work when i have just one thread.

This is Worker codes to sends requests to ocelotApi :

   public async ValueTask<long> DoReadAsync()
    {
        OnProcess = true;
        var count = 0;


        try
        {
            var sends = (await 
            _unitOfWork.SendRepository.Where(x =>
               x.sndPost == false, x =>
              new Send
              {
                  sndID = x.sndID,
                  SndBody = x.SndBody,
                  SndTo = x.SndTo,
                  sndFrom = x.sndFrom,
                  SndFarsi = x.SndFarsi,
                  sndMsgClass = x.sndMsgClass,
                  sndUDH = x.sndUDH
              }, c => c.OrderBy(o => o.SndPriority), BaseConfig.BatchRead)).ToList();

            count = sends.Count;

            if (count == 0)
            {
                await Task.Delay(_settings.RefreshMillisecond);
                OnProcess = false;
                return count;
            }
            var split = BaseConfig.Split(sends);

            var restTasks = split
                .Select(items => _restService.DoPostAsync(items.ToList()))
                .ToList();

            var updateTasks = new List<Task>();

            while (restTasks.Any())
            {
                var task = await Task.WhenAny(restTasks);
                //task.ThrowExceptionIfTaskIsFaulted();
                var item = await task.ConfigureAwait(false);
                if (item is { IsSuccessStatusCode: true })
                {
                    var content = await item.Content.ReadAsStringAsync();
                    var items = JsonConvert.DeserializeObject<List<ResponseDto>>(content);
                    var itemsSends = items.Select(_mapper.Map<Send>).ToList();
                    if (itemsSends.Any())
                    {
                        var updateTask = _unitOfWork.SendRepository.BulkUpdateForwardOcelotAsync(itemsSends);
                        updateTasks.Add(updateTask);
                    }

                }

                restTasks.Remove(task);

            }

            await Task.WhenAll(updateTasks).ConfigureAwait(false);
            Completed = true;
            OnProcess = false;


        }
        catch (Exception ex)
        {
            _logger.LogError(ex.Message);
            OnProcess = false;
        }
        return count;

    }

In above code forexample we have Batch : 50000 , I split them to 10 tasks with 5000 requests and send them to the ocelotApi.

and this is my codes in ocelotapi , I have written a middleware like this :

public class BatchMiddleware : OcelotMiddleware
{
    private readonly RequestDelegate _next;
    private bool isRahyabRG = true;
    private int remainedBatch = 0;
    
    public BatchMiddleware(
        RequestDelegate next,
        IConfiguration configuration,
        IOcelotLoggerFactory loggerFactory) : base(loggerFactory.CreateLogger<BatchMiddleware>())

    {
        _next = next;
    }

    public async Task Invoke(HttpContext httpContext)
    {
        var request = httpContext.Request;
        var batchRequests = await request.DeserializeArrayAsync<RequestDto>();
        var batchRequestCount = batchRequests.Count;
        var RGCount = (int)Math.Floor(70 * batchRequestCount / 100.0);

        if (isRahyabRG)
        {
            var rgRequests = batchRequests.Take(RGCount).ToList();
            var requestBody = JsonConvert.SerializeObject(rgRequests);
            request.Body = new MemoryStream(Encoding.UTF8.GetBytes(requestBody));
            isRahyabRG = false;
            remainedBatch = batchRequestCount - RGCount;
            httpContext.Session.SetString("remainedBatchKey", remainedBatch.ToString());
        }
        else
        {
            var remainedBatchKey = httpContext.Session.GetString("remainedBatchKey");
            var pmRequests = new List<RequestDto>();
            if (remainedBatchKey != null)
            {
                pmRequests = batchRequests.Take(int.Parse(remainedBatchKey)).ToList();
            }
            var requestBody = JsonConvert.SerializeObject(pmRequests);
            request.Body = new MemoryStream(Encoding.UTF8.GetBytes(requestBody));
            isRahyabRG = true;
        }
        
        await _next.Invoke(httpContext);

    }

and this is myCustomLoadBalancer :

  public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private readonly object _lock = new();

    private int _last;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;

    }

    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();

        lock (_lock)
        {
            if (_last >= services.Count)
                _last = 0;

            var next = services[_last++];
        
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    public void Release(ServiceHostAndPort hostAndPort)
    {
    }


}

}

and this is ocelot.json :

 {
"Routes": [

    {
        "DownstreamPathTemplate": "/api/Forward",
        "DownstreamScheme": "http",
        "DownstreamHostAndPorts": [

            {
                
                "Host": "localhost",
                "Port": 51003
            },
           
            {
                
                "Host": "localhost",
                "Port": 32667

            }

        ],
        "UpstreamPathTemplate": "/",
        "UpstreamHttpMethod": [
            "POST"
        ],


        "LoadBalancerOptions": {
            "Type": "MyRoundRobin"

        }
       
    }
]

}


Solution

  • The code you share is a sample from the Ocelot documentation and it is a basic round-robin load balancer. As I understand from the code, the ILoadBalancer instance is created for every request. So the fields that hold shared info (lock and _last) should be static. You can achieve a weighted round-robin with the help of random numbers, as the request numbers are increased the distribution will approximate the expected value (%70 - %30). You can do this :

    public class MyRoundRobin: ILoadBalancer
    {
        private readonly Func<Task<List<Service>>> _services;
        
        public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
        {
            _services = services;
        }
    
        public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
        {
            var services = await _services();
            var whatToPick = Random.Shared.NextDouble() >= 0.7 ? 1 : 0; 
            // Beware that Random.Shared added .NET 6 onwards for thread safety
            var next = services[whatToPick];
            
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);        
        }
    
        public void Release(ServiceHostAndPort hostAndPort)
        {
        }
    }
    

    If you do not want to implement a solution that involves random numbers you can try the below solution:

    public class MyRoundRobin : ILoadBalancer
    {
        private readonly Func<Task<List<Service>>> _services;
        private static readonly object _lock = new();
        private static ulong counter;
        
        public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
        {
            _services = services;
    
        }
    
        public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
        {
            var services = await _services();
           
            lock (_lock)
            {
                var mod = unchecked(counter++) % 100;
                var whatToPick = mod < 70 ? 0 : 1;
                var next = services[whatToPick];
            
                return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
            }
        }
    
        public void Release(ServiceHostAndPort hostAndPort)
        {
        }
    
    
    }