I need to implement concurrent WebRequests on my ASP.NET 6 server hosted on a Linux OS. I've created a barrier implementation (class BarrierTask) to handle parallel processing. However, occasionally (as described below), the WebRequests take a long time and end up failing.
On startup I configure the ThreadPool:
System.Net.ServicePointManager.DefaultConnectionLimit = 500;
System.Threading.ThreadPool.SetMinThreads(1000, 500);
Example of running multiple parallel WebRequests:
BarrierTask<int, TestDto> barrierTask = new BarrierTask<int, TestDto>();
int[] arr = new int[count];
for (int i = 0; i < arr.Length; i++)
{
    arr[i] = i;
}
TestDto[] results = barrierTask.Run((a) =>
{
    var id = new TestDto()
    {
        TimestampStart = DateTime.UtcNow,
        TimestampEnd = new DateTime(),
        Delay = delay,
        Name = $"Test request {a}",
        Error = "OK"
    };
    string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
    try
    {
        string json = DoRequest("https://.../apidebug/delay", str);
        var ret = Newtonsoft.Json.JsonConvert.DeserializeObject<TestDto>(json);
        return ret;
    }
    catch (Exception ex)
    {
        id.Error = ex.ToString().Replace("\r", "\\r").Replace("\n", "\\n");
        id.TimestampEnd = DateTime.UtcNow;
        return id;
    }
}, arr);
string DoRequest(string uri, string body)
{
    using var client = new HttpClient();
    using (var webRequest = new HttpRequestMessage(HttpMethod.Post, uri)
    {
        Content = new StringContent(body, Encoding.UTF8, "application/json"),
    })
    {
        using (var response = client.Send(webRequest))
        {
            using (var reader = new StreamReader(response.Content.ReadAsStream()))
            {
                return reader.ReadToEnd();
            }
        }
    }
}
BarrierTask:
public class BarrierTask<TArgs, TResult>
{
    Barrier barrier;

    public TResult[] Run(Func<TArgs, TResult> action, TArgs[] args)
    {
        List<TResult> ret = new List<TResult>();
        barrier = new Barrier(args.Length + 1);
        foreach (TArgs arg in args)
        {
            Task.Run(() =>
            {
                try
                {
                    TResult result = action(arg);
                    lock (this)
                    {
                        ret.Add(result);
                    }
                }
                catch (Exception ex)
                {
                    AppLog.Log("BarrierTask.Run: Input: " + arg.ToString() + " Ex: " + ex.Message, AppLog.MessageType.Error);
                }
                finally
                {
                    barrier.SignalAndWait();
                }
            });
        }
        barrier.SignalAndWait();
        return ret.ToArray();
    }
}
Sample EndPoint:
[Route("api/delay")]
[ApiController]
public class DelayController : ControllerBase
{
    [HttpPost]
    public TestDto Post(TestDto testDto)
    {
        System.Threading.Thread.Sleep(testDto.Delay);
        testDto.TimestampEnd = DateTime.UtcNow;
        return testDto;
    }
}
Case 1 - Start the server and perform 100 concurrent WebRequests with a 4000 ms delay:
Some of the WebRequests are processed successfully, but many requests fail with the exception: System.IO.IOException: The response ended prematurely.
The likely cause of this exception is that Apache on the endpoint is stuck in the "R - Reading Request" state and terminates the connection after 20 seconds.
The second attempt is processed in approximately 4300 ms, indicating that everything is okay.
Case 2 - Start the server, warm up, and perform 100 concurrent WebRequests with a 4000 ms delay: First I send a single warm-up request (using the same code). After it has been processed, I start 100 concurrent requests, and everything works correctly.
Case 3 - After a few minutes: a warm-up request is required again; otherwise, some of the requests result in an exception.
Any ideas, please?
It seems like you're initiating too many concurrent web requests. If 80% of the 100 requests are failing, it might be more efficient to limit the number of concurrent requests. You can use Parallel.ForEachAsync to manage concurrent execution and limit the maximum degree of parallelism. Here's an example:
using HttpClient client = new();
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 10
};
await Parallel.ForEachAsync([Some Source], parallelOptions, async (item, token) =>
{
    // Do something here
    await client.SendAsync(/*Some Code…*/);
});
As you can see in the example above, the HttpClient instance is not disposed: it is designed to be reused across multiple web requests. If you dispose of it after each request, you can exhaust the available ports, because the underlying sockets are not freed immediately when the HttpClient is disposed. Also, try to use the async versions of the methods whenever possible.
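For illustration, here is a minimal sketch of your DoRequest rebuilt around one shared client and the async APIs (untested, and the method name DoRequestAsync is mine):
// Requires using System.Net.Http, System.Text and System.Threading.Tasks.
static readonly HttpClient SharedClient = new HttpClient(); // created once, reused for every request

static async Task<string> DoRequestAsync(string uri, string body)
{
    using var webRequest = new HttpRequestMessage(HttpMethod.Post, uri)
    {
        Content = new StringContent(body, Encoding.UTF8, "application/json")
    };
    using var response = await SharedClient.SendAsync(webRequest);
    return await response.Content.ReadAsStringAsync();
}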
Regarding your BarrierTask class, I'm not entirely sure what you're trying to achieve. I would suggest using Parallel.ForEachAsync or a library. Here's an example of how you could use Parallel.ForEachAsync:
public async Task<TestDto[]> StartRunAsync(List<int> ints)
{
    HttpClient client = new HttpClient();
    // The ForEachAsync delegate cannot return a value, so collect results
    // in a thread-safe bag instead (requires System.Collections.Concurrent).
    var results = new ConcurrentBag<TestDto>();
    await Parallel.ForEachAsync(ints, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, async (aInt, cancellationToken) =>
    {
        var id = new TestDto()
        {
            TimestampStart = DateTime.UtcNow,
            TimestampEnd = new DateTime(),
            Delay = delay,
            Name = $"Test request {aInt}",
            Error = "OK"
        };
        string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
        try
        {
            // DoRequest is assumed here to take the shared HttpClient as a third parameter.
            string json = DoRequest("https://.../apidebug/delay", str, client);
            var ret = Newtonsoft.Json.JsonConvert.DeserializeObject<TestDto>(json);
            results.Add(ret);
        }
        catch (Exception ex)
        {
            id.Error = ex.ToString().Replace("\r", "\\r").Replace("\n", "\\n");
            id.TimestampEnd = DateTime.UtcNow;
            results.Add(id);
        }
    });
    return results.ToArray();
}
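For completeness, a hypothetical call site (the Enumerable.Range input is just a stand-in, untested):
// Requires using System.Linq. Fires 100 requests with at most 10 in flight at a time.
TestDto[] results = await StartRunAsync(Enumerable.Range(0, 100).ToList());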
If you want a library that provides more features, you can try the Request library from NuGet. Here's an example of how you could use it:
public void StartRun(List<int> ints)
{
    HttpClient httpClient = new HttpClient();
    var results = new ConcurrentBag<TestDto>();
    foreach (var aInt in ints)
    {
        new OwnRequest(async (cancellationToken) =>
        {
            var id = new TestDto()
            {
                TimestampStart = DateTime.UtcNow,
                TimestampEnd = new DateTime(),
                Delay = delay,
                Name = $"Test request {aInt}",
                Error = "OK"
            };
            string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
            try
            {
                string json = DoRequest("https://.../apidebug/delay", str, httpClient);
                var ret = Newtonsoft.Json.JsonConvert.DeserializeObject<TestDto>(json);
                results.Add(ret);
                return true;  // OwnRequest's delegate reports success with a bool
            }
            catch (Exception ex)
            {
                id.Error = ex.ToString().Replace("\r", "\\r").Replace("\n", "\\n");
                id.TimestampEnd = DateTime.UtcNow;
                results.Add(id);
                return false; // returning false lets the handler retry the request
            }
        });
    }
}
This will retry on failure, and you can adjust the degree of concurrency on the fly, delay requests, and so on. You could also write your own request class for this purpose:
public class SendRequest : Request<RequestOptions<string, Exception>, string, Exception>
{
    private static Lazy<HttpClient> _client = new(() => new HttpClient());
    private int _id;
    private Uri _uri;
    private int _delay;

    public SendRequest(Uri uri, int id, int delay, RequestOptions<string, Exception> options = null) : base(options)
    {
        _id = id;
        _delay = delay;
        _uri = uri;
    }

    protected override async Task<RequestReturn> RunRequestAsync()
    {
        RequestReturn requestReturn = new RequestReturn();
        var id = new TestDto()
        {
            TimestampStart = DateTime.UtcNow,
            TimestampEnd = new DateTime(),
            Delay = _delay,
            Name = $"Test request {_id}",
            Error = "OK"
        };
        string str = Newtonsoft.Json.JsonConvert.SerializeObject(id);
        try
        {
            using HttpRequestMessage webRequest = new(HttpMethod.Post, _uri)
            {
                Content = new StringContent(str, Encoding.UTF8, "application/json"),
            };
            using HttpResponseMessage response = await _client.Value.SendAsync(webRequest);
            // Read the body asynchronously instead of blocking on a StreamReader.
            requestReturn.CompletedReturn = await response.Content.ReadAsStringAsync();
            requestReturn.Successful = true;
        }
        catch (Exception ex)
        {
            requestReturn.Successful = false;
            requestReturn.FailedReturn = ex;
        }
        return requestReturn;
    }
}
// To make some posts, you could do:
public void StartRun(List<int> ints)
{
    RequestHandler handler = new RequestHandler();
    handler.StaticDegreeOfParallelism = 10;
    foreach (int aInt in ints)
    {
        new SendRequest(new Uri("https://.../apidebug/delay"), aInt, delay, new RequestOptions<string, Exception>()
        {
            RequestCompleted = (request, yourString) => { /*Some code here*/ },
            Handler = handler
        });
    }
}
With StaticDegreeOfParallelism you can change the parallelism. The same applies to the OwnRequest.
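For example, assuming the handler from the snippet above is still in scope (untested):
// Lower the limit while requests are still queued.
handler.StaticDegreeOfParallelism = 4;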
I think this approach should get rid of the problems you mentioned. Note that none of the code here is tested; it is based on the code you provided and will not run out of the box.
I also did not fully understand what you are trying to achieve, so sorry if this is of no help.