I have a worker that sends batch requests to an API that sits behind my custom load balancer in Ocelot. In this custom load balancer, I want to distribute 70% of each batch of requests to the first HostAndPort and 30% to the other one. The final goal is to send 70% of the total requests to FirstApi and 30% to SecondApi. The problem is that the requests are not distributed the way I want; it only works when I have just one thread.
This is the worker code that sends the requests to the Ocelot API:
/// <summary>
/// Reads one batch of unsent rows, posts it to the gateway as several parallel chunks and
/// queues a bulk update for every chunk whose response came back with a success status.
/// </summary>
/// <returns>
/// The number of rows read for this batch; 0 when nothing was pending (after waiting
/// <c>_settings.RefreshMillisecond</c>).
/// </returns>
/// <remarks>
/// Exceptions never escape this method: they are logged and the row count read so far is returned.
/// <c>Completed</c> is set only when every chunk was processed without an exception.
/// </remarks>
public async ValueTask<long> DoReadAsync()
{
    OnProcess = true;
    var count = 0;
    try
    {
        var sends = (await
            _unitOfWork.SendRepository.Where(x =>
                x.sndPost == false, x =>
                new Send
                {
                    sndID = x.sndID,
                    SndBody = x.SndBody,
                    SndTo = x.SndTo,
                    sndFrom = x.sndFrom,
                    SndFarsi = x.SndFarsi,
                    sndMsgClass = x.sndMsgClass,
                    sndUDH = x.sndUDH
                }, c => c.OrderBy(o => o.SndPriority), BaseConfig.BatchRead)).ToList();
        count = sends.Count;
        if (count == 0)
        {
            await Task.Delay(_settings.RefreshMillisecond);
            return count;
        }

        var restTasks = BaseConfig.Split(sends)
            .Select(items => _restService.DoPostAsync(items.ToList()))
            .ToList();
        var updateTasks = new List<Task>();
        var allChunksProcessed = true;

        while (restTasks.Count > 0)
        {
            var task = await Task.WhenAny(restTasks);
            // Remove first so a faulted chunk can never be picked up again by WhenAny.
            restTasks.Remove(task);
            try
            {
                var item = await task.ConfigureAwait(false);
                if (item is not { IsSuccessStatusCode: true })
                {
                    continue;
                }

                var content = await item.Content.ReadAsStringAsync();
                var items = JsonConvert.DeserializeObject<List<ResponseDto>>(content);
                if (items is null)
                {
                    // An empty or literal "null" body deserializes to null; nothing to record.
                    continue;
                }

                var itemsSends = items.Select(_mapper.Map<Send>).ToList();
                if (itemsSends.Count > 0)
                {
                    updateTasks.Add(_unitOfWork.SendRepository.BulkUpdateForwardOcelotAsync(itemsSends));
                }
            }
            catch (Exception ex)
            {
                // One failed chunk must not abandon the responses of the other chunks,
                // otherwise their rows are never marked and would be read again.
                allChunksProcessed = false;
                _logger.LogError(ex, "{Message}", ex.Message);
            }
        }

        // Always wait for the updates that were started, even if some chunk failed.
        await Task.WhenAll(updateTasks).ConfigureAwait(false);
        if (allChunksProcessed)
        {
            Completed = true;
        }
    }
    catch (Exception ex)
    {
        // Pass the exception itself so the stack trace is captured, not only the message.
        _logger.LogError(ex, "{Message}", ex.Message);
    }
    finally
    {
        OnProcess = false;
    }
    return count;
}
In the code above, for example, with a batch of 50,000 requests, I split it into 10 tasks of 5,000 requests each and send them to the Ocelot API.
This is my code in the Ocelot API; I have written a middleware like this:
public class BatchMiddleware : OcelotMiddleware
{
// Next delegate in the pipeline; invoked at the end of Invoke.
private readonly RequestDelegate _next;

// Branch selector flipped by Invoke on every call: true selects the 70% branch.
private bool isRahyabRG = true;

// Item count left over after the 70% cut of the most recent batch handled by the 70% branch.
private int remainedBatch;

/// <summary>
/// Builds the middleware, handing a logger created for this type to the Ocelot base class.
/// </summary>
/// <param name="next">The next delegate in the request pipeline.</param>
/// <param name="configuration">Application configuration; not used by this constructor.</param>
/// <param name="loggerFactory">Factory used to create the logger passed to the base class.</param>
public BatchMiddleware(
    RequestDelegate next,
    IConfiguration configuration,
    IOcelotLoggerFactory loggerFactory)
    : base(loggerFactory.CreateLogger<BatchMiddleware>())
    => _next = next;
/// <summary>
/// Trims the incoming batch before passing the request on to the next middleware.
/// </summary>
/// <remarks>
/// The method alternates between two branches using <c>isRahyabRG</c>:
/// when the flag is true it keeps the first 70% (rounded down) of the batch, stores the
/// leftover count in the session under "remainedBatchKey" and clears the flag; when the flag is
/// false it keeps as many leading items as that session value says (none when the value is
/// missing or not a number) and sets the flag again.
/// NOTE(review): <c>isRahyabRG</c> and <c>remainedBatch</c> are instance fields that are read and
/// written here without synchronization. If one middleware instance serves concurrent requests,
/// two requests can take the same branch - confirm the instance lifetime before relying on the
/// alternation.
/// </remarks>
/// <param name="httpContext">The current request context; its request body is replaced.</param>
public async Task Invoke(HttpContext httpContext)
{
    var request = httpContext.Request;
    var batchRequests = await request.DeserializeArrayAsync<RequestDto>();
    var batchRequestCount = batchRequests.Count;
    var RGCount = (int)Math.Floor(70 * batchRequestCount / 100.0);

    // Replaces the request body with the serialized subset. The new payload is usually shorter
    // than the original, so a Content-Length sent by the client would be stale; it is refreshed
    // only when the client supplied one, leaving chunked requests untouched.
    void ReplaceBody(List<RequestDto> subset)
    {
        var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(subset));
        request.Body = new MemoryStream(payload);
        if (request.ContentLength.HasValue)
        {
            request.ContentLength = payload.Length;
        }
    }

    if (isRahyabRG)
    {
        ReplaceBody(batchRequests.Take(RGCount).ToList());
        isRahyabRG = false;
        remainedBatch = batchRequestCount - RGCount;
        httpContext.Session.SetString("remainedBatchKey", remainedBatch.ToString());
    }
    else
    {
        var remainedBatchKey = httpContext.Session.GetString("remainedBatchKey");
        var pmRequests = new List<RequestDto>();
        // TryParse also covers a missing (null) value, so a bad session entry yields an empty batch
        // instead of an unhandled FormatException.
        if (int.TryParse(remainedBatchKey, out var remaining))
        {
            pmRequests = batchRequests.Take(remaining).ToList();
        }
        ReplaceBody(pmRequests);
        isRahyabRG = true;
    }

    await _next.Invoke(httpContext);
}
And this is my custom load balancer:
/// <summary>
/// Round-robin load balancer: each lease returns the service after the one returned by the
/// previous lease, wrapping back to the first service at the end of the list.
/// </summary>
public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private readonly object _lock = new();

    // Index of the service to hand out on the next lease.
    private int _last;

    /// <summary>Stores the delegate that supplies the current list of services.</summary>
    /// <param name="services">Delegate returning the available downstream services.</param>
    /// <param name="configuration">Application configuration; not used here.</param>
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
        => _services = services;

    /// <summary>Picks the next service in rotation.</summary>
    /// <param name="httpContext">The current request context; not inspected.</param>
    /// <returns>An OK response carrying the host and port of the chosen service.</returns>
    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var candidates = await _services();
        Service chosen;
        lock (_lock)
        {
            // Wrap around when the cursor has run past the end of the current list.
            var index = _last < candidates.Count ? _last : 0;
            _last = index + 1;
            chosen = candidates[index];
        }

        return new OkResponse<ServiceHostAndPort>(chosen.HostAndPort);
    }

    /// <summary>No-op: nothing is tracked per lease.</summary>
    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}
}
And this is ocelot.json:
{
"Routes": [
{
"DownstreamPathTemplate": "/api/Forward",
"DownstreamScheme": "http",
"DownstreamHostAndPorts": [
{
"Host": "localhost",
"Port": 51003
},
{
"Host": "localhost",
"Port": 32667
}
],
"UpstreamPathTemplate": "/",
"UpstreamHttpMethod": [
"POST"
],
"LoadBalancerOptions": {
"Type": "MyRoundRobin"
}
}
]
}
The code you shared is a sample from the Ocelot documentation, and it is a basic round-robin load balancer. As I understand from the code, the ILoadBalancer
instance is created for every request, so the fields that hold shared information (_lock and _last) should be static. You can achieve a weighted round-robin with the help of random numbers; as the number of requests increases, the distribution will approach the expected ratio (70% / 30%). You can do this:
/// <summary>
/// Weighted load balancer that picks between the first two services at random, sending
/// roughly 70% of the leases to the first service and 30% to the second.
/// </summary>
public class MyRoundRobin : ILoadBalancer
{
    // Probability that a lease goes to the first service in the list.
    private const double FirstServiceShare = 0.7;

    private readonly Func<Task<List<Service>>> _services;

    /// <summary>Stores the delegate that supplies the current list of services.</summary>
    /// <param name="services">Delegate returning the available downstream services.</param>
    /// <param name="configuration">Application configuration; not used here.</param>
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;
    }

    /// <summary>Chooses a service according to the configured share.</summary>
    /// <param name="httpContext">The current request context; not inspected.</param>
    /// <returns>An OK response carrying the host and port of the chosen service.</returns>
    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();

        // Random.Shared (available from .NET 6 onwards) is safe to call from concurrent requests.
        var whatToPick = Random.Shared.NextDouble() >= FirstServiceShare ? 1 : 0;

        // When only one service is currently available, fall back to it instead of
        // indexing past the end of the list.
        var next = services[Math.Min(whatToPick, services.Count - 1)];
        return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
    }

    /// <summary>No-op: nothing is tracked per lease.</summary>
    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}
If you do not want to implement a solution that involves random numbers you can try the below solution:
/// <summary>
/// Deterministic weighted load balancer: out of every 100 consecutive leases, the first 70 go
/// to the first service and the remaining 30 to the second.
/// </summary>
public class MyRoundRobin : ILoadBalancer
{
    // Length of one distribution cycle and how many leases of it go to the first service.
    private const ulong CycleLength = 100;
    private const ulong FirstServiceLeases = 70;

    private readonly Func<Task<List<Service>>> _services;

    // Static so the lease count is shared by every instance of this balancer.
    private static readonly object _lock = new();
    private static ulong counter;

    /// <summary>Stores the delegate that supplies the current list of services.</summary>
    /// <param name="services">Delegate returning the available downstream services.</param>
    /// <param name="configuration">Application configuration; not used here.</param>
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;
    }

    /// <summary>Chooses a service from the position of this lease within the current cycle.</summary>
    /// <param name="httpContext">The current request context; not inspected.</param>
    /// <returns>An OK response carrying the host and port of the chosen service.</returns>
    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();
        lock (_lock)
        {
            // unchecked: the counter is allowed to wrap around silently.
            var mod = unchecked(counter++) % CycleLength;
            var whatToPick = mod < FirstServiceLeases ? 0 : 1;

            // When only one service is currently available, fall back to it instead of
            // indexing past the end of the list.
            var next = services[Math.Min(whatToPick, services.Count - 1)];
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    /// <summary>No-op: nothing is tracked per lease.</summary>
    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}